|
|
|
@ -20,17 +20,21 @@ package cn.hippo4j.adapter.dubbo;
|
|
|
|
|
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
|
|
|
|
|
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
|
|
|
|
|
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
|
|
|
|
|
import cn.hutool.core.util.ReflectUtil;
|
|
|
|
|
import com.google.common.collect.Maps;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.apache.dubbo.common.Version;
|
|
|
|
|
import org.apache.dubbo.common.extension.ExtensionLoader;
|
|
|
|
|
import org.apache.dubbo.common.store.DataStore;
|
|
|
|
|
import org.apache.dubbo.remoting.Constants;
|
|
|
|
|
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
|
|
|
|
|
import org.springframework.boot.context.event.ApplicationStartedEvent;
|
|
|
|
|
import org.springframework.context.ApplicationListener;
|
|
|
|
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -63,7 +67,7 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis
|
|
|
|
|
@Override
|
|
|
|
|
public List<ThreadPoolAdapterState> getThreadPoolStates() {
|
|
|
|
|
List<ThreadPoolAdapterState> threadPoolAdapterStates = new ArrayList<>();
|
|
|
|
|
DUBBO_PROTOCOL_EXECUTOR.forEach((k, v) -> threadPoolAdapterStates.add(getThreadPoolState(k)));
|
|
|
|
|
DUBBO_PROTOCOL_EXECUTOR.forEach((k, v) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(k))));
|
|
|
|
|
return threadPoolAdapterStates;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -80,12 +84,30 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onApplicationEvent(ApplicationStartedEvent event) {
|
|
|
|
|
try {
|
|
|
|
|
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
|
|
|
|
|
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
|
|
|
|
|
executors.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(key, (ThreadPoolExecutor) value));
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("Failed to get Dubbo protocol thread pool", e);
|
|
|
|
|
boolean is2x = false;
|
|
|
|
|
String poolKey = "java.util.concurrent.ExecutorService";
|
|
|
|
|
if (Version.getIntVersion(Version.getVersion()) < 3000000) {
|
|
|
|
|
is2x = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (is2x) {
|
|
|
|
|
try {
|
|
|
|
|
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
|
|
|
|
|
Map<String, Object> executors = dataStore.get(poolKey);
|
|
|
|
|
executors.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(key, (ThreadPoolExecutor) value));
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("Failed to get Dubbo 2.X protocol thread pool", e);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
try {
|
|
|
|
|
final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
|
|
|
|
|
final ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = (ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>>) ReflectUtil.getFieldValue(executorRepository, "data");
|
|
|
|
|
final ConcurrentMap<Integer, ExecutorService> executorServiceMap = data.get(poolKey);
|
|
|
|
|
executorServiceMap.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(String.valueOf(key), (ThreadPoolExecutor) value));
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("Failed to get Dubbo 3.X protocol thread pool", e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|