From 9c3915413bd6aabfc9e5fbedf175b86a49cec83b Mon Sep 17 00:00:00 2001 From: Song <1150590475@qq.com> Date: Wed, 19 Jun 2024 17:01:57 +0800 Subject: [PATCH 1/3] refactor: delete EtcdRefresherHandler redundant comments --- .../springboot/starter/refresher/EtcdRefresherHandler.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java index 26064938..004c1f12 100644 --- a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java +++ b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java @@ -17,7 +17,6 @@ package cn.hippo4j.config.springboot.starter.refresher; -import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties; import io.etcd.jetcd.ByteSequence; @@ -65,7 +64,6 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh String key = etcd.get(KEY); Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET)); initClient(etcd, charset); - // TODO Currently only supports json GetResponse getResponse = client.getKVClient().get(ByteSequence.from(key, charset)).get(); KeyValue keyValue = getResponse.getKvs().get(0); if (Objects.isNull(keyValue)) { @@ -103,7 +101,6 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh * @param charset charset */ private void initClient(Map etcd, Charset charset) { - // TODO if (Objects.isNull(client)) { String user = etcd.get(USER); String password = etcd.get(PASSWORD); From 88788d6d12c9c36c1212ddf822f3c5fd08a12aed Mon Sep 17 00:00:00 2001 From: Song <1150590475@qq.com> Date: Wed, 19 Jun 2024 17:05:22 +0800 Subject: [PATCH 2/3] refactor: refactor JsonConfigParser import classes --- .../dynamic/mode/config/parser/JsonConfigParser.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/parser/JsonConfigParser.java b/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/parser/JsonConfigParser.java index 87fe6ba1..c0acf1d9 100644 --- a/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/parser/JsonConfigParser.java +++ b/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/parser/JsonConfigParser.java @@ -23,7 +23,12 @@ import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import java.io.IOException; -import java.util.*; +import java.util.Map; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Collection; +import java.util.Collections; +import java.util.List; /** * Json config parser. From ba7538de8ec3d47abd08102bc69b3c4d5b88121c Mon Sep 17 00:00:00 2001 From: Song <1150590475@qq.com> Date: Wed, 19 Jun 2024 20:11:02 +0800 Subject: [PATCH 3/3] refactor: refactor DynamicThreadPoolRefreshListener function code --- .../DynamicThreadPoolRefreshListener.java | 175 +++++++++++------- 1 file changed, 109 insertions(+), 66 deletions(-) diff --git a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java index 1fbac959..5fecd5c4 100644 --- a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java +++ b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java @@ -38,7 +38,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.core.annotation.Order; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -46,6 +45,7 @@ import java.util.Optional; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; @@ -164,48 +164,63 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener> newDynamicThreadPoolNotifyMap = configModeNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties); Map> notifyConfigs = threadPoolBaseSendMessageService.getNotifyConfigs(); - if (CollectionUtil.isNotEmpty(notifyConfigs)) { - for (Map.Entry> each : newDynamicThreadPoolNotifyMap.entrySet()) { - if (checkNotifyConfig) { - break; - } - List notifyConfigDTOS = notifyConfigs.get(each.getKey()); - for (NotifyConfigDTO notifyConfig : each.getValue()) { - if (!notifyConfigDTOS.contains(notifyConfig)) { - checkNotifyConfig = true; - break; - } - } - } + + boolean checkNotifyConfig = checkAndReplaceNotifyConfig(newDynamicThreadPoolNotifyMap, notifyConfigs); + boolean checkNotifyAlarm = checkAndReplaceNotifyAlarm(executorProperties); + + if (checkNotifyConfig || checkNotifyAlarm) { + log.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId()); } - if (checkNotifyConfig) { - configModeNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap); - threadPoolBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap); + } + + private boolean checkAndReplaceNotifyConfig(Map> newConfigs, + Map> currentConfigs) { + if (CollectionUtil.isEmpty(currentConfigs)) { + return false; } - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId()); - if (threadPoolNotifyAlarm != null) { - Boolean isAlarm = executorProperties.getAlarm(); - Integer activeAlarm = executorProperties.getActiveAlarm(); - Integer capacityAlarm = executorProperties.getCapacityAlarm(); - if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm())) - || (activeAlarm != null && !Objects.equals(activeAlarm, threadPoolNotifyAlarm.getActiveAlarm())) - || (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) { - checkNotifyAlarm = true; - threadPoolNotifyAlarm.setAlarm(Optional.ofNullable(isAlarm).orElse(threadPoolNotifyAlarm.getAlarm())); - threadPoolNotifyAlarm.setActiveAlarm(Optional.ofNullable(activeAlarm).orElse(threadPoolNotifyAlarm.getActiveAlarm())); - threadPoolNotifyAlarm.setCapacityAlarm(Optional.ofNullable(capacityAlarm).orElse(threadPoolNotifyAlarm.getCapacityAlarm())); + + for (Map.Entry> entry : newConfigs.entrySet()) { + String key = entry.getKey(); + List newNotifyConfigList = entry.getValue(); + List currentNotifyConfigList = currentConfigs.get(key); + + if (currentNotifyConfigList == null || !currentNotifyConfigList.containsAll(newNotifyConfigList)) { + configModeNotifyConfigBuilder.initCacheAndLock(newConfigs); + threadPoolBaseSendMessageService.putPlatform(newConfigs); + return true; } } - if (checkNotifyConfig || checkNotifyAlarm) { - log.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId()); + + return false; + } + + private boolean checkAndReplaceNotifyAlarm(ExecutorProperties executorProperties) { + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId()); + if (threadPoolNotifyAlarm == null) { + return false; + } + + boolean checkNotifyAlarm = false; + Boolean isAlarm = executorProperties.getAlarm(); + Integer activeAlarm = executorProperties.getActiveAlarm(); + Integer capacityAlarm = executorProperties.getCapacityAlarm(); + + if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm())) + || (activeAlarm != null && !Objects.equals(activeAlarm, threadPoolNotifyAlarm.getActiveAlarm())) + || (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) { + checkNotifyAlarm = true; + threadPoolNotifyAlarm.setAlarm(isAlarm != null ? isAlarm : threadPoolNotifyAlarm.getAlarm()); + threadPoolNotifyAlarm.setActiveAlarm(activeAlarm != null ? activeAlarm : threadPoolNotifyAlarm.getActiveAlarm()); + threadPoolNotifyAlarm.setCapacityAlarm(capacityAlarm != null ? capacityAlarm : threadPoolNotifyAlarm.getCapacityAlarm()); } + + return checkNotifyAlarm; } + /** * Check consistency. * @@ -216,19 +231,26 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener queue = (ResizableCapacityLinkedBlockingQueue) executor.getQueue(); - queue.setCapacity(properties.getQueueCapacity()); - } else { - log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName()); - } + } + + private void updateQueueCapacity(ThreadPoolExecutor executor, ExecutorProperties beforeProperties, ExecutorProperties properties) { + if (hasPropertyChanged(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) + && executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) { + ((ResizableCapacityLinkedBlockingQueue) executor.getQueue()).setCapacity(properties.getQueueCapacity()); + } else { + log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName()); } } + + private boolean hasPropertyChanged(T before, T after) { + return after != null && !Objects.equals(before, after); + } }