diff --git a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java index 1fbac959..5fecd5c4 100644 --- a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java +++ b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java @@ -38,7 +38,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.core.annotation.Order; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -46,6 +45,7 @@ import java.util.Optional; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; @@ -164,48 +164,63 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener> newDynamicThreadPoolNotifyMap = configModeNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties); Map> notifyConfigs = threadPoolBaseSendMessageService.getNotifyConfigs(); - if (CollectionUtil.isNotEmpty(notifyConfigs)) { - for (Map.Entry> each : newDynamicThreadPoolNotifyMap.entrySet()) { - if (checkNotifyConfig) { - break; - } - List notifyConfigDTOS = notifyConfigs.get(each.getKey()); - for (NotifyConfigDTO notifyConfig : each.getValue()) { - if (!notifyConfigDTOS.contains(notifyConfig)) { - checkNotifyConfig = true; - break; - } - } - } + + boolean checkNotifyConfig = checkAndReplaceNotifyConfig(newDynamicThreadPoolNotifyMap, notifyConfigs); + boolean checkNotifyAlarm = checkAndReplaceNotifyAlarm(executorProperties); + + if (checkNotifyConfig || checkNotifyAlarm) { + log.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId()); } - if (checkNotifyConfig) { - configModeNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap); - threadPoolBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap); + } + + private boolean checkAndReplaceNotifyConfig(Map> newConfigs, + Map> currentConfigs) { + if (CollectionUtil.isEmpty(currentConfigs)) { + return false; } - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId()); - if (threadPoolNotifyAlarm != null) { - Boolean isAlarm = executorProperties.getAlarm(); - Integer activeAlarm = executorProperties.getActiveAlarm(); - Integer capacityAlarm = executorProperties.getCapacityAlarm(); - if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm())) - || (activeAlarm != null && !Objects.equals(activeAlarm, threadPoolNotifyAlarm.getActiveAlarm())) - || (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) { - checkNotifyAlarm = true; - threadPoolNotifyAlarm.setAlarm(Optional.ofNullable(isAlarm).orElse(threadPoolNotifyAlarm.getAlarm())); - threadPoolNotifyAlarm.setActiveAlarm(Optional.ofNullable(activeAlarm).orElse(threadPoolNotifyAlarm.getActiveAlarm())); - threadPoolNotifyAlarm.setCapacityAlarm(Optional.ofNullable(capacityAlarm).orElse(threadPoolNotifyAlarm.getCapacityAlarm())); + + for (Map.Entry> entry : newConfigs.entrySet()) { + String key = entry.getKey(); + List newNotifyConfigList = entry.getValue(); + List currentNotifyConfigList = currentConfigs.get(key); + + if (currentNotifyConfigList == null || !currentNotifyConfigList.containsAll(newNotifyConfigList)) { + configModeNotifyConfigBuilder.initCacheAndLock(newConfigs); + threadPoolBaseSendMessageService.putPlatform(newConfigs); + return true; } } - if (checkNotifyConfig || checkNotifyAlarm) { - log.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId()); + + return false; + } + + private boolean checkAndReplaceNotifyAlarm(ExecutorProperties executorProperties) { + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId()); + if (threadPoolNotifyAlarm == null) { + return false; + } + + boolean checkNotifyAlarm = false; + Boolean isAlarm = executorProperties.getAlarm(); + Integer activeAlarm = executorProperties.getActiveAlarm(); + Integer capacityAlarm = executorProperties.getCapacityAlarm(); + + if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm())) + || (activeAlarm != null && !Objects.equals(activeAlarm, threadPoolNotifyAlarm.getActiveAlarm())) + || (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) { + checkNotifyAlarm = true; + threadPoolNotifyAlarm.setAlarm(isAlarm != null ? isAlarm : threadPoolNotifyAlarm.getAlarm()); + threadPoolNotifyAlarm.setActiveAlarm(activeAlarm != null ? activeAlarm : threadPoolNotifyAlarm.getActiveAlarm()); + threadPoolNotifyAlarm.setCapacityAlarm(capacityAlarm != null ? capacityAlarm : threadPoolNotifyAlarm.getCapacityAlarm()); } + + return checkNotifyAlarm; } + /** * Check consistency. * @@ -216,19 +231,26 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener queue = (ResizableCapacityLinkedBlockingQueue) executor.getQueue(); - queue.setCapacity(properties.getQueueCapacity()); - } else { - log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName()); - } + } + + private void updateQueueCapacity(ThreadPoolExecutor executor, ExecutorProperties beforeProperties, ExecutorProperties properties) { + if (hasPropertyChanged(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) + && executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) { + ((ResizableCapacityLinkedBlockingQueue) executor.getQueue()).setCapacity(properties.getQueueCapacity()); + } else { + log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName()); } } + + private boolean hasPropertyChanged(T before, T after) { + return after != null && !Objects.equals(before, after); + } }