From 677459e6f44042ea573fa40780a6457bbba14682 Mon Sep 17 00:00:00 2001 From: Oliver Date: Sun, 10 Jul 2022 11:01:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Ddubbo=E7=9B=91=E6=8E=A7?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../adapter/base/ThreadPoolAdapterState.java | 10 ++++++ .../adapter/dubbo/DubboThreadPoolAdapter.java | 5 +++ ...ynamicThreadPoolCoreAutoConfiguration.java | 6 ++++ .../AbstractDynamicThreadPoolMonitor.java | 27 +++++++++++++++- .../starter/monitor/LogMonitorHandler.java | 6 ++++ .../starter/monitor/MetricMonitorHandler.java | 31 +++++++++++++++++++ 6 files changed, 84 insertions(+), 1 deletion(-) diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterState.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterState.java index b46c6c26..ca8b272a 100644 --- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterState.java +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterState.java @@ -64,4 +64,14 @@ public class ThreadPoolAdapterState { * Blocking queue capacity */ private Integer blockingQueueCapacity; + + private Integer poolSize; + + private Integer activeSize; + + private Long completedTaskCount; + + private Integer queueSize; + + private Integer remainingCapacity; } diff --git a/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java index 609b3607..2bb7bfc9 100644 --- a/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java @@ -63,6 +63,11 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis threadPoolAdapterState.setThreadPoolKey(identify); threadPoolAdapterState.setCoreSize(executor.getCorePoolSize()); threadPoolAdapterState.setMaximumSize(executor.getMaximumPoolSize()); + threadPoolAdapterState.setPoolSize(executor.getPoolSize()); + threadPoolAdapterState.setActiveSize(executor.getActiveCount()); + threadPoolAdapterState.setCompletedTaskCount(executor.getCompletedTaskCount()); + threadPoolAdapterState.setQueueSize(executor.getQueue().size()); + threadPoolAdapterState.setRemainingCapacity(executor.getQueue().remainingCapacity()); return threadPoolAdapterState; } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java index c77dd5b9..525936bd 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java @@ -17,6 +17,7 @@ package cn.hippo4j.core.springboot.starter.config; +import cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer; import cn.hippo4j.common.api.NotifyConfigBuilder; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.notify.AlarmControlHandler; @@ -204,4 +205,9 @@ public class DynamicThreadPoolCoreAutoConfiguration { public DynamicThreadPoolBannerHandler threadPoolBannerHandler() { return new DynamicThreadPoolBannerHandler(bootstrapCoreProperties); } + + @Bean + public ThreadPoolAdapterBeanContainer threadPoolAdapterBeanContainer() { + return new ThreadPoolAdapterBeanContainer(); + } } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/monitor/AbstractDynamicThreadPoolMonitor.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/monitor/AbstractDynamicThreadPoolMonitor.java index c93c73fc..95013149 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/monitor/AbstractDynamicThreadPoolMonitor.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/monitor/AbstractDynamicThreadPoolMonitor.java @@ -17,13 +17,19 @@ package cn.hippo4j.core.springboot.starter.monitor; +import cn.hippo4j.adapter.base.ThreadPoolAdapter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.model.ThreadPoolRunStateInfo; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; +import cn.hutool.core.collection.CollUtil; import lombok.RequiredArgsConstructor; +import java.util.Collection; import java.util.List; +import static cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer.THREAD_POOL_ADAPTER_BEAN_CONTAINER; + /** * Abstract dynamic thread-pool monitor. * @@ -42,12 +48,31 @@ public abstract class AbstractDynamicThreadPoolMonitor implements DynamicThreadP */ protected abstract void execute(ThreadPoolRunStateInfo poolRunStateInfo); + protected abstract void execute(ThreadPoolAdapterState poolAdapterState); + @Override public void collect() { List listDynamicThreadPoolId = GlobalThreadPoolManage.listThreadPoolId(); for (String each : listDynamicThreadPoolId) { ThreadPoolRunStateInfo poolRunState = threadPoolRunStateHandler.getPoolRunState(each); - execute(poolRunState); + // 查询是否是第三方框架线程池 + boolean flag = true; + final Collection values = THREAD_POOL_ADAPTER_BEAN_CONTAINER.values(); + if (CollUtil.isEmpty(values)) { + continue; + } + for (ThreadPoolAdapter item : values) { + final ThreadPoolAdapterState threadPoolAdapterState = item.getThreadPoolState(each); + if (threadPoolAdapterState.getCoreSize() == null || threadPoolAdapterState.getCoreSize() == 0) { + continue; + } + flag = false; + execute(threadPoolAdapterState); + break; + } + if (flag) { + execute(poolRunState); + } } } } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/monitor/LogMonitorHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/monitor/LogMonitorHandler.java index f6d0aee7..7b3dd55e 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/monitor/LogMonitorHandler.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/monitor/LogMonitorHandler.java @@ -17,6 +17,7 @@ package cn.hippo4j.core.springboot.starter.monitor; +import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.model.ThreadPoolRunStateInfo; import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; @@ -40,6 +41,11 @@ public class LogMonitorHandler extends AbstractDynamicThreadPoolMonitor { log.info("{}", JSONUtil.toJSONString(poolRunStateInfo)); } + @Override + protected void execute(ThreadPoolAdapterState poolAdapterState) { + log.info("{}", JSONUtil.toJSONString(poolAdapterState)); + } + @Override public String getType() { return "log"; diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/monitor/MetricMonitorHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/monitor/MetricMonitorHandler.java index 25b8cdbe..a7d3949c 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/monitor/MetricMonitorHandler.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/monitor/MetricMonitorHandler.java @@ -17,6 +17,7 @@ package cn.hippo4j.core.springboot.starter.monitor; +import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.model.ThreadPoolRunStateInfo; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; @@ -45,6 +46,8 @@ public class MetricMonitorHandler extends AbstractDynamicThreadPoolMonitor { private final Map RUN_STATE_CACHE = Maps.newConcurrentMap(); + private final Map ADAPTER_STATE_CACHE = Maps.newConcurrentMap(); + public MetricMonitorHandler(ThreadPoolRunStateHandler threadPoolRunStateHandler) { super(threadPoolRunStateHandler); } @@ -82,6 +85,34 @@ public class MetricMonitorHandler extends AbstractDynamicThreadPoolMonitor { Metrics.gauge(metricName("reject.count"), tags, poolRunStateInfo, ThreadPoolRunStateInfo::getRejectCount); } + @Override + protected void execute(ThreadPoolAdapterState poolAdapterState) { + ThreadPoolAdapterState stateInfo = ADAPTER_STATE_CACHE.get(poolAdapterState.getThreadPoolKey()); + if (stateInfo == null) { + ADAPTER_STATE_CACHE.put(poolAdapterState.getThreadPoolKey(), poolAdapterState); + } else { + BeanUtil.copyProperties(poolAdapterState, stateInfo); + } + + Environment environment = ApplicationContextHolder.getInstance().getEnvironment(); + String applicationName = environment.getProperty("spring.application.name", "application"); + Iterable tags = Lists.newArrayList( + Tag.of(DYNAMIC_THREAD_POOL_ID_TAG, poolAdapterState.getThreadPoolKey()), + Tag.of(APPLICATION_NAME_TAG, applicationName)); + + // thread pool + Metrics.gauge(metricName("core.size"), tags, poolAdapterState, ThreadPoolAdapterState::getCoreSize); + Metrics.gauge(metricName("maximum.size"), tags, poolAdapterState, ThreadPoolAdapterState::getMaximumSize); + Metrics.gauge(metricName("current.size"), tags, poolAdapterState, ThreadPoolAdapterState::getPoolSize); + Metrics.gauge(metricName("active.size"), tags, poolAdapterState, ThreadPoolAdapterState::getActiveSize); + // queue + Metrics.gauge(metricName("queue.capacity"), tags, poolAdapterState, ThreadPoolAdapterState::getBlockingQueueCapacity); + Metrics.gauge(metricName("queue.size"), tags, poolAdapterState, ThreadPoolAdapterState::getQueueSize); + Metrics.gauge(metricName("queue.remaining.capacity"), tags, poolAdapterState, ThreadPoolAdapterState::getRemainingCapacity); + // other + Metrics.gauge(metricName("completed.task.count"), tags, poolAdapterState, ThreadPoolAdapterState::getCompletedTaskCount); + } + private String metricName(String name) { return String.join(".", METRIC_NAME_PREFIX, name); }