From 241de5bccf0a9e58a2de039ab5738e76a9d86ef5 Mon Sep 17 00:00:00 2001 From: shiming-stars-lk <1031900093@qq.com> Date: Fri, 15 Jul 2022 19:22:51 +0800 Subject: [PATCH] Hystrix thread pool monitoring optimization --- .../adapter/base/ThreadPoolAdapterExtra.java | 29 +++---- .../hystrix/HystrixThreadPoolAdapter.java | 19 +---- .../starter/core/DiscoveryClient.java | 3 +- .../core/ThreadPoolAdapterRegister.java | 84 +++++++++++++++++-- 4 files changed, 93 insertions(+), 42 deletions(-) diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java index 695f2402..e12fbeaf 100644 --- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java @@ -17,11 +17,14 @@ package cn.hippo4j.adapter.base; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -30,28 +33,18 @@ import java.util.concurrent.TimeUnit; @Slf4j public class ThreadPoolAdapterExtra { - private static final int BLOCKING_QUEUE_CAPACITY = 100; + private final ScheduledExecutorService scheduler; - private BlockingQueue> blockingQueue; public ThreadPoolAdapterExtra() { - blockingQueue = new ArrayBlockingQueue(BLOCKING_QUEUE_CAPACITY); + scheduler = new ScheduledThreadPoolExecutor(2, + new ThreadFactoryBuilder() + .setNameFormat("threadPoolAdapter") + .setDaemon(true) + .build()); } - public void offerQueue(Map map) throws InterruptedException { - blockingQueue.offer(map, 5, TimeUnit.SECONDS); - } - - public void extraStart(ThreadPoolAdapterExtraHandle threadPoolAdapterExtraHandle) { - new Thread(() -> { - try { - for (;;) { - Map map = blockingQueue.take(); - threadPoolAdapterExtraHandle.execute(map); - } - } catch (InterruptedException e) { - log.error("extraStart error", e); - } - }, "threadPoolAdapterExtra").start(); + public ScheduledExecutorService getScheduler() { + return scheduler; } } diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java index 48fc670b..e0f53040 100644 --- a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java @@ -58,19 +58,12 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL private final Map HYSTRIX_CONSUME_EXECUTOR = Maps.newHashMap(); - private final ScheduledExecutorService scheduler; - private ThreadPoolAdapterExtra threadPoolAdapterExtra; public HystrixThreadPoolAdapter(ThreadPoolAdapterExtra threadPoolAdapterExtra) { this.threadPoolAdapterExtra = threadPoolAdapterExtra; - scheduler = new ScheduledThreadPoolExecutor(2, - new ThreadFactoryBuilder() - .setNameFormat("hystrixThreadPoolAdapter") - .setDaemon(true) - .build()); } @Override @@ -120,13 +113,13 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL @Override public void onApplicationEvent(ApplicationStartedEvent event) { + ScheduledExecutorService scheduler = threadPoolAdapterExtra.getScheduler(); HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler); scheduler.schedule(hystrixThreadPoolRefreshTask, TASK_INTERVAL_SECONDS, TimeUnit.SECONDS); } public void hystrixThreadPoolRefresh() { try { - boolean addExtraFlag = false; Class factoryClass = HystrixThreadPool.Factory.class; Field threadPoolsField = factoryClass.getDeclaredField(THREAD_POOLS_FIELD); threadPoolsField.setAccessible(true); @@ -144,18 +137,10 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL threadPoolField.setAccessible(true); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) threadPoolField.get(hystrixThreadPoolDefault); - if (threadPoolExecutor != null && HYSTRIX_CONSUME_EXECUTOR.get(key) == null) { - HYSTRIX_CONSUME_EXECUTOR.put(key, threadPoolExecutor); - addExtraFlag = true; - } + HYSTRIX_CONSUME_EXECUTOR.put(key, threadPoolExecutor); } } } - if (addExtraFlag) { - Map map = Maps.newHashMap(); - map.putAll(ApplicationContextHolder.getBeansOfType(HystrixThreadPoolAdapter.class)); - threadPoolAdapterExtra.offerQueue(map); - } } catch (Exception e) { log.error("Failed to get Hystrix thread pool.", e); } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java index 2beea738..247ca276 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java @@ -143,8 +143,7 @@ public class DiscoveryClient implements DisposableBean { boolean success = register(); // TODO Abstract server registration logic ThreadPoolAdapterRegister adapterRegister = ApplicationContextHolder.getBean(ThreadPoolAdapterRegister.class); - Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); - adapterRegister.register(threadPoolAdapterMap); + adapterRegister.register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java index 6a1e3b1a..b2c9b8fd 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java @@ -38,6 +38,9 @@ import org.springframework.core.env.ConfigurableEnvironment; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; import static cn.hippo4j.common.constant.Constants.REGISTER_ADAPTER_PATH; @@ -59,15 +62,24 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { private final ThreadPoolAdapterExtra threadPoolAdapterExtra; + private List cacheConfigList = Lists.newArrayList(); + + private static final int TASK_INTERVAL_SECONDS = 2; + + + @Override public void run(ApplicationArguments args) throws Exception { - Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); - register(threadPoolAdapterMap); - threadPoolAdapterExtra.extraStart(map -> register(map)); - } - public void register(Map threadPoolAdapterMap) { + ScheduledExecutorService scheduler = threadPoolAdapterExtra.getScheduler(); + + ThreadPoolAdapterRegisterTask threadPoolAdapterRegisterTask = new ThreadPoolAdapterRegisterTask(scheduler); + + scheduler.schedule(threadPoolAdapterRegisterTask, TASK_INTERVAL_SECONDS, TimeUnit.SECONDS); + } + public List getThreadPoolAdapterCacheConfigs(){ + Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); List cacheConfigList = Lists.newArrayList(); threadPoolAdapterMap.forEach((key, val) -> { List threadPoolStates = val.getThreadPoolStates(); @@ -84,6 +96,10 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { cacheConfig.setThreadPoolAdapterStates(threadPoolStates); cacheConfigList.add(cacheConfig); }); + return cacheConfigList; + } + + public void doRegister(List cacheConfigList){ if (CollectionUtil.isNotEmpty(cacheConfigList)) { try { Result result = httpAgent.httpPost(REGISTER_ADAPTER_PATH, cacheConfigList); @@ -95,4 +111,62 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { } } } + + public void register() { + List threadPoolAdapterCacheConfigs = getThreadPoolAdapterCacheConfigs(); + doRegister(threadPoolAdapterCacheConfigs); + } + + class ThreadPoolAdapterRegisterTask implements Runnable{ + + private ScheduledExecutorService scheduler; + + public ThreadPoolAdapterRegisterTask(ScheduledExecutorService scheduler){ + this.scheduler = scheduler; + } + + @Override + public void run() { + try { + boolean registerFlag = false; + + List newThreadPoolAdapterCacheConfigs = getThreadPoolAdapterCacheConfigs(); + + Map> newThreadPoolAdapterCacheConfigMap = + newThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( + ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); + + Map> oldThreadPoolAdapterCacheConfigMap = + cacheConfigList.stream().collect(Collectors.toMap( + ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); + + for (Map.Entry> entry : newThreadPoolAdapterCacheConfigMap.entrySet()) { + String key = entry.getKey(); + List newValue = entry.getValue(); + List oldValue = oldThreadPoolAdapterCacheConfigMap.get(key); + if (oldValue == null) { + registerFlag = true; + break; + }else { + if (newValue.size() != oldValue.size()) { + registerFlag = true; + break; + } + } + } + + cacheConfigList = newThreadPoolAdapterCacheConfigs; + + if (registerFlag) { + doRegister(cacheConfigList); + } + }catch (Exception e){ + log.error("Register Task Error",e); + }finally { + if (!scheduler.isShutdown()) { + scheduler.schedule(this, TASK_INTERVAL_SECONDS, TimeUnit.MILLISECONDS); + } + } + } + } }