From 7fbddb1f4690bc5840810fa753af752fcc166edb Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Fri, 29 Jul 2022 23:41:05 +0800 Subject: [PATCH] Notify related parameters to add dynamic change function (#407) --- .../hippo4j/message/dto/NotifyConfigDTO.java | 22 +++--- .../service/HippoBaseSendMessageService.java | 4 +- .../service/ThreadPoolNotifyAlarm.java | 17 +++-- ...ynamicThreadPoolCoreAutoConfiguration.java | 8 +- .../notify/CoreNotifyConfigBuilder.java | 14 ++-- .../refresher/event/ExecutorsListener.java | 73 ++++++++++++++++++- 6 files changed, 107 insertions(+), 31 deletions(-) diff --git a/hippo4j-message/src/main/java/cn/hippo4j/message/dto/NotifyConfigDTO.java b/hippo4j-message/src/main/java/cn/hippo4j/message/dto/NotifyConfigDTO.java index f3b1e13e..72bda5ca 100644 --- a/hippo4j-message/src/main/java/cn/hippo4j/message/dto/NotifyConfigDTO.java +++ b/hippo4j-message/src/main/java/cn/hippo4j/message/dto/NotifyConfigDTO.java @@ -19,62 +19,64 @@ package cn.hippo4j.message.dto; import cn.hippo4j.message.enums.NotifyTypeEnum; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; /** * Notify config dto. */ @Data +@EqualsAndHashCode @Accessors(chain = true) public class NotifyConfigDTO { /** - * 租户id + * Tenant id */ private String tenantId; /** - * 项目id + * Item id */ private String itemId; /** - * 线程池id + * Thread-pool id */ private String tpId; /** - * 通知平台 + * Platform */ private String platform; /** - * 通知类型 + * Type */ private String type; /** - * 密钥 + * Secret key */ private String secretKey; /** - * 加签 + * Secret */ private String secret; /** - * 报警间隔 + * Interval */ private Integer interval; /** - * 接收者 + * Receives */ private String receives; /** - * 报警类型 + * Type enum */ private NotifyTypeEnum typeEnum; } diff --git a/hippo4j-message/src/main/java/cn/hippo4j/message/service/HippoBaseSendMessageService.java b/hippo4j-message/src/main/java/cn/hippo4j/message/service/HippoBaseSendMessageService.java index 1dae394c..10fcab2a 100644 --- a/hippo4j-message/src/main/java/cn/hippo4j/message/service/HippoBaseSendMessageService.java +++ b/hippo4j-message/src/main/java/cn/hippo4j/message/service/HippoBaseSendMessageService.java @@ -27,6 +27,7 @@ import cn.hippo4j.message.request.ChangeParameterNotifyRequest; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import com.google.common.collect.Maps; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; @@ -45,7 +46,8 @@ public class HippoBaseSendMessageService implements HippoSendMessageService, Com private final AlarmControlHandler alarmControlHandler; - private final Map> notifyConfigs = Maps.newHashMap(); + @Getter + public final Map> notifyConfigs = Maps.newHashMap(); private final Map sendMessageHandlers = Maps.newHashMap(); diff --git a/hippo4j-message/src/main/java/cn/hippo4j/message/service/ThreadPoolNotifyAlarm.java b/hippo4j-message/src/main/java/cn/hippo4j/message/service/ThreadPoolNotifyAlarm.java index b2f820c7..83032c87 100644 --- a/hippo4j-message/src/main/java/cn/hippo4j/message/service/ThreadPoolNotifyAlarm.java +++ b/hippo4j-message/src/main/java/cn/hippo4j/message/service/ThreadPoolNotifyAlarm.java @@ -33,36 +33,39 @@ import java.util.Map; public class ThreadPoolNotifyAlarm { /** - * isAlarm + * Is alarm */ @NonNull private Boolean isAlarm; /** - * activeAlarm + * Active alarm */ @NonNull private Integer activeAlarm; /** - * capacityAlarm + * Capacity alarm */ @NonNull private Integer capacityAlarm; /** - * interval + * Interval */ private Integer interval; /** - * receive + * Receive */ private String receive; /** - * receives - * ps:暂不启用该配置,后续如果开发邮箱时或许有用 + * Receives + * + *

+ * Do not enable this configuration for the time being, it may be useful if you develop mailboxes in the future. + *

*/ @Deprecated private Map receives; diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java index fb089ee6..d2d28cb9 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java @@ -36,6 +36,7 @@ import cn.hippo4j.core.springboot.starter.support.ThreadPoolAdapterRegister; import cn.hippo4j.message.api.NotifyConfigBuilder; import cn.hippo4j.message.config.MessageConfiguration; import cn.hippo4j.message.service.AlarmControlHandler; +import cn.hippo4j.message.service.HippoBaseSendMessageService; import cn.hippo4j.message.service.HippoSendMessageService; import lombok.AllArgsConstructor; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; @@ -107,8 +108,11 @@ public class DynamicThreadPoolCoreAutoConfiguration { } @Bean - public ExecutorsListener hippo4jExecutorsListener(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) { - return new ExecutorsListener(threadPoolNotifyAlarmHandler); + @SuppressWarnings("all") + public ExecutorsListener hippo4jExecutorsListener(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler, + CoreNotifyConfigBuilder coreNotifyConfigBuilder, + HippoBaseSendMessageService hippoBaseSendMessageService) { + return new ExecutorsListener(threadPoolNotifyAlarmHandler, coreNotifyConfigBuilder, hippoBaseSendMessageService); } @Bean diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java index 6818afda..3c05ba02 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java @@ -64,7 +64,9 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { return resultMap; } for (ExecutorProperties executor : executors) { - resultMap.putAll(buildSingleNotifyConfig(executor)); + Map> buildSingleNotifyConfig = buildSingleNotifyConfig(executor); + initCacheAndLock(buildSingleNotifyConfig); + resultMap.putAll(buildSingleNotifyConfig); } return resultMap; } @@ -80,7 +82,6 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { String threadPoolId = executor.getThreadPoolId(); String alarmBuildKey = threadPoolId + "+ALARM"; List alarmNotifyConfigs = Lists.newArrayList(); - List notifyPlatforms = bootstrapCoreProperties.getNotifyPlatforms(); for (NotifyPlatformProperties platformProperties : notifyPlatforms) { NotifyConfigDTO notifyConfig = new NotifyConfigDTO(); @@ -97,10 +98,8 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { alarmNotifyConfigs.add(notifyConfig); } resultMap.put(alarmBuildKey, alarmNotifyConfigs); - String changeBuildKey = threadPoolId + "+CONFIG"; List changeNotifyConfigs = Lists.newArrayList(); - for (NotifyPlatformProperties platformProperties : notifyPlatforms) { NotifyConfigDTO notifyConfig = new NotifyConfigDTO(); notifyConfig.setPlatform(platformProperties.getPlatform()); @@ -112,13 +111,14 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { changeNotifyConfigs.add(notifyConfig); } resultMap.put(changeBuildKey, changeNotifyConfigs); + return resultMap; + } - resultMap.forEach( + public void initCacheAndLock(Map> buildSingleNotifyConfig) { + buildSingleNotifyConfig.forEach( (key, val) -> val.stream() .filter(each -> StrUtil.equals("ALARM", each.getType())) .forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval()))); - - return resultMap; } private String buildReceive(ExecutorProperties executor, NotifyPlatformProperties platformProperties) { diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/ExecutorsListener.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/ExecutorsListener.java index 08fa9cf9..8a9a8e05 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/ExecutorsListener.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/ExecutorsListener.java @@ -17,9 +17,10 @@ package cn.hippo4j.core.springboot.starter.refresher.event; -import cn.hippo4j.message.request.ChangeParameterNotifyRequest; +import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.executor.support.QueueTypeEnum; @@ -28,13 +29,20 @@ import cn.hippo4j.core.executor.support.ResizableCapacityLinkedBlockingQueue; import cn.hippo4j.core.proxy.RejectedProxyUtil; import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; +import cn.hippo4j.core.springboot.starter.notify.CoreNotifyConfigBuilder; import cn.hippo4j.core.springboot.starter.support.GlobalCoreThreadPoolManage; +import cn.hippo4j.message.dto.NotifyConfigDTO; +import cn.hippo4j.message.request.ChangeParameterNotifyRequest; +import cn.hippo4j.message.service.HippoBaseSendMessageService; +import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; +import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; import org.springframework.core.annotation.Order; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; @@ -55,20 +63,27 @@ public class ExecutorsListener implements ApplicationListener executors = bindableCoreProperties.getExecutors(); for (ExecutorProperties properties : executors) { String threadPoolId = properties.getThreadPoolId(); + // Check whether the notification configuration is consistent. + // this operation will not trigger the notification. + checkNotifyConsistencyAndReplace(properties); if (!checkConsistency(threadPoolId, properties)) { continue; } - // refresh executor pool + // refresh executor pool. dynamicRefreshPool(threadPoolId, properties); - // old properties + // old properties. ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); - // refresh executor properties + // refresh executor properties. GlobalCoreThreadPoolManage.refresh(threadPoolId, properties); log.info(CHANGE_THREAD_POOL_TEXT, threadPoolId.toUpperCase(), @@ -116,6 +131,56 @@ public class ExecutorsListener implements ApplicationListener changeKeys = Lists.newArrayList(); + Map> newDynamicThreadPoolNotifyMap = coreNotifyConfigBuilder.buildSingleNotifyConfig(properties); + Map> notifyConfigs = hippoBaseSendMessageService.getNotifyConfigs(); + if (CollectionUtil.isNotEmpty(notifyConfigs)) { + for (Map.Entry> each : newDynamicThreadPoolNotifyMap.entrySet()) { + if (checkNotifyConfig) { + break; + } + List notifyConfigDTOS = notifyConfigs.get(each.getKey()); + for (NotifyConfigDTO notifyConfig : each.getValue()) { + if (!notifyConfigDTOS.contains(notifyConfig)) { + checkNotifyConfig = true; + changeKeys.add(each.getKey()); + break; + } + } + } + } + if (checkNotifyConfig) { + coreNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap); + hippoBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap); + } + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(properties.getThreadPoolId()); + if (threadPoolNotifyAlarm != null && properties.getNotify() != null) { + ThreadPoolNotifyAlarm notify = properties.getNotify(); + boolean isAlarm = notify.getIsAlarm(); + Integer activeAlarm = notify.getActiveAlarm(); + Integer capacityAlarm = notify.getCapacityAlarm(); + if (threadPoolNotifyAlarm.getIsAlarm() != isAlarm + || threadPoolNotifyAlarm.getActiveAlarm() != activeAlarm + || threadPoolNotifyAlarm.getCapacityAlarm() != capacityAlarm) { + checkNotifyAlarm = true; + threadPoolNotifyAlarm.setIsAlarm(isAlarm); + threadPoolNotifyAlarm.setActiveAlarm(activeAlarm); + threadPoolNotifyAlarm.setCapacityAlarm(capacityAlarm); + } + } + if (checkNotifyConfig || checkNotifyAlarm) { + log.info("[{}] Dynamic thread pool notification property changes.", properties.getThreadPoolId()); + } + } + /** * Check consistency. *