From 66c4610f34260590991e5a0ded3d42b430c084ea Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Thu, 16 Dec 2021 22:47:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=80=E5=8F=91=E7=BA=BF=E7=A8=8B=E6=B1=A0?= =?UTF-8?q?=E8=BF=90=E8=A1=8C=E5=8E=86=E5=8F=B2=E6=95=B0=E6=8D=AE=E9=87=87?= =?UTF-8?q?=E9=9B=86=E4=B8=8A=E6=8A=A5.=20(#15)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hippo4j/config/config/CommonConfig.java | 16 +++++ .../biz/impl/HisRunDataServiceImpl.java | 2 + .../console/controller/MonitorController.java | 20 ++++-- .../starter/config/BootstrapProperties.java | 13 ++-- .../DynamicThreadPoolAutoConfiguration.java | 10 ++- .../monitor/ReportingEventExecutor.java | 65 +++++++------------ .../{Collect.java => collect/Collector.java} | 4 +- .../monitor/collect/RunTimeInfoCollector.java | 54 +++++++++++++++ .../monitor/{ => send}/HttpConnectSender.java | 2 +- .../monitor/{ => send}/MessageSender.java | 2 +- 10 files changed, 134 insertions(+), 54 deletions(-) rename hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/{Collect.java => collect/Collector.java} (76%) create mode 100644 hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/collect/RunTimeInfoCollector.java rename hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/{ => send}/HttpConnectSender.java (95%) rename hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/{ => send}/MessageSender.java (85%) diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/config/CommonConfig.java b/hippo4j-config/src/main/java/cn/hippo4j/config/config/CommonConfig.java index 403256ed..6c7e3b54 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/config/CommonConfig.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/config/CommonConfig.java @@ -5,6 +5,10 @@ import cn.hippo4j.common.api.impl.JacksonHandler; import cn.hippo4j.common.config.ApplicationContextHolder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import static cn.hippo4j.common.constant.Constants.AVAILABLE_PROCESSORS; /** * Common config. @@ -25,4 +29,16 @@ public class CommonConfig { return new ApplicationContextHolder(); } + @Bean + @Primary + public ThreadPoolTaskExecutor monitorThreadPoolTaskExecutor() { + ThreadPoolTaskExecutor monitorThreadPool = new ThreadPoolTaskExecutor(); + monitorThreadPool.setCorePoolSize(AVAILABLE_PROCESSORS); + monitorThreadPool.setMaxPoolSize(AVAILABLE_PROCESSORS << 1); + monitorThreadPool.setQueueCapacity(4096); + monitorThreadPool.setAllowCoreThreadTimeOut(true); + monitorThreadPool.setAwaitTerminationMillis(5000); + return monitorThreadPool; + } + } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/HisRunDataServiceImpl.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/HisRunDataServiceImpl.java index 27342d7b..6554522e 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/HisRunDataServiceImpl.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/HisRunDataServiceImpl.java @@ -44,6 +44,7 @@ public class HisRunDataServiceImpl extends ServiceImpl times = Lists.newArrayList(); diff --git a/hippo4j-console/src/main/java/cn/hippo4j/console/controller/MonitorController.java b/hippo4j-console/src/main/java/cn/hippo4j/console/controller/MonitorController.java index 39cbd865..0c309e32 100644 --- a/hippo4j-console/src/main/java/cn/hippo4j/console/controller/MonitorController.java +++ b/hippo4j-console/src/main/java/cn/hippo4j/console/controller/MonitorController.java @@ -13,6 +13,7 @@ import cn.hippo4j.config.monitor.QueryMonitorExecuteChoose; import cn.hippo4j.config.service.biz.HisRunDataService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.web.bind.annotation.*; import java.util.List; @@ -33,6 +34,8 @@ public class MonitorController { private final QueryMonitorExecuteChoose queryMonitorExecuteChoose; + private final ThreadPoolTaskExecutor monitorThreadPoolTaskExecutor; + @GetMapping public Result> queryMonitor(MonitorQueryReqDTO reqDTO) { List monitorRespList = hisRunDataService.query(reqDTO); @@ -40,15 +43,24 @@ public class MonitorController { } @PostMapping("/info") - public Result querInfoThreadPoolMonitor(@RequestBody MonitorQueryReqDTO reqDTO) { + public Result queryInfoThreadPoolMonitor(@RequestBody MonitorQueryReqDTO reqDTO) { MonitorActiveRespDTO monitorRespList = hisRunDataService.queryInfoThreadPoolMonitor(reqDTO); return Results.success(monitorRespList); } @PostMapping - public Result dataCollect(@RequestBody MessageWrapper messageWrapper) { - Message message = MessageConvert.convert(messageWrapper); - queryMonitorExecuteChoose.chooseAndExecute(message); + public Result dataCollect(@RequestBody MessageWrapper messageWrapper) { + Runnable task = () -> { + Message message = MessageConvert.convert(messageWrapper); + queryMonitorExecuteChoose.chooseAndExecute(message); + }; + + try { + monitorThreadPoolTaskExecutor.execute(task); + } catch (Exception ex) { + log.error("Monitoring data insertion database task overflow.", ex); + } + return Results.success(); } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/BootstrapProperties.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/BootstrapProperties.java index 280f7d00..9494e239 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/BootstrapProperties.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/BootstrapProperties.java @@ -20,23 +20,28 @@ public class BootstrapProperties { public static final String PREFIX = "spring.dynamic.thread-pool"; /** - * serverAddr + * Server addr */ private String serverAddr; /** - * namespace + * Namespace */ private String namespace; /** - * itemId + * Item id */ private String itemId; /** - * Enable banner + * Print dynamic thread pool banner */ private boolean banner = true; + /** + * Time interval for client to collect monitoring data. unit: ms + */ + private Long collectInterval = 5000L; + } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/DynamicThreadPoolAutoConfiguration.java index 02565f46..d9b084c1 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -11,9 +11,10 @@ import cn.hippo4j.starter.core.ThreadPoolOperation; import cn.hippo4j.starter.enable.MarkerConfiguration; import cn.hippo4j.starter.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.starter.handler.ThreadPoolRunStateHandler; -import cn.hippo4j.starter.monitor.HttpConnectSender; -import cn.hippo4j.starter.monitor.MessageSender; import cn.hippo4j.starter.monitor.ReportingEventExecutor; +import cn.hippo4j.starter.monitor.collect.RunTimeInfoCollector; +import cn.hippo4j.starter.monitor.send.HttpConnectSender; +import cn.hippo4j.starter.monitor.send.MessageSender; import cn.hippo4j.starter.remote.HttpAgent; import cn.hippo4j.starter.remote.HttpScheduledHealthCheck; import cn.hippo4j.starter.remote.ServerHealthCheck; @@ -112,6 +113,11 @@ public class DynamicThreadPoolAutoConfiguration { return new JacksonHandler(); } + @Bean + public RunTimeInfoCollector runTimeInfoCollector() { + return new RunTimeInfoCollector(properties); + } + } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java index 3c0aecbd..85ecf427 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java @@ -1,18 +1,14 @@ package cn.hippo4j.starter.monitor; -import cn.hippo4j.common.model.PoolRunStateInfo; -import cn.hippo4j.common.monitor.AbstractMessage; +import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.monitor.Message; -import cn.hippo4j.common.monitor.MessageTypeEnum; -import cn.hippo4j.common.monitor.RuntimeMessage; import cn.hippo4j.starter.config.BootstrapProperties; -import cn.hippo4j.starter.core.GlobalThreadPoolManage; -import cn.hippo4j.starter.handler.AbstractThreadPoolRuntime; +import cn.hippo4j.starter.monitor.collect.Collector; +import cn.hippo4j.starter.monitor.send.MessageSender; import cn.hippo4j.starter.remote.ServerHealthCheck; import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder; import cn.hippo4j.starter.toolkit.thread.ThreadUtil; -import cn.hutool.core.bean.BeanUtil; -import com.google.common.collect.Lists; +import cn.hutool.core.collection.CollUtil; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -20,7 +16,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.boot.CommandLineRunner; -import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -28,7 +24,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static cn.hippo4j.starter.core.GlobalThreadPoolManage.getThreadPoolNum; -import static cn.hippo4j.starter.toolkit.IdentifyUtil.getThreadPoolIdentify; /** * 动态线程池采集上报事件执行器. @@ -40,7 +35,7 @@ import static cn.hippo4j.starter.toolkit.IdentifyUtil.getThreadPoolIdentify; */ @Slf4j @RequiredArgsConstructor -public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements Runnable, Collect, CommandLineRunner, DisposableBean { +public class ReportingEventExecutor implements Runnable, CommandLineRunner, DisposableBean { @NonNull private final BootstrapProperties properties; @@ -51,6 +46,11 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements @NonNull private final ServerHealthCheck serverHealthCheck; + /** + * 数据采集集合 + */ + private Map collectors; + /** * 数据采集的缓冲容器, 等待 ReportingEventExecutor 上报服务端 */ @@ -79,10 +79,13 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements @Override public void run(String... args) { - // 延迟 10秒后每 5秒调用一次. scheduleWithFixedDelay 每次执行时间为上一次任务结束时, 向后推一个时间间隔 - collectVesselExecutor.scheduleWithFixedDelay(() -> runTimeGatherTask(), 10, 5, TimeUnit.SECONDS); - ThreadUtil.newThread(this, "reporting-task", Boolean.TRUE).start(); + long initialDelay = 100000; + String reportingTaskName = "reporting-task"; + // 延迟 initialDelay 后循环调用. scheduleWithFixedDelay 每次执行时间为上一次任务结束时, 向后推一个时间间隔 + collectVesselExecutor.scheduleWithFixedDelay(() -> runTimeGatherTask(), initialDelay, properties.getCollectInterval(), TimeUnit.MILLISECONDS); + ThreadUtil.newThread(this, reportingTaskName, Boolean.TRUE).start(); + collectors = ApplicationContextHolder.getBeansOfType(Collector.class); log.info("Dynamic thread pool :: [{}]. The dynamic thread pool starts data collection and reporting. ", getThreadPoolNum()); } @@ -96,35 +99,17 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements */ private void runTimeGatherTask() { boolean healthStatus = serverHealthCheck.isHealthStatus(); - if (!healthStatus) { + if (!healthStatus || CollUtil.isEmpty(collectors)) { return; } - Message message = collectMessage(); - messageCollectVessel.offer(message); - } - - @Override - public Message collectMessage() { - AbstractMessage message = new RuntimeMessage(); - - List runtimeMessages = Lists.newArrayList(); - List listThreadPoolId = GlobalThreadPoolManage.listThreadPoolId(); - for (String each : listThreadPoolId) { - PoolRunStateInfo poolRunState = getPoolRunState(each); - RuntimeMessage runtimeMessage = BeanUtil.toBean(poolRunState, RuntimeMessage.class); - runtimeMessage.setGroupKey(getThreadPoolIdentify(each, properties)); - runtimeMessages.add(runtimeMessage); - } - - message.setMessageType(MessageTypeEnum.RUNTIME); - message.setMessages(runtimeMessages); - - return message; - } - @Override - protected PoolRunStateInfo supplement(PoolRunStateInfo poolRunStateInfo) { - return poolRunStateInfo; + collectors.forEach((beanName, collector) -> { + Message message = collector.collectMessage(); + boolean offer = messageCollectVessel.offer(message); + if (!offer) { + log.warn("Buffer data starts stacking data..."); + } + }); } } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/Collect.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/collect/Collector.java similarity index 76% rename from hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/Collect.java rename to hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/collect/Collector.java index c4ef4bda..eba657e0 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/Collect.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/collect/Collector.java @@ -1,4 +1,4 @@ -package cn.hippo4j.starter.monitor; +package cn.hippo4j.starter.monitor.collect; import cn.hippo4j.common.monitor.Message; @@ -8,7 +8,7 @@ import cn.hippo4j.common.monitor.Message; * @author chen.ma * @date 2021/12/7 20:11 */ -public interface Collect { +public interface Collector { /** * Collect message. diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/collect/RunTimeInfoCollector.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/collect/RunTimeInfoCollector.java new file mode 100644 index 00000000..c7b8680d --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/collect/RunTimeInfoCollector.java @@ -0,0 +1,54 @@ +package cn.hippo4j.starter.monitor.collect; + +import cn.hippo4j.common.model.PoolRunStateInfo; +import cn.hippo4j.common.monitor.AbstractMessage; +import cn.hippo4j.common.monitor.Message; +import cn.hippo4j.common.monitor.MessageTypeEnum; +import cn.hippo4j.common.monitor.RuntimeMessage; +import cn.hippo4j.starter.config.BootstrapProperties; +import cn.hippo4j.starter.core.GlobalThreadPoolManage; +import cn.hippo4j.starter.handler.AbstractThreadPoolRuntime; +import cn.hutool.core.bean.BeanUtil; +import com.google.common.collect.Lists; +import lombok.AllArgsConstructor; + +import java.util.List; + +import static cn.hippo4j.starter.toolkit.IdentifyUtil.getThreadPoolIdentify; + +/** + * Thread pool runtime data collection. + * + * @author chen.ma + * @date 2021/12/16 19:46 + */ +@AllArgsConstructor +public class RunTimeInfoCollector extends AbstractThreadPoolRuntime implements Collector { + + private final BootstrapProperties properties; + + @Override + public Message collectMessage() { + AbstractMessage message = new RuntimeMessage(); + + List runtimeMessages = Lists.newArrayList(); + List listThreadPoolId = GlobalThreadPoolManage.listThreadPoolId(); + for (String each : listThreadPoolId) { + PoolRunStateInfo poolRunState = getPoolRunState(each); + RuntimeMessage runtimeMessage = BeanUtil.toBean(poolRunState, RuntimeMessage.class); + runtimeMessage.setGroupKey(getThreadPoolIdentify(each, properties)); + runtimeMessages.add(runtimeMessage); + } + + message.setMessageType(MessageTypeEnum.RUNTIME); + message.setMessages(runtimeMessages); + + return message; + } + + @Override + protected PoolRunStateInfo supplement(PoolRunStateInfo basePoolRunStateInfo) { + return basePoolRunStateInfo; + } + +} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/HttpConnectSender.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/send/HttpConnectSender.java similarity index 95% rename from hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/HttpConnectSender.java rename to hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/send/HttpConnectSender.java index 09139bcf..a2a5064e 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/HttpConnectSender.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/send/HttpConnectSender.java @@ -1,4 +1,4 @@ -package cn.hippo4j.starter.monitor; +package cn.hippo4j.starter.monitor.send; import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.MessageWrapper; diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/MessageSender.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/send/MessageSender.java similarity index 85% rename from hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/MessageSender.java rename to hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/send/MessageSender.java index b3e02382..03cce85d 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/MessageSender.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/send/MessageSender.java @@ -1,4 +1,4 @@ -package cn.hippo4j.starter.monitor; +package cn.hippo4j.starter.monitor.send; import cn.hippo4j.common.monitor.Message;