diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java index 2b5fc3b4..b5588613 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java @@ -48,7 +48,7 @@ import java.util.concurrent.*; public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner { @NonNull - private final Hippo4jSendMessageService hippoSendMessageService; + private final Hippo4jSendMessageService hippo4jSendMessageService; @Value("${spring.profiles.active:UNKNOWN}") private String active; @@ -101,19 +101,19 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner * @param threadPoolExecutor */ public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { - if (hippoSendMessageService == null) { + ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId); + if (Objects.isNull(alarmConfig) || !alarmConfig.getAlarm() || alarmConfig.getCapacityAlarm() <= 0) { return; } - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId); BlockingQueue blockingQueue = threadPoolExecutor.getQueue(); int queueSize = blockingQueue.size(); int capacity = queueSize + blockingQueue.remainingCapacity(); int divide = CalculateUtil.divide(queueSize, capacity); - boolean isSend = threadPoolNotifyAlarm.getAlarm() && divide > threadPoolNotifyAlarm.getCapacityAlarm(); + boolean isSend = alarmConfig.getAlarm() && divide > alarmConfig.getCapacityAlarm(); if (isSend) { - AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor); + AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest(threadPoolExecutor); alarmNotifyRequest.setThreadPoolId(threadPoolId); - hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.CAPACITY, alarmNotifyRequest); + hippo4jSendMessageService.sendAlarmMessage(NotifyTypeEnum.CAPACITY, alarmNotifyRequest); } } @@ -124,15 +124,18 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner * @param threadPoolExecutor */ public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId); + if (Objects.isNull(alarmConfig) || !alarmConfig.getAlarm() || alarmConfig.getCapacityAlarm() <= 0) { + return; + } int activeCount = threadPoolExecutor.getActiveCount(); int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); int divide = CalculateUtil.divide(activeCount, maximumPoolSize); - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId); - boolean isSend = threadPoolNotifyAlarm.getAlarm() && divide > threadPoolNotifyAlarm.getActiveAlarm(); + boolean isSend = alarmConfig.getAlarm() && divide > alarmConfig.getActiveAlarm(); if (isSend) { - AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor); + AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest(threadPoolExecutor); alarmNotifyRequest.setThreadPoolId(threadPoolId); - hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.ACTIVITY, alarmNotifyRequest); + hippo4jSendMessageService.sendAlarmMessage(NotifyTypeEnum.ACTIVITY, alarmNotifyRequest); } } @@ -143,15 +146,15 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner */ public void asyncSendRejectedAlarm(String threadPoolId) { Runnable checkPoolRejectedAlarmTask = () -> { - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId); - if (Objects.isNull(threadPoolNotifyAlarm) || !threadPoolNotifyAlarm.getAlarm()) { + ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId); + if (Objects.isNull(alarmConfig) || !alarmConfig.getAlarm()) { return; } ThreadPoolExecutor threadPoolExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor(); if (threadPoolExecutor instanceof DynamicThreadPoolExecutor) { - AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor); + AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest(threadPoolExecutor); alarmNotifyRequest.setThreadPoolId(threadPoolId); - hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.REJECT, alarmNotifyRequest); + hippo4jSendMessageService.sendAlarmMessage(NotifyTypeEnum.REJECT, alarmNotifyRequest); } }; ASYNC_ALARM_NOTIFY_EXECUTOR.execute(checkPoolRejectedAlarmTask); @@ -166,13 +169,13 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner * @param threadPoolExecutor */ public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) { - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId); - if (Objects.isNull(threadPoolNotifyAlarm) || !threadPoolNotifyAlarm.getAlarm()) { + ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId); + if (Objects.isNull(alarmConfig) || !alarmConfig.getAlarm()) { return; } if (threadPoolExecutor instanceof DynamicThreadPoolExecutor) { try { - AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor); + AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest(threadPoolExecutor); alarmNotifyRequest.setThreadPoolId(threadPoolId); alarmNotifyRequest.setExecuteTime(executeTime); alarmNotifyRequest.setExecuteTimeOut(executeTimeOut); @@ -180,7 +183,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner if (StringUtil.isNotBlank(executeTimeoutTrace)) { alarmNotifyRequest.setExecuteTimeoutTrace(executeTimeoutTrace); } - Runnable task = () -> hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.TIMEOUT, alarmNotifyRequest); + Runnable task = () -> hippo4jSendMessageService.sendAlarmMessage(NotifyTypeEnum.TIMEOUT, alarmNotifyRequest); ASYNC_ALARM_NOTIFY_EXECUTOR.execute(task); } catch (Throwable ex) { log.error("Send thread pool execution timeout alarm error.", ex); @@ -198,50 +201,40 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner String appName = StrUtil.isBlank(itemId) ? applicationName : itemId; request.setAppName(appName); request.setIdentify(IdentifyUtil.getIdentify()); - hippoSendMessageService.sendChangeMessage(request); + hippo4jSendMessageService.sendChangeMessage(request); } /** - * Build alarm notify req. + * Build alarm notify request. * * @param threadPoolExecutor * @return */ - public AlarmNotifyRequest buildAlarmNotifyReq(ThreadPoolExecutor threadPoolExecutor) { - AlarmNotifyRequest request = new AlarmNotifyRequest(); - String appName = StrUtil.isBlank(itemId) ? applicationName : itemId; - request.setAppName(appName); - int corePoolSize = threadPoolExecutor.getCorePoolSize(); - int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); - int poolSize = threadPoolExecutor.getPoolSize(); - int activeCount = threadPoolExecutor.getActiveCount(); - int largestPoolSize = threadPoolExecutor.getLargestPoolSize(); - long completedTaskCount = threadPoolExecutor.getCompletedTaskCount(); - request.setActive(active.toUpperCase()); - request.setIdentify(IdentifyUtil.getIdentify()); - request.setCorePoolSize(corePoolSize); - request.setMaximumPoolSize(maximumPoolSize); - request.setPoolSize(poolSize); - request.setActiveCount(activeCount); - request.setLargestPoolSize(largestPoolSize); - request.setCompletedTaskCount(completedTaskCount); - BlockingQueue queue = threadPoolExecutor.getQueue(); - int queueSize = queue.size(); - String queueType = queue.getClass().getSimpleName(); - int remainingCapacity = queue.remainingCapacity(); - int queueCapacity = queueSize + remainingCapacity; - request.setQueueName(queueType); - request.setCapacity(queueCapacity); - request.setQueueSize(queueSize); - request.setRemainingCapacity(remainingCapacity); + public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) { + BlockingQueue blockingQueue = threadPoolExecutor.getQueue(); RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler() : threadPoolExecutor.getRejectedExecutionHandler(); - request.setRejectedExecutionHandlerName(rejectedExecutionHandler.getClass().getSimpleName()); long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum() : -1L; - request.setRejectCountNum(rejectCount); - return request; + AlarmNotifyRequest alarmNotifyRequest = AlarmNotifyRequest.builder() + .appName(StrUtil.isBlank(itemId) ? applicationName : itemId) + .active(active.toUpperCase()) + .identify(IdentifyUtil.getIdentify()) + .corePoolSize(threadPoolExecutor.getCorePoolSize()) + .maximumPoolSize(threadPoolExecutor.getMaximumPoolSize()) + .poolSize(threadPoolExecutor.getPoolSize()) + .activeCount(threadPoolExecutor.getActiveCount()) + .largestPoolSize(threadPoolExecutor.getLargestPoolSize()) + .completedTaskCount(threadPoolExecutor.getCompletedTaskCount()) + .queueName(blockingQueue.getClass().getSimpleName()) + .capacity(blockingQueue.size() + blockingQueue.remainingCapacity()) + .queueSize(blockingQueue.size()) + .remainingCapacity(blockingQueue.remainingCapacity()) + .rejectedExecutionHandlerName(rejectedExecutionHandler.getClass().getSimpleName()) + .rejectCountNum(rejectCount) + .build(); + return alarmNotifyRequest; } } diff --git a/hippo4j-message/src/main/java/cn/hippo4j/message/request/AlarmNotifyRequest.java b/hippo4j-message/src/main/java/cn/hippo4j/message/request/AlarmNotifyRequest.java index 3b57289f..44a25599 100644 --- a/hippo4j-message/src/main/java/cn/hippo4j/message/request/AlarmNotifyRequest.java +++ b/hippo4j-message/src/main/java/cn/hippo4j/message/request/AlarmNotifyRequest.java @@ -19,13 +19,19 @@ package cn.hippo4j.message.request; import cn.hippo4j.message.enums.NotifyTypeEnum; import cn.hippo4j.message.request.base.BaseNotifyRequest; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.experimental.Accessors; /** * Alarm notify request. */ @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor @Accessors(chain = true) public class AlarmNotifyRequest extends BaseNotifyRequest { diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java index 8818ab3e..b7f4d7c7 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java @@ -80,8 +80,8 @@ public class DynamicThreadPoolCoreAutoConfiguration { } @Bean - public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippoSendMessageService) { - return new ThreadPoolNotifyAlarmHandler(hippoSendMessageService); + public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippo4jSendMessageService) { + return new ThreadPoolNotifyAlarmHandler(hippo4jSendMessageService); } @Bean diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index c4caf35c..36676d50 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -201,7 +201,7 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean - public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippoSendMessageService) { - return new ThreadPoolNotifyAlarmHandler(hippoSendMessageService); + public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippo4jSendMessageService) { + return new ThreadPoolNotifyAlarmHandler(hippo4jSendMessageService); } }