Dubbo thread pool adaptation logic optimization

pull/235/head
chen.ma 2 years ago
parent dd4e2336bf
commit 6b6066fba1

@ -53,61 +53,55 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis
@Override @Override
public ThreadPoolAdapterState getThreadPoolState(String identify) { public ThreadPoolAdapterState getThreadPoolState(String identify) {
ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState(); ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState();
final ThreadPoolExecutor tp = DUBBO_PROTOCOL_EXECUTOR.get(identify); final ThreadPoolExecutor executor = DUBBO_PROTOCOL_EXECUTOR.get(identify);
if (tp == null) { if (executor == null) {
return threadPoolAdapterState; return threadPoolAdapterState;
} }
threadPoolAdapterState.setThreadPoolKey(identify); threadPoolAdapterState.setThreadPoolKey(identify);
threadPoolAdapterState.setActive(tp.getActiveCount() + ""); threadPoolAdapterState.setActive(executor.getActiveCount() + "");
threadPoolAdapterState.setCoreSize(tp.getCorePoolSize()); threadPoolAdapterState.setCoreSize(executor.getCorePoolSize());
threadPoolAdapterState.setMaximumSize(tp.getMaximumPoolSize()); threadPoolAdapterState.setMaximumSize(executor.getMaximumPoolSize());
return threadPoolAdapterState; return threadPoolAdapterState;
} }
@Override @Override
public List<ThreadPoolAdapterState> getThreadPoolStates() { public List<ThreadPoolAdapterState> getThreadPoolStates() {
List<ThreadPoolAdapterState> threadPoolAdapterStates = new ArrayList<>(); List<ThreadPoolAdapterState> threadPoolAdapterStates = new ArrayList<>();
DUBBO_PROTOCOL_EXECUTOR.forEach((k, v) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(k)))); DUBBO_PROTOCOL_EXECUTOR.forEach((kel, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(val))));
return threadPoolAdapterStates; return threadPoolAdapterStates;
} }
@Override @Override
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
final ThreadPoolExecutor tp = DUBBO_PROTOCOL_EXECUTOR.get(threadPoolAdapterParameter.getThreadPoolKey()); final ThreadPoolExecutor executor = DUBBO_PROTOCOL_EXECUTOR.get(threadPoolAdapterParameter.getThreadPoolKey());
if (tp == null) { if (executor == null) {
return false; return false;
} }
tp.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); executor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
tp.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); executor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
return true; return true;
} }
@Override @Override
public void onApplicationEvent(ApplicationStartedEvent event) { public void onApplicationEvent(ApplicationStartedEvent event) {
boolean is2x = false; boolean is2xVersion = false;
String poolKey = "java.util.concurrent.ExecutorService"; String poolKey = ExecutorService.class.getName();
if (Version.getIntVersion(Version.getVersion()) < 3000000) { if (Version.getIntVersion(Version.getVersion()) < 3000000) {
is2x = true; is2xVersion = true;
} }
try {
if (is2x) { if (is2xVersion) {
try {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(poolKey); Map<String, Object> executors = dataStore.get(poolKey);
executors.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(key, (ThreadPoolExecutor) value)); executors.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(key, (ThreadPoolExecutor) value));
} catch (Exception e) { return;
log.error("Failed to get Dubbo 2.X protocol thread pool", e);
}
} else {
try {
final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
final ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = (ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>>) ReflectUtil.getFieldValue(executorRepository, "data");
final ConcurrentMap<Integer, ExecutorService> executorServiceMap = data.get(poolKey);
executorServiceMap.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(String.valueOf(key), (ThreadPoolExecutor) value));
} catch (Exception e) {
log.error("Failed to get Dubbo 3.X protocol thread pool", e);
} }
ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = (ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>>) ReflectUtil.getFieldValue(executorRepository, "data");
ConcurrentMap<Integer, ExecutorService> executorServiceMap = data.get(poolKey);
executorServiceMap.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(String.valueOf(key), (ThreadPoolExecutor) value));
} catch (Exception ex) {
log.error("Failed to get Dubbo {}.X protocol thread pool", is2xVersion ? "2" : "3", ex);
} }
} }
} }

Loading…
Cancel
Save