diff --git a/dynamic-threadpool-common/src/main/java/com/github/dynamic/threadpool/common/model/InstanceInfo.java b/dynamic-threadpool-common/src/main/java/com/github/dynamic/threadpool/common/model/InstanceInfo.java index cf817ba5..58cb942d 100644 --- a/dynamic-threadpool-common/src/main/java/com/github/dynamic/threadpool/common/model/InstanceInfo.java +++ b/dynamic-threadpool-common/src/main/java/com/github/dynamic/threadpool/common/model/InstanceInfo.java @@ -25,6 +25,8 @@ public class InstanceInfo { private String instanceId; + private String ipApplicationName; + private volatile String vipAddress; private volatile String secureVipAddress; diff --git a/dynamic-threadpool-spring-boot-starter/pom.xml b/dynamic-threadpool-spring-boot-starter/pom.xml index 7cf890e9..ea4a7483 100644 --- a/dynamic-threadpool-spring-boot-starter/pom.xml +++ b/dynamic-threadpool-spring-boot-starter/pom.xml @@ -60,6 +60,11 @@ org.projectlombok lombok + + + com.aliyun + alibaba-dingtalk-service-sdk + diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/AlarmConfig.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/AlarmConfig.java new file mode 100644 index 00000000..c87c6d7e --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/AlarmConfig.java @@ -0,0 +1,29 @@ +package com.github.dynamic.threadpool.starter.alarm; + +import lombok.Data; + +/** + * Alarm Config. + * + * @author chen.ma + * @date 2021/8/15 16:09 + */ +@Data +public class AlarmConfig { + + /** + * type + */ + private String type; + + /** + * url + */ + private String url; + + /** + * token + */ + private String token; + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/BaseSendMessageService.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/BaseSendMessageService.java new file mode 100644 index 00000000..c9bce77a --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/BaseSendMessageService.java @@ -0,0 +1,45 @@ +package com.github.dynamic.threadpool.starter.alarm; + +import com.github.dynamic.threadpool.common.config.ApplicationContextHolder; +import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.InitializingBean; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Base Send Message Service. + * + * @author chen.ma + * @date 2021/8/15 15:34 + */ +@RequiredArgsConstructor +public class BaseSendMessageService implements InitializingBean, SendMessageService { + + @NonNull + private final List alarmConfigs; + + private final List sendMessageHandlers = new ArrayList(4); + + @Override + public void sendMessage(CustomThreadPoolExecutor threadPoolExecutor) { + for (SendMessageHandler messageHandler : sendMessageHandlers) { + try { + messageHandler.sendMessage(alarmConfigs, threadPoolExecutor); + } catch (Exception ex) { + // ignore + } + } + } + + @Override + public void afterPropertiesSet() { + Map sendMessageHandlerMap = + ApplicationContextHolder.getBeansOfType(SendMessageHandler.class); + sendMessageHandlerMap.values().forEach(each -> sendMessageHandlers.add(each)); + } + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/DingSendMessageHandler.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/DingSendMessageHandler.java new file mode 100644 index 00000000..8efae7c0 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/DingSendMessageHandler.java @@ -0,0 +1,133 @@ +package com.github.dynamic.threadpool.starter.alarm; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.date.DateUtil; +import com.dingtalk.api.DefaultDingTalkClient; +import com.dingtalk.api.DingTalkClient; +import com.dingtalk.api.request.OapiRobotSendRequest; +import com.github.dynamic.threadpool.common.model.InstanceInfo; +import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; +import com.taobao.api.ApiException; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; + +/** + * Ding Send. + * + * @author chen.ma + * @date 2021/8/15 15:49 + */ +@Slf4j +@RequiredArgsConstructor +public class DingSendMessageHandler implements SendMessageHandler { + + @NonNull + private String active; + + @NonNull + private InstanceInfo instanceInfo; + + @Override + public String getType() { + return SendMessageEnum.DING.name(); + } + + @Override + public void sendMessage(List alarmConfigs, CustomThreadPoolExecutor pool) { + Optional alarmConfigOptional = alarmConfigs.stream() + .filter(each -> Objects.equals(each.getType(), getType())) + .findFirst(); + alarmConfigOptional.ifPresent(each -> dingSendMessage(each, pool)); + } + + public void dingSendMessage(AlarmConfig alarmConfig, CustomThreadPoolExecutor pool) { + String serverUrl = alarmConfig.getUrl() + alarmConfig.getToken(); + + BlockingQueue queue = pool.getQueue(); + String text = String.format( + "[警报] %s - 动态线程池运行告警 \n\n" + + " --- \n\n " + + "应用实例:%s \n\n " + + "线程池名称:%s \n\n " + + " --- \n\n " + + "核心线程数:%d \n\n " + + "最大线程数:%d \n\n " + + "当前线程数:%d \n\n " + + "活跃线程数:%d \n\n " + + "最大线程数:%d \n\n " + + "线程池任务总量:%d \n\n " + + " --- \n\n " + + "队列类型:%s \n\n " + + "队列总容量:%d \n\n " + + "队列元素个数:%d \n\n " + + "队列剩余个数:%d \n\n " + + " --- \n\n " + + "拒绝策略次数:%d \n\n " + + "OWNER:@%s \n\n" + + "提示:5 分钟内此线程池不会重复告警(可配置) \n\n" + + " --- \n\n " + + "**播报时间:%s**", + + // 环境 + active.toUpperCase(), + // 节点信息 + instanceInfo.getIpApplicationName(), + // 线程池ID + pool.getThreadPoolId(), + // 核心线程数 + pool.getCorePoolSize(), + // 最大线程数 + pool.getMaximumPoolSize(), + // 当前线程数 + pool.getPoolSize(), + // 活跃线程数 + pool.getActiveCount(), + // 最大任务数 + pool.getLargestPoolSize(), + // 线程池任务总量 + pool.getCompletedTaskCount(), + // 队列类型名称 + queue.getClass().getSimpleName(), + // 队列容量 + queue.size() + queue.remainingCapacity(), + // 队列元素个数 + queue.size(), + // 队列剩余个数 + queue.remainingCapacity(), + // 拒绝策略次数 + pool.getRejectCount(), + // 告警手机号 + "15601166691", + // 当前时间 + DateUtil.now() + + ); + + DingTalkClient dingTalkClient = new DefaultDingTalkClient(serverUrl); + OapiRobotSendRequest request = new OapiRobotSendRequest(); + request.setMsgtype("markdown"); + + OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown(); + markdown.setTitle("动态线程池告警"); + markdown.setText(text); + + OapiRobotSendRequest.At at = new OapiRobotSendRequest.At(); + at.setAtMobiles(CollUtil.newArrayList("15601166691")); + + request.setAt(at); + request.setMarkdown(markdown); + + try { + dingTalkClient.execute(request); + } catch (ApiException ex) { + log.error("Ding failed to send message", ex.getMessage()); + } + } + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageEnum.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageEnum.java new file mode 100644 index 00000000..f1990148 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageEnum.java @@ -0,0 +1,13 @@ +package com.github.dynamic.threadpool.starter.alarm; + +/** + * Send Message Enum. + * + * @author chen.ma + * @date 2021/8/15 15:50 + */ +public enum SendMessageEnum { + + DING + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageHandler.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageHandler.java new file mode 100644 index 00000000..5a11d30c --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageHandler.java @@ -0,0 +1,30 @@ +package com.github.dynamic.threadpool.starter.alarm; + +import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; + +import java.util.List; + +/** + * Send Message Handler. + * + * @author chen.ma + * @date 2021/8/15 15:44 + */ +public interface SendMessageHandler { + + /** + * getType. + * + * @return + */ + String getType(); + + /** + * sendMessage. + * + * @param alarmConfigs + * @param threadPoolExecutor + */ + void sendMessage(List alarmConfigs, CustomThreadPoolExecutor threadPoolExecutor); + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageService.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageService.java new file mode 100644 index 00000000..e83d4786 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/SendMessageService.java @@ -0,0 +1,20 @@ +package com.github.dynamic.threadpool.starter.alarm; + +import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; + +/** + * Send Msg. + * + * @author chen.ma + * @date 2021/8/15 15:31 + */ +public interface SendMessageService { + + /** + * sendMessage. + * + * @param threadPoolExecutor + */ + void sendMessage(CustomThreadPoolExecutor threadPoolExecutor); + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/ThreadPoolAlarm.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/ThreadPoolAlarm.java new file mode 100644 index 00000000..a20cb4bd --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/ThreadPoolAlarm.java @@ -0,0 +1,31 @@ +package com.github.dynamic.threadpool.starter.alarm; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * ThreadPool Alarm. + * + * @author chen.ma + * @date 2021/8/15 13:13 + */ +@Data +@AllArgsConstructor +public class ThreadPoolAlarm { + + /** + * 是否报警 + */ + private Boolean isAlarm; + + /** + * 活跃度报警 + */ + private Integer livenessAlarm; + + /** + * 容量报警 + */ + private Integer capacityAlarm; + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/ThreadPoolAlarmManage.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/ThreadPoolAlarmManage.java new file mode 100644 index 00000000..e83ae7be --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/alarm/ThreadPoolAlarmManage.java @@ -0,0 +1,69 @@ +package com.github.dynamic.threadpool.starter.alarm; + +import com.github.dynamic.threadpool.common.config.ApplicationContextHolder; +import com.github.dynamic.threadpool.starter.toolkit.CalculateUtil; +import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; +import com.github.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue; +import lombok.extern.slf4j.Slf4j; + +/** + * Alarm Manage. + * + * @author chen.ma + * @date 2021/8/15 14:13 + */ +@Slf4j +public class ThreadPoolAlarmManage { + + private static final SendMessageService SEND_MESSAGE_SERVICE; + + static { + SEND_MESSAGE_SERVICE = ApplicationContextHolder.getBean("sendMessageService", SendMessageService.class); + } + + /** + * checkPoolCapacityAlarm. + * + * @param threadPoolExecutor + */ + public static void checkPoolCapacityAlarm(CustomThreadPoolExecutor threadPoolExecutor) { + ThreadPoolAlarm threadPoolAlarm = threadPoolExecutor.getThreadPoolAlarm(); + ResizableCapacityLinkedBlockIngQueue blockIngQueue = + (ResizableCapacityLinkedBlockIngQueue) threadPoolExecutor.getQueue(); + + int queueSize = blockIngQueue.size(); + int capacity = queueSize + blockIngQueue.remainingCapacity(); + int divide = CalculateUtil.divide(queueSize, capacity); + if (divide > threadPoolAlarm.getCapacityAlarm()) { + SEND_MESSAGE_SERVICE.sendMessage(threadPoolExecutor); + } + } + + /** + * checkPoolLivenessAlarm. + * + * @param isCore + * @param threadPoolExecutor + */ + public static void checkPoolLivenessAlarm(boolean isCore, CustomThreadPoolExecutor threadPoolExecutor) { + if (isCore) { + return; + } + int activeCount = threadPoolExecutor.getActiveCount(); + int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); + int divide = CalculateUtil.divide(activeCount, maximumPoolSize); + if (divide > threadPoolExecutor.getThreadPoolAlarm().getLivenessAlarm()) { + SEND_MESSAGE_SERVICE.sendMessage(threadPoolExecutor); + } + } + + /** + * checkPoolRejectAlarm. + * + * @param threadPoolExecutor + */ + public static void checkPoolRejectAlarm(CustomThreadPoolExecutor threadPoolExecutor) { + SEND_MESSAGE_SERVICE.sendMessage(threadPoolExecutor); + } + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/common/CommonThreadPool.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/common/CommonThreadPool.java index 14f589ee..4d01a2b2 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/common/CommonThreadPool.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/common/CommonThreadPool.java @@ -1,10 +1,10 @@ package com.github.dynamic.threadpool.starter.common; +import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; -import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedPolicies; +import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -15,13 +15,15 @@ import java.util.concurrent.TimeUnit; */ public class CommonThreadPool { - public static ThreadPoolExecutor getInstance(String threadPoolId) { - ThreadPoolExecutor poolExecutor = ThreadPoolBuilder.builder() + public static CustomThreadPoolExecutor getInstance(String threadPoolId) { + CustomThreadPoolExecutor poolExecutor = (CustomThreadPoolExecutor) ThreadPoolBuilder.builder() .isCustomPool(true) + .threadPoolId(threadPoolId) .threadFactory(threadPoolId) .poolThreadSize(3, 5) .keepAliveTime(1000L, TimeUnit.SECONDS) .rejected(RejectedPolicies.runsOldestTaskPolicy()) + .alarmConfig(1, 80, 80) .workQueue(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE, 512) .build(); return poolExecutor; diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/BootstrapProperties.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/BootstrapProperties.java index 7c5cc248..4b418a92 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/BootstrapProperties.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/BootstrapProperties.java @@ -1,10 +1,13 @@ package com.github.dynamic.threadpool.starter.config; +import com.github.dynamic.threadpool.starter.alarm.AlarmConfig; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.ConfigurationProperties; +import java.util.List; + /** * Bootstrap Properties. * @@ -39,4 +42,9 @@ public class BootstrapProperties { */ private boolean banner = true; + /** + * alarms + */ + private List alarms; + } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DiscoveryConfig.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DiscoveryConfig.java index 3f83f4ca..572df75f 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DiscoveryConfig.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DiscoveryConfig.java @@ -11,6 +11,7 @@ import org.springframework.core.env.ConfigurableEnvironment; import java.net.InetAddress; import static com.github.dynamic.threadpool.starter.toolkit.CloudCommonIdUtil.getDefaultInstanceId; +import static com.github.dynamic.threadpool.starter.toolkit.CloudCommonIdUtil.getIpApplicationName; /** * Dynamic Tp Discovery Config. @@ -28,6 +29,7 @@ public class DiscoveryConfig { public InstanceInfo instanceConfig() { InstanceInfo instanceInfo = new InstanceInfo(); instanceInfo.setInstanceId(getDefaultInstanceId(environment)); + instanceInfo.setIpApplicationName(getIpApplicationName(environment)); instanceInfo.setAppName(environment.getProperty("spring.application.name")); instanceInfo.setHostName(InetAddress.getLocalHost().getHostAddress()); diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java index 9736de9b..6868168b 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -15,6 +15,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; /** * DynamicTp Auto Configuration. @@ -24,9 +26,9 @@ import org.springframework.context.annotation.Configuration; */ @Configuration @AllArgsConstructor -@EnableConfigurationProperties(BootstrapProperties.class) @ConditionalOnBean(MarkerConfiguration.Marker.class) -@ImportAutoConfiguration({HttpClientConfig.class, DiscoveryConfig.class}) +@EnableConfigurationProperties(BootstrapProperties.class) +@ImportAutoConfiguration({HttpClientConfig.class, DiscoveryConfig.class, MessageAlarmConfig.class}) public class DynamicThreadPoolAutoConfiguration { private final BootstrapProperties properties; @@ -37,6 +39,7 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean + @Order(Ordered.HIGHEST_PRECEDENCE) public ApplicationContextHolder applicationContextHolder() { return new ApplicationContextHolder(); } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/MessageAlarmConfig.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/MessageAlarmConfig.java new file mode 100644 index 00000000..3d19b79a --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/MessageAlarmConfig.java @@ -0,0 +1,41 @@ +package com.github.dynamic.threadpool.starter.config; + +import com.github.dynamic.threadpool.common.model.InstanceInfo; +import com.github.dynamic.threadpool.starter.alarm.BaseSendMessageService; +import com.github.dynamic.threadpool.starter.alarm.DingSendMessageHandler; +import com.github.dynamic.threadpool.starter.alarm.SendMessageHandler; +import com.github.dynamic.threadpool.starter.alarm.SendMessageService; +import lombok.AllArgsConstructor; +import org.apache.logging.log4j.util.Strings; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.DependsOn; +import org.springframework.core.env.ConfigurableEnvironment; + +/** + * Message Alarm Config. + * + * @author chen.ma + * @date 2021/8/15 15:39 + */ +@AllArgsConstructor +public class MessageAlarmConfig { + + private final BootstrapProperties properties; + + private final InstanceInfo instanceInfo; + + private ConfigurableEnvironment environment; + + @Bean + @DependsOn("applicationContextHolder") + public SendMessageService sendMessageService() { + return new BaseSendMessageService(properties.getAlarms()); + } + + @Bean + public SendMessageHandler dingSendMessageHandler() { + String active = environment.getProperty("spring.profiles.active", Strings.EMPTY); + return new DingSendMessageHandler(active, instanceInfo); + } + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DynamicThreadPoolPostProcessor.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DynamicThreadPoolPostProcessor.java index 3462c47a..e02310b4 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DynamicThreadPoolPostProcessor.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DynamicThreadPoolPostProcessor.java @@ -7,6 +7,7 @@ import com.github.dynamic.threadpool.common.web.base.Result; import com.github.dynamic.threadpool.starter.common.CommonThreadPool; import com.github.dynamic.threadpool.starter.config.BootstrapProperties; import com.github.dynamic.threadpool.starter.remote.HttpAgent; +import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; @@ -81,7 +82,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { queryStrMap.put("namespace", properties.getNamespace()); PoolParameterInfo ppi = new PoolParameterInfo(); - ThreadPoolExecutor poolExecutor = null; + CustomThreadPoolExecutor poolExecutor = null; Result result = null; try { @@ -89,13 +90,15 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) { // 使用相关参数创建线程池 BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); - poolExecutor = ThreadPoolBuilder.builder() + poolExecutor = (CustomThreadPoolExecutor) ThreadPoolBuilder.builder() .isCustomPool(true) - .poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize()) - .keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS) .workQueue(workQueue) + .threadPoolId(tpId) .threadFactory(tpId) + .poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize()) + .keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS) .rejected(RejectedTypeEnum.createPolicy(ppi.getRejectedType())) + .alarmConfig(ppi.getIsAlarm(), ppi.getCapacityAlarm(), ppi.getLivenessAlarm()) .build(); dynamicThreadPoolWrap.setPool(poolExecutor); diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/CloudCommonIdUtil.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/CloudCommonIdUtil.java index 8119f712..fb037de4 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/CloudCommonIdUtil.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/CloudCommonIdUtil.java @@ -17,12 +17,17 @@ public class CloudCommonIdUtil { @SneakyThrows public static String getDefaultInstanceId(PropertyResolver resolver) { + String namePart = getIpApplicationName(resolver); + String indexPart = resolver.getProperty("spring.application.instance_id", resolver.getProperty("server.port")); + return combineParts(namePart, SEPARATOR, indexPart); + } + + @SneakyThrows + public static String getIpApplicationName(PropertyResolver resolver) { InetAddress host = InetAddress.getLocalHost(); String hostname = host.getHostAddress(); String appName = resolver.getProperty("spring.application.name"); - String namePart = combineParts(hostname, SEPARATOR, appName); - String indexPart = resolver.getProperty("spring.application.instance_id", resolver.getProperty("server.port")); - return combineParts(namePart, SEPARATOR, indexPart); + return combineParts(hostname, SEPARATOR, appName); } public static String combineParts(String firstPart, String separator, diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java index 8aa0339d..1a43740f 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java @@ -1,6 +1,7 @@ package com.github.dynamic.threadpool.starter.toolkit.thread; import com.github.dynamic.threadpool.common.toolkit.Assert; +import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm; import lombok.Data; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; @@ -100,9 +101,10 @@ public class AbstractBuildThreadPoolTemplate { initParam.getKeepAliveTime(), initParam.getTimeUnit(), initParam.getWorkQueue(), + initParam.getThreadPoolId(), initParam.getThreadFactory(), + initParam.getThreadPoolAlarm(), initParam.getRejectedExecutionHandler()); - return executorService; } @@ -150,7 +152,18 @@ public class AbstractBuildThreadPoolTemplate { */ private ThreadFactory threadFactory; + /** + * 线程 ID + */ + private String threadPoolId; + + /** + * 报警策略 + */ + private ThreadPoolAlarm threadPoolAlarm; + public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) { + this.threadPoolId = threadNamePrefix; this.threadFactory = ThreadFactoryBuilder.builder() .prefix(threadNamePrefix) .daemon(isDaemon) diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/CustomThreadPoolExecutor.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/CustomThreadPoolExecutor.java index 1d477a0e..9d970411 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/CustomThreadPoolExecutor.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/CustomThreadPoolExecutor.java @@ -1,5 +1,7 @@ package com.github.dynamic.threadpool.starter.toolkit.thread; +import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm; +import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarmManage; import lombok.NoArgsConstructor; import lombok.NonNull; @@ -46,47 +48,24 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor { private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; + private String threadPoolId; private final AccessControlContext acc; + private volatile ThreadPoolAlarm threadPoolAlarm; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; - private static final RejectedExecutionHandler DEFAULT_HANDLER = new ThreadPoolExecutor.AbortPolicy(); private static final RuntimePermission SHUTDOWN_PERM = new RuntimePermission("modifyThread"); - public CustomThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - BlockingQueue workQueue) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), DEFAULT_HANDLER); - } - - public CustomThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - BlockingQueue workQueue, - ThreadFactory threadFactory) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, DEFAULT_HANDLER); - } - - public CustomThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - BlockingQueue workQueue, - RejectedExecutionHandler handler) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); - } - public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, @NonNull BlockingQueue workQueue, + @NonNull String threadPoolId, @NonNull ThreadFactory threadFactory, + @NonNull ThreadPoolAlarm threadPoolAlarm, @NonNull RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); @@ -100,9 +79,11 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor { this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; + this.threadPoolId = threadPoolId; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; + this.threadPoolAlarm = threadPoolAlarm; this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); } @@ -147,6 +128,18 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor { return rejectCount.get(); } + public ThreadPoolAlarm getThreadPoolAlarm() { + return this.threadPoolAlarm; + } + + public void setThreadPoolAlarm(ThreadPoolAlarm threadPoolAlarm) { + this.threadPoolAlarm = threadPoolAlarm; + } + + public String getThreadPoolId() { + return this.threadPoolId; + } + private final class Worker extends AbstractQueuedSynchronizer implements Runnable { @@ -316,6 +309,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor { final void reject(Runnable command) { rejectCount.incrementAndGet(); + ThreadPoolAlarmManage.checkPoolRejectAlarm(this); handler.rejectedExecution(command, this); } @@ -368,6 +362,8 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor { } } + ThreadPoolAlarmManage.checkPoolLivenessAlarm(core, this); + boolean workerStarted = false; boolean workerAdded = false; Worker w = null; @@ -542,6 +538,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor { c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { + ThreadPoolAlarmManage.checkPoolCapacityAlarm(this); int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) { reject(command); diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/ThreadPoolBuilder.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/ThreadPoolBuilder.java index 9290526b..09e38cb4 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/ThreadPoolBuilder.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/thread/ThreadPoolBuilder.java @@ -2,6 +2,7 @@ package com.github.dynamic.threadpool.starter.toolkit.thread; import com.github.dynamic.threadpool.common.toolkit.Assert; +import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm; import java.math.BigDecimal; import java.util.concurrent.*; @@ -74,6 +75,26 @@ public class ThreadPoolBuilder implements Builder { */ private String threadNamePrefix; + /** + * 线程池 ID + */ + private String threadPoolId; + + /** + * 是否告警 + */ + private boolean isAlarm = false; + + /** + * 容量告警 + */ + private Integer capacityAlarm; + + /** + * 活跃度告警 + */ + private Integer livenessAlarm; + /** * 计算公式:CPU 核数 / (1 - 阻塞系数 0.8) * @@ -163,6 +184,18 @@ public class ThreadPoolBuilder implements Builder { return this; } + public ThreadPoolBuilder threadPoolId(String threadPoolId) { + this.threadPoolId = threadPoolId; + return this; + } + + public ThreadPoolBuilder alarmConfig(int isAlarm, int capacityAlarm, int livenessAlarm) { + this.isAlarm = isAlarm == 1 ? true : false; + this.capacityAlarm = capacityAlarm; + this.livenessAlarm = livenessAlarm; + return this; + } + /** * 构建 * @@ -222,7 +255,7 @@ public class ThreadPoolBuilder implements Builder { * @return */ private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) { - Assert.notEmpty(builder.threadNamePrefix, "线程名称前缀不可为空或空的字符串."); + Assert.notEmpty(builder.threadNamePrefix, "The thread name prefix cannot be empty or an empty string."); AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam = new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon); @@ -233,7 +266,12 @@ public class ThreadPoolBuilder implements Builder { .setRejectedExecutionHandler(builder.rejectedExecutionHandler) .setTimeUnit(builder.timeUnit); - // 快速消费线程池内置指定线程池 + if (builder.isCustomPool) { + initParam.setThreadPoolId(builder.threadPoolId); + ThreadPoolAlarm threadPoolAlarm = new ThreadPoolAlarm(builder.isAlarm, builder.capacityAlarm, builder.livenessAlarm); + initParam.setThreadPoolAlarm(threadPoolAlarm); + } + if (!builder.isFastPool) { if (builder.queueType != null) { builder.workQueue = QueueTypeEnum.createBlockingQueue(builder.queueType.type, builder.capacity); diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java index 2e32e2a7..2df3b945 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java @@ -1,11 +1,11 @@ package com.github.dynamic.threadpool.starter.wrap; import com.github.dynamic.threadpool.starter.common.CommonThreadPool; +import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import lombok.Data; import java.util.concurrent.Callable; import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; /** * Dynamic ThreadPool Wrap. @@ -22,7 +22,7 @@ public class DynamicThreadPoolWrap { private String tpId; - private ThreadPoolExecutor pool; + private CustomThreadPoolExecutor pool; /** * 首选服务端线程池, 为空使用默认线程池 {@link CommonThreadPool#getInstance(String)} @@ -39,7 +39,7 @@ public class DynamicThreadPoolWrap { * @param threadPoolId * @param threadPoolExecutor */ - public DynamicThreadPoolWrap(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + public DynamicThreadPoolWrap(String threadPoolId, CustomThreadPoolExecutor threadPoolExecutor) { this.tpId = threadPoolId; this.pool = threadPoolExecutor; } diff --git a/pom.xml b/pom.xml index d89fd719..5c917bae 100644 --- a/pom.xml +++ b/pom.xml @@ -33,6 +33,8 @@ 1.2.75 3.12.0 + 1.0.1 + 3.4.2 2.3.2.RELEASE @@ -119,6 +121,12 @@ guava ${guava.version} + + + com.aliyun + alibaba-dingtalk-service-sdk + ${dingtalk-sdk.version} +