diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/DynamicThreadPoolAutoConfiguration.java index 9e8a4169..d880bc1b 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -8,6 +8,10 @@ import cn.hippo4j.starter.core.ThreadPoolConfigService; import cn.hippo4j.starter.core.ThreadPoolOperation; import cn.hippo4j.starter.enable.MarkerConfiguration; import cn.hippo4j.starter.handler.DynamicThreadPoolBannerHandler; +import cn.hippo4j.starter.handler.ThreadPoolRunStateHandler; +import cn.hippo4j.starter.monitor.HttpMvcSender; +import cn.hippo4j.starter.monitor.MessageSender; +import cn.hippo4j.starter.monitor.ReportingEventExecutor; import cn.hippo4j.starter.remote.HttpAgent; import cn.hippo4j.starter.toolkit.IdentifyUtil; import cn.hippo4j.starter.toolkit.inet.InetUtils; @@ -72,8 +76,24 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean - public PoolRunStateController poolRunStateController() { - return new PoolRunStateController(); + public ThreadPoolRunStateHandler threadPoolRunStateHandler() { + return new ThreadPoolRunStateHandler(); + } + + @Bean + public PoolRunStateController poolRunStateController(ThreadPoolRunStateHandler threadPoolRunStateHandler) { + return new PoolRunStateController(threadPoolRunStateHandler); + } + + @Bean + @SuppressWarnings("all") + public HttpMvcSender httpMvcSender(HttpAgent httpAgent) { + return new HttpMvcSender(httpAgent); + } + + @Bean + public ReportingEventExecutor reportingEventExecutor(BootstrapProperties properties, MessageSender messageSender) { + return new ReportingEventExecutor(properties, messageSender); } } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/Collect.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/Collect.java new file mode 100644 index 00000000..c4ef4bda --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/Collect.java @@ -0,0 +1,20 @@ +package cn.hippo4j.starter.monitor; + +import cn.hippo4j.common.monitor.Message; + +/** + * Collect dynamic thread pool data. + * + * @author chen.ma + * @date 2021/12/7 20:11 + */ +public interface Collect { + + /** + * Collect message. + * + * @return + */ + Message collectMessage(); + +} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/HttpMvcSender.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/HttpMvcSender.java new file mode 100644 index 00000000..0396d4db --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/HttpMvcSender.java @@ -0,0 +1,31 @@ +package cn.hippo4j.starter.monitor; + +import cn.hippo4j.common.monitor.Message; +import cn.hippo4j.starter.remote.HttpAgent; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import static cn.hippo4j.common.constant.Constants.MONITOR_PATH; + +/** + * Http mvc sender. + * + * @author chen.ma + * @date 2021/12/7 20:53 + */ +@Slf4j +@AllArgsConstructor +public class HttpMvcSender implements MessageSender { + + private final HttpAgent httpAgent; + + @Override + public void send(Message message) { + try { + httpAgent.httpPost(MONITOR_PATH, message); + } catch (Throwable ex) { + log.error("Failed to push dynamic thread pool runtime data.", ex); + } + } + +} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/MessageSender.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/MessageSender.java new file mode 100644 index 00000000..b3e02382 --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/MessageSender.java @@ -0,0 +1,20 @@ +package cn.hippo4j.starter.monitor; + +import cn.hippo4j.common.monitor.Message; + +/** + * Message sender. + * + * @author chen.ma + * @date 2021/12/7 20:49 + */ +public interface MessageSender { + + /** + * Send. + * + * @param message + */ + void send(Message message); + +} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java new file mode 100644 index 00000000..498e7fd6 --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java @@ -0,0 +1,122 @@ +package cn.hippo4j.starter.monitor; + +import cn.hippo4j.common.model.PoolRunStateInfo; +import cn.hippo4j.common.monitor.AbstractMessage; +import cn.hippo4j.common.monitor.Message; +import cn.hippo4j.common.monitor.MessageTypeEnum; +import cn.hippo4j.common.monitor.RuntimeMessage; +import cn.hippo4j.starter.config.BootstrapProperties; +import cn.hippo4j.starter.core.GlobalThreadPoolManage; +import cn.hippo4j.starter.handler.AbstractThreadPoolRuntime; +import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder; +import cn.hippo4j.starter.toolkit.thread.ThreadUtil; +import cn.hutool.core.bean.BeanUtil; +import com.google.common.collect.Lists; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.boot.CommandLineRunner; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static cn.hippo4j.starter.core.GlobalThreadPoolManage.getThreadPoolNum; +import static cn.hippo4j.starter.toolkit.IdentifyUtil.getThreadPoolIdentify; + +/** + * 动态线程池采集上报事件执行器. + *

+ * {@link BlockingQueue} 充当缓冲容器, 实现生产-消费模型. + * + * @author chen.ma + * @date 2021/12/6 20:23 + */ +@Slf4j +@RequiredArgsConstructor +public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements Runnable, Collect, CommandLineRunner, DisposableBean { + + @NonNull + private final BootstrapProperties properties; + + @NonNull + private final MessageSender messageSender; + + /** + * 数据采集的缓冲容器, 等待 ReportingEventExecutor 上报服务端 + */ + private final BlockingQueue messageCollectVessel = new ArrayBlockingQueue(4096); + + /** + * 数据采集定时执行器, Spring 启动后延时一段时间进行采集动态线程池的运行数据 + */ + private final ScheduledThreadPoolExecutor collectVesselExecutor = new ScheduledThreadPoolExecutor( + new Integer(1), + ThreadFactoryBuilder.builder().daemon(true).prefix("scheduled-collect-vessel").build() + ); + + @SneakyThrows + @Override + public void run() { + while (true) { + try { + Message message = messageCollectVessel.take(); + messageSender.send(message); + } catch (Throwable ex) { + log.error("Consumption buffer container task failed. Number of buffer container tasks :: {}", messageCollectVessel.size(), ex); + } + } + } + + @Override + public void run(String... args) { + // 延迟 10秒后每 5秒调用一次. scheduleAtFixedRate 间隔周期是前后两次任务的开始时间计算的, 不考虑执行任务本身的耗时 + collectVesselExecutor.scheduleAtFixedRate(() -> runTimeGatherTask(), 10, 5, TimeUnit.SECONDS); + ThreadUtil.newThread(this, "reporting-task", Boolean.TRUE).start(); + + log.info("Dynamic thread pool :: [{}]. The dynamic thread pool starts data collection and reporting. ", getThreadPoolNum()); + } + + @Override + public void destroy() { + Optional.ofNullable(collectVesselExecutor).ifPresent((each) -> each.shutdown()); + } + + /** + * 采集动态线程池数据, 并添加缓冲队列 + */ + private void runTimeGatherTask() { + Message message = collectMessage(); + messageCollectVessel.offer(message); + } + + @Override + public Message collectMessage() { + AbstractMessage message = new RuntimeMessage(); + + List runtimeMessages = Lists.newArrayList(); + List listThreadPoolId = GlobalThreadPoolManage.listThreadPoolId(); + for (String each : listThreadPoolId) { + PoolRunStateInfo poolRunState = getPoolRunState(each); + RuntimeMessage runtimeMessage = BeanUtil.toBean(poolRunState, RuntimeMessage.class); + runtimeMessage.setGroupKey(getThreadPoolIdentify(each, properties)); + runtimeMessages.add(runtimeMessage); + } + + message.setMessageTypeEnum(MessageTypeEnum.RUNTIME); + message.setMessages(runtimeMessages); + + return message; + } + + @Override + protected PoolRunStateInfo supplement(PoolRunStateInfo poolRunStateInfo) { + return poolRunStateInfo; + } + +}