From ca6ea429c2805ea3c4c2151187788ec228a953b7 Mon Sep 17 00:00:00 2001 From: huangchengxing <841396397@qq.com> Date: Mon, 31 Oct 2022 18:17:47 +0800 Subject: [PATCH] feat: Support get the min/max/avg execution time of task (#689) --- .../common/model/ThreadPoolRunStateInfo.java | 15 ++++ .../common/monitor/RuntimeMessage.java | 15 ++++ .../state/AbstractThreadPoolRuntime.java | 77 +++++++++++++------ 3 files changed, 83 insertions(+), 24 deletions(-) diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/ThreadPoolRunStateInfo.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/ThreadPoolRunStateInfo.java index f1b04ae3..9d4774ae 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/model/ThreadPoolRunStateInfo.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/ThreadPoolRunStateInfo.java @@ -109,6 +109,21 @@ public class ThreadPoolRunStateInfo extends ThreadPoolBaseInfo implements Serial */ private Long timestamp; + /** + * minTaskTime + */ + private String minTaskTime; + + /** + * maxTaskTime + */ + private String maxTaskTime; + + /** + * avgTaskTime + */ + private String avgTaskTime; + public Integer getSimpleCurrentLoad() { return Integer.parseInt(getCurrentLoad().replace("%", "")); } diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/monitor/RuntimeMessage.java b/hippo4j-common/src/main/java/cn/hippo4j/common/monitor/RuntimeMessage.java index 444aaa01..de82337b 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/monitor/RuntimeMessage.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/monitor/RuntimeMessage.java @@ -72,6 +72,21 @@ public class RuntimeMessage extends AbstractMessage { */ private Long completedTaskCount; + /** + * minTaskTime + */ + private String minTaskTime; + + /** + * maxTaskTime + */ + private String maxTaskTime; + + /** + * avgTaskTime + */ + private String avgTaskTime; + /** * rejectCount */ diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java index 6287b24a..a215e444 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java @@ -18,10 +18,14 @@ package cn.hippo4j.core.executor.state; import cn.hippo4j.common.model.ThreadPoolRunStateInfo; -import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; +import cn.hippo4j.common.toolkit.CalculateUtil; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.common.toolkit.CalculateUtil; +import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin; +import cn.hippo4j.core.plugin.impl.TaskTimeRecordPlugin; +import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin; +import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager; +import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; @@ -63,24 +67,43 @@ public abstract class AbstractThreadPoolRuntime { */ public ThreadPoolRunStateInfo getPoolRunState(String threadPoolId, Executor executor) { ThreadPoolRunStateInfo stateInfo = new ThreadPoolRunStateInfo(); - ThreadPoolExecutor pool = (ThreadPoolExecutor) executor; + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; + stateInfo.setTpId(threadPoolId); + collectThreadPoolExecutorInfo(threadPoolExecutor, stateInfo); + collectPluginInfo(threadPoolExecutor, stateInfo); + stateInfo.setClientLastRefreshTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); + stateInfo.setTimestamp(System.currentTimeMillis()); + return supplement(stateInfo); + } + + private void collectThreadPoolExecutorInfo(ThreadPoolExecutor executor, ThreadPoolRunStateInfo stateInfo) { // 核心线程数 - int corePoolSize = pool.getCorePoolSize(); + int corePoolSize = executor.getCorePoolSize(); // 最大线程数 - int maximumPoolSize = pool.getMaximumPoolSize(); + int maximumPoolSize = executor.getMaximumPoolSize(); // 线程池当前线程数 (有锁) - int poolSize = pool.getPoolSize(); + int poolSize = executor.getPoolSize(); // 活跃线程数 (有锁) - int activeCount = pool.getActiveCount(); + int activeCount = executor.getActiveCount(); // 同时进入池中的最大线程数 (有锁) - int largestPoolSize = pool.getLargestPoolSize(); + int largestPoolSize = executor.getLargestPoolSize(); // 线程池中执行任务总数量 (有锁) - long completedTaskCount = pool.getCompletedTaskCount(); + long completedTaskCount = executor.getCompletedTaskCount(); + stateInfo.setCoreSize(corePoolSize); + stateInfo.setPoolSize(poolSize); + stateInfo.setMaximumSize(maximumPoolSize); + stateInfo.setActiveSize(activeCount); + stateInfo.setLargestPoolSize(largestPoolSize); + stateInfo.setCompletedTaskCount(completedTaskCount); + // 当前负载 String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + ""; // 峰值负载 String peakLoad = CalculateUtil.divide(largestPoolSize, maximumPoolSize) + ""; - BlockingQueue queue = pool.getQueue(); + stateInfo.setCurrentLoad(currentLoad); + stateInfo.setPeakLoad(peakLoad); + + BlockingQueue queue = executor.getQueue(); // 队列元素个数 int queueSize = queue.size(); // 队列类型 @@ -89,24 +112,30 @@ public abstract class AbstractThreadPoolRuntime { int remainingCapacity = queue.remainingCapacity(); // 队列容量 int queueCapacity = queueSize + remainingCapacity; - stateInfo.setCoreSize(corePoolSize); - stateInfo.setTpId(threadPoolId); - stateInfo.setPoolSize(poolSize); - stateInfo.setMaximumSize(maximumPoolSize); - stateInfo.setActiveSize(activeCount); - stateInfo.setCurrentLoad(currentLoad); - stateInfo.setPeakLoad(peakLoad); stateInfo.setQueueType(queueType); stateInfo.setQueueSize(queueSize); stateInfo.setQueueCapacity(queueCapacity); stateInfo.setQueueRemainingCapacity(remainingCapacity); - stateInfo.setLargestPoolSize(largestPoolSize); - stateInfo.setCompletedTaskCount(completedTaskCount); - long rejectCount = - pool instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) pool).getRejectCountNum() : -1L; + } + + private void collectPluginInfo(ThreadPoolExecutor executor, ThreadPoolRunStateInfo stateInfo) { + ThreadPoolPluginManager poolPluginManager = executor instanceof ThreadPoolPluginSupport ? (ThreadPoolPluginManager) executor : ThreadPoolPluginManager.empty(); + // TODO Make the runtime info of the plugin to internal object in stateInfo + // get reject count + Long rejectCount = poolPluginManager.getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class) + .map(TaskRejectCountRecordPlugin::getRejectCountNum) + .orElse(-1L); stateInfo.setRejectCount(rejectCount); - stateInfo.setClientLastRefreshTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); - stateInfo.setTimestamp(System.currentTimeMillis()); - return supplement(stateInfo); + // get time records + TaskTimeRecordPlugin.Summary summary = poolPluginManager.getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class) + .map(TaskTimeoutNotifyAlarmPlugin::summarize) + .orElse(new TaskTimeRecordPlugin.Summary(-1L, -1L, -1L, 0)); + stateInfo.setMinTaskTime(summary.getMinTaskTimeMillis() + "ms"); + stateInfo.setMaxTaskTime(summary.getMaxTaskTimeMillis() + "ms"); + stateInfo.setAvgTaskTime(summary.getAvgTaskTimeMillis() + "ms"); + if (summary.getTaskCount() > 0) { + stateInfo.setCompletedTaskCount(summary.getTaskCount()); + } } + }