From ea6d0c9a95e880f460fceab8ae04dcdd107ede17 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Mon, 16 Aug 2021 20:30:22 +0800 Subject: [PATCH] Development configuration change message push. --- .../starter/alarm/BaseSendMessageService.java | 20 +++- .../starter/alarm/DingSendMessageHandler.java | 104 ++++++++++++++++-- .../starter/alarm/MessageTypeEnum.java | 15 +++ .../starter/alarm/SendMessageHandler.java | 15 ++- .../starter/alarm/SendMessageService.java | 12 +- .../starter/alarm/ThreadPoolAlarmManage.java | 22 +++- .../core/ThreadPoolDynamicRefresh.java | 51 ++++----- .../starter/toolkit/thread/QueueTypeEnum.java | 30 +++-- .../toolkit/thread/RejectedTypeEnum.java | 10 +- 9 files changed, 218 insertions(+), 61 deletions(-) create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/MessageTypeEnum.java diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/BaseSendMessageService.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/BaseSendMessageService.java index c9bce77a..1630a04c 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/BaseSendMessageService.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/BaseSendMessageService.java @@ -1,9 +1,11 @@ package com.github.dynamic.threadpool.starter.alarm; import com.github.dynamic.threadpool.common.config.ApplicationContextHolder; +import com.github.dynamic.threadpool.common.model.PoolParameterInfo; import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import java.util.ArrayList; @@ -16,6 +18,7 @@ import java.util.Map; * @author chen.ma * @date 2021/8/15 15:34 */ +@Slf4j @RequiredArgsConstructor public class BaseSendMessageService implements InitializingBean, SendMessageService { @@ -25,12 +28,23 @@ public class BaseSendMessageService implements InitializingBean, SendMessageServ private final List sendMessageHandlers = new ArrayList(4); @Override - public void sendMessage(CustomThreadPoolExecutor threadPoolExecutor) { + public void sendAlarmMessage(CustomThreadPoolExecutor threadPoolExecutor) { for (SendMessageHandler messageHandler : sendMessageHandlers) { try { - messageHandler.sendMessage(alarmConfigs, threadPoolExecutor); + messageHandler.sendAlarmMessage(alarmConfigs, threadPoolExecutor); } catch (Exception ex) { - // ignore + log.warn("Failed to send thread pool alarm notification.", ex); + } + } + } + + @Override + public void sendChangeMessage(PoolParameterInfo parameter) { + for (SendMessageHandler messageHandler : sendMessageHandlers) { + try { + messageHandler.sendChangeMessage(alarmConfigs, parameter); + } catch (Exception ex) { + log.warn("Failed to send thread pool change notification.", ex); } } } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/DingSendMessageHandler.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/DingSendMessageHandler.java index 8efae7c0..4a552770 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/DingSendMessageHandler.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/DingSendMessageHandler.java @@ -6,7 +6,12 @@ import com.dingtalk.api.DefaultDingTalkClient; import com.dingtalk.api.DingTalkClient; import com.dingtalk.api.request.OapiRobotSendRequest; import com.github.dynamic.threadpool.common.model.InstanceInfo; +import com.github.dynamic.threadpool.common.model.PoolParameterInfo; +import com.github.dynamic.threadpool.starter.core.GlobalThreadPoolManage; import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; +import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; +import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum; +import com.github.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap; import com.taobao.api.ApiException; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -16,9 +21,10 @@ import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; /** - * Ding Send. + * Send ding notification message. * * @author chen.ma * @date 2021/8/15 15:49 @@ -39,22 +45,28 @@ public class DingSendMessageHandler implements SendMessageHandler { } @Override - public void sendMessage(List alarmConfigs, CustomThreadPoolExecutor pool) { + public void sendAlarmMessage(List alarmConfigs, CustomThreadPoolExecutor pool) { Optional alarmConfigOptional = alarmConfigs.stream() .filter(each -> Objects.equals(each.getType(), getType())) .findFirst(); - alarmConfigOptional.ifPresent(each -> dingSendMessage(each, pool)); + alarmConfigOptional.ifPresent(each -> dingAlarmSendMessage(each, pool)); } - public void dingSendMessage(AlarmConfig alarmConfig, CustomThreadPoolExecutor pool) { - String serverUrl = alarmConfig.getUrl() + alarmConfig.getToken(); + @Override + public void sendChangeMessage(List alarmConfigs, PoolParameterInfo parameter) { + Optional changeConfigOptional = alarmConfigs.stream() + .filter(each -> Objects.equals(each.getType(), getType())) + .findFirst(); + changeConfigOptional.ifPresent(each -> dingChangeSendMessage(each, parameter)); + } + private void dingAlarmSendMessage(AlarmConfig alarmConfig, CustomThreadPoolExecutor pool) { BlockingQueue queue = pool.getQueue(); String text = String.format( "[警报] %s - 动态线程池运行告警 \n\n" + " --- \n\n " + + "线程池ID:%s \n\n " + "应用实例:%s \n\n " + - "线程池名称:%s \n\n " + " --- \n\n " + "核心线程数:%d \n\n " + "最大线程数:%d \n\n " + @@ -64,11 +76,12 @@ public class DingSendMessageHandler implements SendMessageHandler { "线程池任务总量:%d \n\n " + " --- \n\n " + "队列类型:%s \n\n " + - "队列总容量:%d \n\n " + + "队列容量:%d \n\n " + "队列元素个数:%d \n\n " + "队列剩余个数:%d \n\n " + " --- \n\n " + - "拒绝策略次数:%d \n\n " + + "拒绝策略:%s \n\n" + + "拒绝策略执行次数:%d \n\n " + "OWNER:@%s \n\n" + "提示:5 分钟内此线程池不会重复告警(可配置) \n\n" + " --- \n\n " + @@ -76,10 +89,10 @@ public class DingSendMessageHandler implements SendMessageHandler { // 环境 active.toUpperCase(), - // 节点信息 - instanceInfo.getIpApplicationName(), // 线程池ID pool.getThreadPoolId(), + // 节点信息 + instanceInfo.getIpApplicationName(), // 核心线程数 pool.getCorePoolSize(), // 最大线程数 @@ -100,6 +113,8 @@ public class DingSendMessageHandler implements SendMessageHandler { queue.size(), // 队列剩余个数 queue.remainingCapacity(), + // 拒绝策略名称 + pool.getRejectedExecutionHandler().getClass().getSimpleName(), // 拒绝策略次数 pool.getRejectCount(), // 告警手机号 @@ -109,16 +124,81 @@ public class DingSendMessageHandler implements SendMessageHandler { ); + execute(alarmConfig, "动态线程池告警", text, CollUtil.newArrayList("15601166691")); + } + + private void dingChangeSendMessage(AlarmConfig alarmConfig, PoolParameterInfo parameter) { + String threadPoolId = parameter.getTpId(); + DynamicThreadPoolWrap poolWrap = GlobalThreadPoolManage.getExecutorService(threadPoolId); + if (poolWrap == null) { + log.warn("Thread pool is empty when sending change notification, threadPoolId :: {}", threadPoolId); + return; + } + + CustomThreadPoolExecutor customPool = poolWrap.getPool(); + /** + * hesitant e.g. ➲ ➜ ⇨ ➪ + */ + String text = String.format( + "[通知] %s - 动态线程池参数变更 \n\n" + + " --- \n\n " + + "线程池ID:%s \n\n " + + "应用实例:%s \n\n " + + " --- \n\n " + + "核心线程数:%s \n\n " + + "最大线程数:%s \n\n " + + "线程存活时间:%s / SECONDS \n\n" + + " --- \n\n " + + "队列类型:%s \n\n " + + "队列容量:%s \n\n " + + " --- \n\n " + + "AGO 拒绝策略:%s \n\n" + + "NOW 拒绝策略:%s \n\n" + + " --- \n\n " + + "提示:动态线程池配置变更实时通知(无限制) \n\n" + + "OWNER:@%s \n\n" + + " --- \n\n " + + "**播报时间:%s**", + // 环境 + active.toUpperCase(), + // 线程池名称 + threadPoolId, + // 节点信息 + instanceInfo.getIpApplicationName(), + // 核心线程数 + customPool.getCorePoolSize() + " ➲ " + parameter.getCoreSize(), + // 最大线程数 + customPool.getMaximumPoolSize() + " ➲ " + parameter.getMaxSize(), + // 线程存活时间 + customPool.getKeepAliveTime(TimeUnit.SECONDS) + " ➲ " + parameter.getKeepAliveTime(), + // 阻塞队列 + QueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType()), + // 阻塞队列容量 + (customPool.getQueue().size() + customPool.getQueue().remainingCapacity()) + " ➲ " + parameter.getCapacity(), + // 拒绝策略 + customPool.getRejectedExecutionHandler().getClass().getSimpleName(), + RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType()), + // 告警手机号 + "15601166691", + // 当前时间 + DateUtil.now() + ); + + execute(alarmConfig, "动态线程池通知", text, CollUtil.newArrayList("15601166691")); + } + + private void execute(AlarmConfig alarmConfig, String title, String text, List mobiles) { + String serverUrl = alarmConfig.getUrl() + alarmConfig.getToken(); DingTalkClient dingTalkClient = new DefaultDingTalkClient(serverUrl); OapiRobotSendRequest request = new OapiRobotSendRequest(); request.setMsgtype("markdown"); OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown(); - markdown.setTitle("动态线程池告警"); + markdown.setTitle(title); markdown.setText(text); OapiRobotSendRequest.At at = new OapiRobotSendRequest.At(); - at.setAtMobiles(CollUtil.newArrayList("15601166691")); + at.setAtMobiles(mobiles); request.setAt(at); request.setMarkdown(markdown); diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/MessageTypeEnum.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/MessageTypeEnum.java new file mode 100644 index 00000000..12d2f63e --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/MessageTypeEnum.java @@ -0,0 +1,15 @@ +package com.github.dynamic.threadpool.starter.alarm; + +/** + * Message Type Enum. + * + * @author chen.ma + * @date 2021/8/16 20:50 + */ +public enum MessageTypeEnum { + + CHANGE, + + ALARM + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageHandler.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageHandler.java index 5a11d30c..8eebab69 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageHandler.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageHandler.java @@ -1,5 +1,6 @@ package com.github.dynamic.threadpool.starter.alarm; +import com.github.dynamic.threadpool.common.model.PoolParameterInfo; import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import java.util.List; @@ -13,18 +14,26 @@ import java.util.List; public interface SendMessageHandler { /** - * getType. + * Get type. * * @return */ String getType(); /** - * sendMessage. + * Send alarm message. * * @param alarmConfigs * @param threadPoolExecutor */ - void sendMessage(List alarmConfigs, CustomThreadPoolExecutor threadPoolExecutor); + void sendAlarmMessage(List alarmConfigs, CustomThreadPoolExecutor threadPoolExecutor); + + /** + * Send change message. + * + * @param alarmConfigs + * @param parameter + */ + void sendChangeMessage(List alarmConfigs, PoolParameterInfo parameter); } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageService.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageService.java index e83d4786..995d13e2 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageService.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageService.java @@ -1,5 +1,6 @@ package com.github.dynamic.threadpool.starter.alarm; +import com.github.dynamic.threadpool.common.model.PoolParameterInfo; import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; /** @@ -11,10 +12,17 @@ import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExec public interface SendMessageService { /** - * sendMessage. + * Send alarm message. * * @param threadPoolExecutor */ - void sendMessage(CustomThreadPoolExecutor threadPoolExecutor); + void sendAlarmMessage(CustomThreadPoolExecutor threadPoolExecutor); + + /** + * Send change message. + * + * @param parameter + */ + void sendChangeMessage(PoolParameterInfo parameter); } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/ThreadPoolAlarmManage.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/ThreadPoolAlarmManage.java index e83ae7be..6de11bf3 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/ThreadPoolAlarmManage.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/ThreadPoolAlarmManage.java @@ -1,6 +1,7 @@ package com.github.dynamic.threadpool.starter.alarm; import com.github.dynamic.threadpool.common.config.ApplicationContextHolder; +import com.github.dynamic.threadpool.common.model.PoolParameterInfo; import com.github.dynamic.threadpool.starter.toolkit.CalculateUtil; import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue; @@ -22,7 +23,7 @@ public class ThreadPoolAlarmManage { } /** - * checkPoolCapacityAlarm. + * Check thread pool capacity alarm. * * @param threadPoolExecutor */ @@ -35,12 +36,12 @@ public class ThreadPoolAlarmManage { int capacity = queueSize + blockIngQueue.remainingCapacity(); int divide = CalculateUtil.divide(queueSize, capacity); if (divide > threadPoolAlarm.getCapacityAlarm()) { - SEND_MESSAGE_SERVICE.sendMessage(threadPoolExecutor); + SEND_MESSAGE_SERVICE.sendAlarmMessage(threadPoolExecutor); } } /** - * checkPoolLivenessAlarm. + * Check thread pool activity alarm. * * @param isCore * @param threadPoolExecutor @@ -53,17 +54,26 @@ public class ThreadPoolAlarmManage { int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); int divide = CalculateUtil.divide(activeCount, maximumPoolSize); if (divide > threadPoolExecutor.getThreadPoolAlarm().getLivenessAlarm()) { - SEND_MESSAGE_SERVICE.sendMessage(threadPoolExecutor); + SEND_MESSAGE_SERVICE.sendAlarmMessage(threadPoolExecutor); } } /** - * checkPoolRejectAlarm. + * Check thread pool reject policy alarm. * * @param threadPoolExecutor */ public static void checkPoolRejectAlarm(CustomThreadPoolExecutor threadPoolExecutor) { - SEND_MESSAGE_SERVICE.sendMessage(threadPoolExecutor); + SEND_MESSAGE_SERVICE.sendAlarmMessage(threadPoolExecutor); + } + + /** + * Send thread pool configuration change message. + * + * @param parameter + */ + public static void sendPoolConfigChange(PoolParameterInfo parameter) { + SEND_MESSAGE_SERVICE.sendChangeMessage(parameter); } } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java index a9e74148..b5872f1c 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java @@ -1,10 +1,11 @@ package com.github.dynamic.threadpool.starter.core; import com.alibaba.fastjson.JSON; -import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; -import com.github.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue; import com.github.dynamic.threadpool.common.model.PoolParameterInfo; +import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarmManage; +import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum; +import com.github.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue; import lombok.extern.slf4j.Slf4j; import java.util.Objects; @@ -22,60 +23,60 @@ public class ThreadPoolDynamicRefresh { public static void refreshDynamicPool(String content) { PoolParameterInfo parameter = JSON.parseObject(content, PoolParameterInfo.class); - String tpId = parameter.getTpId(); - Integer coreSize = parameter.getCoreSize(), maxSize = parameter.getMaxSize(), - queueType = parameter.getQueueType(), capacity = parameter.getCapacity(), - keepAliveTime = parameter.getKeepAliveTime(), rejectedType = parameter.getRejectedType(); - refreshDynamicPool(tpId, coreSize, maxSize, queueType, capacity, keepAliveTime, rejectedType); + ThreadPoolAlarmManage.sendPoolConfigChange(parameter); + ThreadPoolDynamicRefresh.refreshDynamicPool(parameter); } - public static void refreshDynamicPool(String threadPoolId, Integer coreSize, Integer maxSize, Integer queueType, Integer capacity, Integer keepAliveTime, Integer rejectedType) { + public static void refreshDynamicPool(PoolParameterInfo parameter) { + String threadPoolId = parameter.getTpId(); ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getPool(); int originalCoreSize = executor.getCorePoolSize(); int originalMaximumPoolSize = executor.getMaximumPoolSize(); - int originalQueryType = queueType; + int originalQueryType = parameter.getQueueType(); int originalCapacity = executor.getQueue().remainingCapacity() + executor.getQueue().size(); long originalKeepAliveTime = executor.getKeepAliveTime(TimeUnit.MILLISECONDS); - int originalRejectedType = rejectedType; + int originalRejectedType = parameter.getRejectedType(); - changePoolInfo(executor, coreSize, maxSize, queueType, capacity, keepAliveTime, rejectedType); + changePoolInfo(executor, parameter); ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getPool(); log.info("[🔥 {}] Changed thread pool. coreSize :: [{}], maxSize :: [{}], queueType :: [{}], capacity :: [{}], keepAliveTime :: [{}], rejectedType :: [{}]", threadPoolId.toUpperCase(), String.format("%s=>%s", originalCoreSize, afterExecutor.getCorePoolSize()), String.format("%s=>%s", originalMaximumPoolSize, afterExecutor.getMaximumPoolSize()), - String.format("%s=>%s", originalQueryType, queueType), - String.format("%s=>%s", originalCapacity, (afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size())), + String.format("%s=>%s", originalQueryType, parameter.getQueueType()), + String.format("%s=>%s", originalCapacity, + (afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size())), String.format("%s=>%s", originalKeepAliveTime, afterExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS)), - String.format("%s=>%s", originalRejectedType, rejectedType)); + String.format("%s=>%s", originalRejectedType, parameter.getRejectedType())); } - public static void changePoolInfo(ThreadPoolExecutor executor, Integer coreSize, Integer maxSize, Integer queueType, Integer capacity, Integer keepAliveTime, Integer rejectedType) { - if (coreSize != null) { - executor.setCorePoolSize(coreSize); + public static void changePoolInfo(ThreadPoolExecutor executor, PoolParameterInfo parameter) { + if (parameter.getCoreSize() != null) { + executor.setCorePoolSize(parameter.getCoreSize()); } - if (maxSize != null) { - executor.setMaximumPoolSize(maxSize); + if (parameter.getMaxSize() != null) { + executor.setMaximumPoolSize(parameter.getMaxSize()); } - if (capacity != null && Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.type, queueType)) { + if (parameter.getCapacity() != null + && Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.type, parameter.getQueueType())) { if (executor.getQueue() instanceof ResizableCapacityLinkedBlockIngQueue) { ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue(); - queue.setCapacity(capacity); + queue.setCapacity(parameter.getCapacity()); } else { log.warn("[Pool change] The queue length cannot be modified. Queue type mismatch. Current queue type :: {}", executor.getQueue().getClass().getSimpleName()); } } - if (keepAliveTime != null) { - executor.setKeepAliveTime(keepAliveTime, TimeUnit.SECONDS); + if (parameter.getKeepAliveTime() != null) { + executor.setKeepAliveTime(parameter.getKeepAliveTime(), TimeUnit.SECONDS); } - if (rejectedType != null) { - executor.setRejectedExecutionHandler(RejectedTypeEnum.createPolicy(queueType)); + if (parameter.getRejectedType() != null) { + executor.setRejectedExecutionHandler(RejectedTypeEnum.createPolicy(parameter.getRejectedType())); } } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/QueueTypeEnum.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/QueueTypeEnum.java index 2f19b16b..067d3cb9 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/QueueTypeEnum.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/QueueTypeEnum.java @@ -3,6 +3,7 @@ package com.github.dynamic.threadpool.starter.toolkit.thread; import com.github.dynamic.threadpool.starter.spi.DynamicTpServiceLoader; import com.github.dynamic.threadpool.starter.spi.CustomBlockingQueue; +import java.util.Arrays; import java.util.Collection; import java.util.Objects; import java.util.Optional; @@ -19,49 +20,52 @@ public enum QueueTypeEnum { /** * {@link java.util.concurrent.ArrayBlockingQueue} */ - ARRAY_BLOCKING_QUEUE(1), + ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue"), /** * {@link java.util.concurrent.LinkedBlockingQueue} */ - LINKED_BLOCKING_QUEUE(2), + LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue"), /** * {@link java.util.concurrent.LinkedBlockingDeque} */ - LINKED_BLOCKING_DEQUE(3), + LINKED_BLOCKING_DEQUE(3, "LinkedBlockingDeque"), /** * {@link java.util.concurrent.SynchronousQueue} */ - SYNCHRONOUS_QUEUE(4), + SYNCHRONOUS_QUEUE(4, "SynchronousQueue"), /** * {@link java.util.concurrent.LinkedTransferQueue} */ - LINKED_TRANSFER_QUEUE(5), + LINKED_TRANSFER_QUEUE(5, "LinkedTransferQueue"), /** * {@link java.util.concurrent.PriorityBlockingQueue} */ - PRIORITY_BLOCKING_QUEUE(6), + PRIORITY_BLOCKING_QUEUE(6, "PriorityBlockingQueue"), /** * {@link "io.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue"} */ - RESIZABLE_LINKED_BLOCKING_QUEUE(9); + RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockIngQueue"); public Integer type; - QueueTypeEnum(int type) { + public String name; + + QueueTypeEnum(int type, String name) { this.type = type; + this.name = name; } static { DynamicTpServiceLoader.register(CustomBlockingQueue.class); } - public static BlockingQueue createBlockingQueue(Integer type, Integer capacity) { + public static BlockingQueue createBlockingQueue(int type, Integer capacity) { BlockingQueue blockingQueue = null; if (Objects.equals(type, ARRAY_BLOCKING_QUEUE.type)) { blockingQueue = new ArrayBlockingQueue(capacity); @@ -90,4 +94,12 @@ public enum QueueTypeEnum { return blockingQueue; } + public static String getBlockingQueueNameByType(int type) { + Optional queueTypeEnum = Arrays.stream(QueueTypeEnum.values()) + .filter(each -> each.type == type) + .findFirst(); + + return queueTypeEnum.map(each -> each.name).orElse(""); + } + } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/RejectedTypeEnum.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/RejectedTypeEnum.java index 93fedfbb..2781c98c 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/RejectedTypeEnum.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/RejectedTypeEnum.java @@ -3,6 +3,7 @@ package com.github.dynamic.threadpool.starter.toolkit.thread; import com.github.dynamic.threadpool.starter.spi.CustomRejectedExecutionHandler; import com.github.dynamic.threadpool.starter.spi.DynamicTpServiceLoader; +import java.util.Arrays; import java.util.Collection; import java.util.Objects; import java.util.Optional; @@ -67,7 +68,7 @@ public enum RejectedTypeEnum { DynamicTpServiceLoader.register(CustomRejectedExecutionHandler.class); } - public static RejectedExecutionHandler createPolicy(Integer type) { + public static RejectedExecutionHandler createPolicy(int type) { Optional rejectedTypeEnum = Stream.of(RejectedTypeEnum.values()) .filter(each -> Objects.equals(type, each.type)) .map(each -> each.rejectedHandler) @@ -88,4 +89,11 @@ public enum RejectedTypeEnum { return resultRejected; } + public static String getRejectedNameByType(int type) { + Optional rejectedTypeEnum = Arrays.stream(RejectedTypeEnum.values()) + .filter(each -> each.type == type).findFirst(); + + return rejectedTypeEnum.map(each -> each.rejectedHandler.getClass().getSimpleName()).orElse(""); + } + }