From b40dfc48ef85d73ce42112f74bf0c3b9c6457922 Mon Sep 17 00:00:00 2001 From: lucky 8 <1031900093@qq.com> Date: Sun, 17 Jul 2022 17:10:24 +0800 Subject: [PATCH] Optimized the timing task logic of the adaptation module --- .../base/ThreadPoolAdapterRegisterAction.java | 14 +++- .../hystrix/HystrixThreadPoolAdapter.java | 84 +++++++++++++++++-- .../core/ThreadPoolAdapterRegister.java | 74 +--------------- 3 files changed, 90 insertions(+), 82 deletions(-) diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterRegisterAction.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterRegisterAction.java index c92123f6..42671409 100644 --- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterRegisterAction.java +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterRegisterAction.java @@ -17,6 +17,7 @@ package cn.hippo4j.adapter.base; +import java.util.List; import java.util.Map; /** @@ -25,9 +26,18 @@ import java.util.Map; public interface ThreadPoolAdapterRegisterAction { /** - * adapterRegister + * getThreadPoolAdapterCacheConfigs + * * @param threadPoolAdapterMap + * @return List + */ + List getThreadPoolAdapterCacheConfigs(Map threadPoolAdapterMap); + + /** + * doRegister + * + * @param cacheConfigList * @return */ - void adapterRegister(Map threadPoolAdapterMap); + void doRegister(List cacheConfigList); } diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java index 02eb00d1..2aae3a00 100644 --- a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java @@ -20,6 +20,7 @@ package cn.hippo4j.adapter.hystrix; import cn.hippo4j.adapter.base.*; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.toolkit.CollectionUtil; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.netflix.hystrix.HystrixThreadPool; import lombok.extern.slf4j.Slf4j; @@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; @@ -102,20 +104,20 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL @Override public void onApplicationEvent(ApplicationStartedEvent event) { + ScheduledExecutorService scheduler = threadPoolAdapterScheduler.getScheduler(); + int taskIntervalSeconds = threadPoolAdapterScheduler.getTaskIntervalSeconds(); + // Periodically update the Hystrix thread pool - HystrixThreadPoolRefresh(); + HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler, taskIntervalSeconds); + scheduler.schedule(hystrixThreadPoolRefreshTask, taskIntervalSeconds, TimeUnit.SECONDS); + // Periodically refresh registration ThreadPoolAdapterRegisterAction threadPoolAdapterRegisterAction = ApplicationContextHolder.getBean(ThreadPoolAdapterRegisterAction.class); Map beansOfType = ApplicationContextHolder.getBeansOfType(this.getClass()); Map map = Maps.newHashMap(beansOfType); - threadPoolAdapterRegisterAction.adapterRegister(map); - } - public void HystrixThreadPoolRefresh() { - ScheduledExecutorService scheduler = threadPoolAdapterScheduler.getScheduler(); - int taskIntervalSeconds = threadPoolAdapterScheduler.getTaskIntervalSeconds(); - HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler, taskIntervalSeconds); - scheduler.schedule(hystrixThreadPoolRefreshTask, taskIntervalSeconds, TimeUnit.SECONDS); + ThreadPoolAdapterRegisterTask threadPoolAdapterRegisterTask = new ThreadPoolAdapterRegisterTask(scheduler, taskIntervalSeconds, map, threadPoolAdapterRegisterAction); + scheduler.schedule(threadPoolAdapterRegisterTask, threadPoolAdapterScheduler.getTaskIntervalSeconds(), TimeUnit.SECONDS); } public void hystrixThreadPoolRefresh() { @@ -146,6 +148,32 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL } } + private boolean compareThreadPoolAdapterCacheConfigs(List newThreadPoolAdapterCacheConfigs, + List oldThreadPoolAdapterCacheConfigs) { + boolean registerFlag = false; + Map> newThreadPoolAdapterCacheConfigMap = + newThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( + ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); + Map> oldThreadPoolAdapterCacheConfigMap = + oldThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( + ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); + for (Map.Entry> entry : newThreadPoolAdapterCacheConfigMap.entrySet()) { + String key = entry.getKey(); + List newValue = entry.getValue(); + List oldValue = oldThreadPoolAdapterCacheConfigMap.get(key); + if (oldValue == null) { + registerFlag = true; + break; + } else { + if (newValue.size() != oldValue.size()) { + registerFlag = true; + break; + } + } + } + return registerFlag; + } + class HystrixThreadPoolRefreshTask implements Runnable { private ScheduledExecutorService scheduler; @@ -168,4 +196,44 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL } } } + + class ThreadPoolAdapterRegisterTask implements Runnable { + + private ScheduledExecutorService scheduler; + + private int taskIntervalSeconds; + + Map threadPoolAdapterMap; + + ThreadPoolAdapterRegisterAction threadPoolAdapterRegisterAction; + + private List cacheConfigList = Lists.newArrayList(); + + public ThreadPoolAdapterRegisterTask(ScheduledExecutorService scheduler, int taskIntervalSeconds, + Map threadPoolAdapterMap, + ThreadPoolAdapterRegisterAction threadPoolAdapterRegisterAction) { + this.scheduler = scheduler; + this.taskIntervalSeconds = taskIntervalSeconds; + this.threadPoolAdapterMap = threadPoolAdapterMap; + this.threadPoolAdapterRegisterAction = threadPoolAdapterRegisterAction; + } + + @Override + public void run() { + try { + List newThreadPoolAdapterCacheConfigs = threadPoolAdapterRegisterAction.getThreadPoolAdapterCacheConfigs(threadPoolAdapterMap); + boolean registerFlag = compareThreadPoolAdapterCacheConfigs(newThreadPoolAdapterCacheConfigs, cacheConfigList); + cacheConfigList = newThreadPoolAdapterCacheConfigs; + if (registerFlag) { + threadPoolAdapterRegisterAction.doRegister(cacheConfigList); + } + } catch (Exception e) { + log.error("Register Task Error", e); + } finally { + if (!scheduler.isShutdown()) { + scheduler.schedule(this, taskIntervalSeconds, TimeUnit.MILLISECONDS); + } + } + } + } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java index 4de0a32a..1cbcc66f 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java @@ -58,13 +58,12 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner, ThreadPoolA private final InetUtils hippo4JInetUtils; - private final ThreadPoolAdapterScheduler threadPoolAdapterScheduler; - @Override public void run(ApplicationArguments args) throws Exception { register(); } + @Override public List getThreadPoolAdapterCacheConfigs(Map threadPoolAdapterMap) { List cacheConfigList = Lists.newArrayList(); for (Map.Entry threadPoolAdapterEntry : threadPoolAdapterMap.entrySet()) { @@ -86,6 +85,7 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner, ThreadPoolA return cacheConfigList; } + @Override public void doRegister(List cacheConfigList) { if (CollectionUtil.isNotEmpty(cacheConfigList) && cacheConfigList.size() > 0) { try { @@ -104,74 +104,4 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner, ThreadPoolA List threadPoolAdapterCacheConfigs = getThreadPoolAdapterCacheConfigs(threadPoolAdapterMap); doRegister(threadPoolAdapterCacheConfigs); } - - @Override - public void adapterRegister(Map threadPoolAdapterMap) { - ScheduledExecutorService scheduler = threadPoolAdapterScheduler.getScheduler(); - int taskIntervalSeconds = threadPoolAdapterScheduler.getTaskIntervalSeconds(); - ThreadPoolAdapterRegisterTask threadPoolAdapterRegisterTask = new ThreadPoolAdapterRegisterTask(scheduler, taskIntervalSeconds, threadPoolAdapterMap); - scheduler.schedule(threadPoolAdapterRegisterTask, threadPoolAdapterScheduler.getTaskIntervalSeconds(), TimeUnit.SECONDS); - } - - class ThreadPoolAdapterRegisterTask implements Runnable { - - private ScheduledExecutorService scheduler; - - private int taskIntervalSeconds; - - Map threadPoolAdapterMap; - - private List cacheConfigList = Lists.newArrayList(); - - public ThreadPoolAdapterRegisterTask(ScheduledExecutorService scheduler, int taskIntervalSeconds, - Map threadPoolAdapterMap) { - this.scheduler = scheduler; - this.taskIntervalSeconds = taskIntervalSeconds; - this.threadPoolAdapterMap = threadPoolAdapterMap; - } - - @Override - public void run() { - try { - List newThreadPoolAdapterCacheConfigs = getThreadPoolAdapterCacheConfigs(threadPoolAdapterMap); - boolean registerFlag = compareThreadPoolAdapterCacheConfigs(newThreadPoolAdapterCacheConfigs, cacheConfigList); - cacheConfigList = newThreadPoolAdapterCacheConfigs; - if (registerFlag) { - doRegister(cacheConfigList); - } - } catch (Exception e) { - log.error("Register Task Error", e); - } finally { - if (!scheduler.isShutdown()) { - scheduler.schedule(this, taskIntervalSeconds, TimeUnit.MILLISECONDS); - } - } - } - } - - private boolean compareThreadPoolAdapterCacheConfigs(List newThreadPoolAdapterCacheConfigs, - List oldThreadPoolAdapterCacheConfigs) { - boolean registerFlag = false; - Map> newThreadPoolAdapterCacheConfigMap = - newThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( - ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); - Map> oldThreadPoolAdapterCacheConfigMap = - oldThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( - ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); - for (Map.Entry> entry : newThreadPoolAdapterCacheConfigMap.entrySet()) { - String key = entry.getKey(); - List newValue = entry.getValue(); - List oldValue = oldThreadPoolAdapterCacheConfigMap.get(key); - if (oldValue == null) { - registerFlag = true; - break; - } else { - if (newValue.size() != oldValue.size()) { - registerFlag = true; - break; - } - } - } - return registerFlag; - } }