diff --git a/hippo4j-adapter/hippo4j-adapter-dubbo/pom.xml b/hippo4j-adapter/hippo4j-adapter-dubbo/pom.xml index d064872e..f5893f33 100644 --- a/hippo4j-adapter/hippo4j-adapter-dubbo/pom.xml +++ b/hippo4j-adapter/hippo4j-adapter-dubbo/pom.xml @@ -14,6 +14,12 @@ cn.hippo4j hippo4j-adapter-base + + org.apache.dubbo + dubbo + 3.0.2.1 + true + diff --git a/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java index a1863d27..95c4b94f 100644 --- a/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java @@ -20,16 +20,31 @@ package cn.hippo4j.adapter.dubbo; import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hutool.core.util.ReflectUtil; +import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; +import org.apache.dubbo.common.Version; +import org.apache.dubbo.common.extension.ExtensionLoader; +import org.apache.dubbo.common.store.DataStore; +import org.apache.dubbo.common.threadpool.manager.ExecutorRepository; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.ApplicationListener; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + /** * Dubbo thread-pool adapter. */ @Slf4j public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener { + private final Map DUBBO_PROTOCOL_EXECUTOR = Maps.newHashMap(); + @Override public String mark() { return "Dubbo"; @@ -37,16 +52,62 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis @Override public ThreadPoolAdapterState getThreadPoolState(String identify) { - return null; + ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState(); + final ThreadPoolExecutor tp = DUBBO_PROTOCOL_EXECUTOR.get(identify); + if (tp == null) { + return threadPoolAdapterState; + } + threadPoolAdapterState.setThreadPoolKey(identify); + threadPoolAdapterState.setActive(tp.getActiveCount() + ""); + threadPoolAdapterState.setCoreSize(tp.getCorePoolSize()); + threadPoolAdapterState.setMaximumSize(tp.getMaximumPoolSize()); + return threadPoolAdapterState; + } + + @Override + public List getThreadPoolStates() { + List threadPoolAdapterStates = new ArrayList<>(); + DUBBO_PROTOCOL_EXECUTOR.forEach((k, v) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(k)))); + return threadPoolAdapterStates; } @Override public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { - return false; + final ThreadPoolExecutor tp = DUBBO_PROTOCOL_EXECUTOR.get(threadPoolAdapterParameter.getThreadPoolKey()); + if (tp == null) { + return false; + } + tp.setCorePoolSize(threadPoolAdapterParameter.getCoreSize()); + tp.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumSize()); + return true; } @Override public void onApplicationEvent(ApplicationStartedEvent event) { + boolean is2x = false; + String poolKey = "java.util.concurrent.ExecutorService"; + if (Version.getIntVersion(Version.getVersion()) < 3000000) { + is2x = true; + } + + if (is2x) { + try { + DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); + Map executors = dataStore.get(poolKey); + executors.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(key, (ThreadPoolExecutor) value)); + } catch (Exception e) { + log.error("Failed to get Dubbo 2.X protocol thread pool", e); + } + } else { + try { + final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension(); + final ConcurrentMap> data = (ConcurrentMap>) ReflectUtil.getFieldValue(executorRepository, "data"); + final ConcurrentMap executorServiceMap = data.get(poolKey); + executorServiceMap.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(String.valueOf(key), (ThreadPoolExecutor) value)); + } catch (Exception e) { + log.error("Failed to get Dubbo 3.X protocol thread pool", e); + } + } } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/controller/ThreadPoolAdapterController.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/controller/ThreadPoolAdapterController.java index 23ea3f4b..a232089a 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/controller/ThreadPoolAdapterController.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/controller/ThreadPoolAdapterController.java @@ -56,7 +56,7 @@ public class ThreadPoolAdapterController { ThreadPoolAdapterState threadPoolState = each.getThreadPoolState(requestParameter.getThreadPoolKey()); String active = environment.getProperty("spring.profiles.active", "UNKNOWN"); threadPoolState.setActive(active.toUpperCase()); - String clientAddress = CloudCommonIdUtil.getDefaultInstanceId(environment, hippo4JInetUtils); + String clientAddress = CloudCommonIdUtil.getClientIpPort(environment, hippo4JInetUtils); threadPoolState.setClientAddress(clientAddress); threadPoolState.setIdentify(IdentifyUtil.getIdentify()); return threadPoolState;