|
|
@ -46,7 +46,7 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMI
|
|
|
|
@Slf4j
|
|
|
|
@Slf4j
|
|
|
|
public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> {
|
|
|
|
public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> {
|
|
|
|
|
|
|
|
|
|
|
|
private final Map<String, ThreadPoolExecutor> DUBBO_PROTOCOL_EXECUTOR = new HashMap<>();
|
|
|
|
private final Map<String, ThreadPoolExecutor> dubboProtocolExecutor = new HashMap<>();
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public String mark() {
|
|
|
|
public String mark() {
|
|
|
@ -56,7 +56,7 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public ThreadPoolAdapterState getThreadPoolState(String identify) {
|
|
|
|
public ThreadPoolAdapterState getThreadPoolState(String identify) {
|
|
|
|
ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState();
|
|
|
|
ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState();
|
|
|
|
ThreadPoolExecutor executor = DUBBO_PROTOCOL_EXECUTOR.get(identify);
|
|
|
|
ThreadPoolExecutor executor = dubboProtocolExecutor.get(identify);
|
|
|
|
if (executor == null) {
|
|
|
|
if (executor == null) {
|
|
|
|
log.warn("[{}] Dubbo consuming thread pool not found.", identify);
|
|
|
|
log.warn("[{}] Dubbo consuming thread pool not found.", identify);
|
|
|
|
return threadPoolAdapterState;
|
|
|
|
return threadPoolAdapterState;
|
|
|
@ -70,14 +70,14 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public List<ThreadPoolAdapterState> getThreadPoolStates() {
|
|
|
|
public List<ThreadPoolAdapterState> getThreadPoolStates() {
|
|
|
|
List<ThreadPoolAdapterState> threadPoolAdapterStates = new ArrayList<>();
|
|
|
|
List<ThreadPoolAdapterState> threadPoolAdapterStates = new ArrayList<>();
|
|
|
|
DUBBO_PROTOCOL_EXECUTOR.forEach((key, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(key))));
|
|
|
|
dubboProtocolExecutor.forEach((key, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(key))));
|
|
|
|
return threadPoolAdapterStates;
|
|
|
|
return threadPoolAdapterStates;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
|
|
|
|
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
|
|
|
|
String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
|
|
|
|
String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
|
|
|
|
ThreadPoolExecutor executor = DUBBO_PROTOCOL_EXECUTOR.get(threadPoolAdapterParameter.getThreadPoolKey());
|
|
|
|
ThreadPoolExecutor executor = dubboProtocolExecutor.get(threadPoolAdapterParameter.getThreadPoolKey());
|
|
|
|
if (executor == null) {
|
|
|
|
if (executor == null) {
|
|
|
|
log.warn("[{}] Dubbo consuming thread pool not found.", threadPoolKey);
|
|
|
|
log.warn("[{}] Dubbo consuming thread pool not found.", threadPoolKey);
|
|
|
|
return false;
|
|
|
|
return false;
|
|
|
@ -105,14 +105,14 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis
|
|
|
|
if (isLegacyVersion) {
|
|
|
|
if (isLegacyVersion) {
|
|
|
|
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
|
|
|
|
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
|
|
|
|
Map<String, Object> executors = dataStore.get(poolKey);
|
|
|
|
Map<String, Object> executors = dataStore.get(poolKey);
|
|
|
|
executors.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(key, (ThreadPoolExecutor) value));
|
|
|
|
executors.forEach((key, value) -> dubboProtocolExecutor.put(key, (ThreadPoolExecutor) value));
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
|
|
|
|
ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
|
|
|
|
ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data =
|
|
|
|
ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data =
|
|
|
|
(ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>>) ReflectUtil.getFieldValue(executorRepository, "data");
|
|
|
|
(ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>>) ReflectUtil.getFieldValue(executorRepository, "data");
|
|
|
|
ConcurrentMap<Integer, ExecutorService> executorServiceMap = data.get(poolKey);
|
|
|
|
ConcurrentMap<Integer, ExecutorService> executorServiceMap = data.get(poolKey);
|
|
|
|
executorServiceMap.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(String.valueOf(key), (ThreadPoolExecutor) value));
|
|
|
|
executorServiceMap.forEach((key, value) -> dubboProtocolExecutor.put(String.valueOf(key), (ThreadPoolExecutor) value));
|
|
|
|
} catch (Exception ex) {
|
|
|
|
} catch (Exception ex) {
|
|
|
|
log.error("Failed to get Dubbo {} protocol thread pool", Version.getVersion(), ex);
|
|
|
|
log.error("Failed to get Dubbo {} protocol thread pool", Version.getVersion(), ex);
|
|
|
|
}
|
|
|
|
}
|
|
|
|