From ca8f5a9f066e2fc4b03df81b200260c13f1b2d1e Mon Sep 17 00:00:00 2001 From: yangsanity Date: Fri, 10 Mar 2023 10:38:06 +0800 Subject: [PATCH] HystrixThreadPoolAdapter supports hippo4j config (#1086) --- .../AbstractHystrixThreadPoolAdapter.java | 163 ++++++++++++ .../hystrix/HystrixThreadPoolAdapter.java | 245 ------------------ .../HystrixThreadPoolAdapter4Config.java | 28 ++ .../HystrixThreadPoolAdapter4Server.java | 136 ++++++++++ .../DynamicThreadPoolAutoConfiguration.java | 6 + .../HystrixAdapterAutoConfiguration.java | 16 +- .../DynamicThreadPoolAutoConfiguration.java | 6 + 7 files changed, 351 insertions(+), 249 deletions(-) create mode 100644 hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/AbstractHystrixThreadPoolAdapter.java delete mode 100644 hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java create mode 100644 hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter4Config.java create mode 100644 hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter4Server.java diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/AbstractHystrixThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/AbstractHystrixThreadPoolAdapter.java new file mode 100644 index 00000000..fb6ad924 --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/AbstractHystrixThreadPoolAdapter.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.adapter.hystrix; + +import cn.hippo4j.adapter.base.ThreadPoolAdapter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil; +import com.netflix.hystrix.HystrixThreadPool; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.ApplicationListener; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; + +/** + * abstract hystrix thread-pool adapter + */ +@Slf4j +public abstract class AbstractHystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener { + + private static final String THREAD_POOL_FIELD = "threadPool"; + + private static final String THREAD_POOLS_FIELD = "threadPools"; + + private final Map hystrixConsumeExecutor = new HashMap<>(); + + protected final ThreadPoolAdapterScheduler threadPoolAdapterScheduler; + + protected AbstractHystrixThreadPoolAdapter(ThreadPoolAdapterScheduler threadPoolAdapterScheduler) { + this.threadPoolAdapterScheduler = threadPoolAdapterScheduler; + } + + @Override + public String mark() { + return "Hystrix"; + } + + @Override + public ThreadPoolAdapterState getThreadPoolState(String identify) { + ThreadPoolAdapterState result = new ThreadPoolAdapterState(); + ThreadPoolExecutor threadPoolExecutor = hystrixConsumeExecutor.get(identify); + if (threadPoolExecutor != null) { + result.setThreadPoolKey(identify); + result.setCoreSize(threadPoolExecutor.getCorePoolSize()); + result.setMaximumSize(threadPoolExecutor.getMaximumPoolSize()); + return result; + } + log.warn("[{}] Hystrix thread pool not found.", identify); + return result; + } + + @Override + public List getThreadPoolStates() { + List threadPoolAdapterStates = new ArrayList<>(); + hystrixConsumeExecutor.forEach((kel, val) -> threadPoolAdapterStates.add(getThreadPoolState(kel))); + return threadPoolAdapterStates; + } + + @Override + public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { + String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); + ThreadPoolExecutor threadPoolExecutor = hystrixConsumeExecutor.get(threadPoolKey); + if (threadPoolExecutor == null) { + log.warn("[{}] Hystrix thread pool not found.", threadPoolKey); + return false; + } + int originalCoreSize = threadPoolExecutor.getCorePoolSize(); + int originalMaximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); + ThreadPoolExecutorUtil.safeSetPoolSize(threadPoolExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize()); + log.info("[{}] Hystrix thread pool parameter change. coreSize: {}, maximumSize: {}", + threadPoolKey, + String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolExecutor.getCorePoolSize()), + String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolExecutor.getMaximumPoolSize())); + return true; + } + + @Override + public void onApplicationEvent(ApplicationStartedEvent event) { + ScheduledExecutorService scheduler = threadPoolAdapterScheduler.getScheduler(); + int taskIntervalSeconds = threadPoolAdapterScheduler.getTaskIntervalSeconds(); + // Periodically update the Hystrix thread pool. + HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler, taskIntervalSeconds); + scheduler.schedule(hystrixThreadPoolRefreshTask, taskIntervalSeconds, TimeUnit.SECONDS); + } + + class HystrixThreadPoolRefreshTask implements Runnable { + + private final ScheduledExecutorService scheduler; + + private final int taskIntervalSeconds; + + HystrixThreadPoolRefreshTask(ScheduledExecutorService scheduler, int taskIntervalSeconds) { + this.scheduler = scheduler; + this.taskIntervalSeconds = taskIntervalSeconds; + } + + @Override + public void run() { + try { + hystrixThreadPoolRefresh(); + } finally { + if (!scheduler.isShutdown()) { + scheduler.schedule(this, taskIntervalSeconds, TimeUnit.MILLISECONDS); + } + } + } + + private void hystrixThreadPoolRefresh() { + try { + Class factoryClass = HystrixThreadPool.Factory.class; + Field threadPoolsField = factoryClass.getDeclaredField(THREAD_POOLS_FIELD); + threadPoolsField.setAccessible(true); + ConcurrentHashMap threadPools = + (ConcurrentHashMap) threadPoolsField.get(factoryClass); + if (CollectionUtil.isNotEmpty(threadPools)) { + for (Map.Entry stringHystrixThreadPoolEntry : threadPools.entrySet()) { + String key = stringHystrixThreadPoolEntry.getKey(); + HystrixThreadPool value = stringHystrixThreadPoolEntry.getValue(); + if (value instanceof HystrixThreadPool.HystrixThreadPoolDefault) { + HystrixThreadPool.HystrixThreadPoolDefault hystrixThreadPoolDefault = + (HystrixThreadPool.HystrixThreadPoolDefault) value; + Class hystrixThreadPoolDefaultClass = hystrixThreadPoolDefault.getClass(); + Field threadPoolField = hystrixThreadPoolDefaultClass.getDeclaredField(THREAD_POOL_FIELD); + threadPoolField.setAccessible(true); + ThreadPoolExecutor threadPoolExecutor = + (ThreadPoolExecutor) threadPoolField.get(hystrixThreadPoolDefault); + hystrixConsumeExecutor.put(key, threadPoolExecutor); + } + } + } + } catch (Exception ex) { + log.error("Failed to get Hystrix thread pool.", ex); + } + } + } +} diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java deleted file mode 100644 index 45d9bc56..00000000 --- a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.hippo4j.adapter.hystrix; - -import cn.hippo4j.adapter.base.ThreadPoolAdapter; -import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; -import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; -import cn.hippo4j.adapter.base.ThreadPoolAdapterRegisterAction; -import cn.hippo4j.adapter.base.ThreadPoolAdapterState; -import cn.hippo4j.common.config.ApplicationContextHolder; -import cn.hippo4j.common.toolkit.CollectionUtil; -import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil; -import com.netflix.hystrix.HystrixThreadPool; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.context.event.ApplicationStartedEvent; -import org.springframework.context.ApplicationListener; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; - -/** - * hystrix thread-pool adapter. - */ -@Slf4j -public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener { - - private static final String THREAD_POOL_FIELD = "threadPool"; - - private static final String THREAD_POOLS_FIELD = "threadPools"; - - private final Map hystrixConsumeExecutor = new HashMap<>(); - - private final ThreadPoolAdapterScheduler threadPoolAdapterScheduler; - - public HystrixThreadPoolAdapter(ThreadPoolAdapterScheduler threadPoolAdapterScheduler) { - this.threadPoolAdapterScheduler = threadPoolAdapterScheduler; - } - - @Override - public String mark() { - return "Hystrix"; - } - - @Override - public ThreadPoolAdapterState getThreadPoolState(String identify) { - ThreadPoolAdapterState result = new ThreadPoolAdapterState(); - ThreadPoolExecutor threadPoolExecutor = hystrixConsumeExecutor.get(identify); - if (threadPoolExecutor != null) { - result.setThreadPoolKey(identify); - result.setCoreSize(threadPoolExecutor.getCorePoolSize()); - result.setMaximumSize(threadPoolExecutor.getMaximumPoolSize()); - return result; - } - log.warn("[{}] Hystrix thread pool not found.", identify); - return result; - } - - @Override - public List getThreadPoolStates() { - List threadPoolAdapterStates = new ArrayList<>(); - hystrixConsumeExecutor.forEach((kel, val) -> threadPoolAdapterStates.add(getThreadPoolState(kel))); - return threadPoolAdapterStates; - } - - @Override - public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { - String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); - ThreadPoolExecutor threadPoolExecutor = hystrixConsumeExecutor.get(threadPoolKey); - if (threadPoolExecutor == null) { - log.warn("[{}] Hystrix thread pool not found.", threadPoolKey); - return false; - } - int originalCoreSize = threadPoolExecutor.getCorePoolSize(); - int originalMaximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); - ThreadPoolExecutorUtil.safeSetPoolSize(threadPoolExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize()); - log.info("[{}] Hystrix thread pool parameter change. coreSize: {}, maximumSize: {}", - threadPoolKey, - String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolExecutor.getCorePoolSize()), - String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolExecutor.getMaximumPoolSize())); - return true; - } - - @Override - public void onApplicationEvent(ApplicationStartedEvent event) { - ScheduledExecutorService scheduler = threadPoolAdapterScheduler.getScheduler(); - int taskIntervalSeconds = threadPoolAdapterScheduler.getTaskIntervalSeconds(); - // Periodically update the Hystrix thread pool. - HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler, taskIntervalSeconds); - scheduler.schedule(hystrixThreadPoolRefreshTask, taskIntervalSeconds, TimeUnit.SECONDS); - // Periodically refresh registration. - ThreadPoolAdapterRegisterAction threadPoolAdapterRegisterAction = ApplicationContextHolder.getBean(ThreadPoolAdapterRegisterAction.class); - Map beansOfType = ApplicationContextHolder.getBeansOfType(this.getClass()); - Map map = new HashMap<>(beansOfType); - ThreadPoolAdapterRegisterTask threadPoolAdapterRegisterTask = new ThreadPoolAdapterRegisterTask(scheduler, taskIntervalSeconds, map, threadPoolAdapterRegisterAction); - scheduler.schedule(threadPoolAdapterRegisterTask, threadPoolAdapterScheduler.getTaskIntervalSeconds(), TimeUnit.SECONDS); - } - - public void hystrixThreadPoolRefresh() { - try { - Class factoryClass = HystrixThreadPool.Factory.class; - Field threadPoolsField = factoryClass.getDeclaredField(THREAD_POOLS_FIELD); - threadPoolsField.setAccessible(true); - ConcurrentHashMap threadPools = - (ConcurrentHashMap) threadPoolsField.get(factoryClass); - if (CollectionUtil.isNotEmpty(threadPools)) { - for (Map.Entry stringHystrixThreadPoolEntry : threadPools.entrySet()) { - String key = stringHystrixThreadPoolEntry.getKey(); - HystrixThreadPool value = stringHystrixThreadPoolEntry.getValue(); - if (value instanceof HystrixThreadPool.HystrixThreadPoolDefault) { - HystrixThreadPool.HystrixThreadPoolDefault hystrixThreadPoolDefault = - (HystrixThreadPool.HystrixThreadPoolDefault) value; - Class hystrixThreadPoolDefaultClass = hystrixThreadPoolDefault.getClass(); - Field threadPoolField = hystrixThreadPoolDefaultClass.getDeclaredField(THREAD_POOL_FIELD); - threadPoolField.setAccessible(true); - ThreadPoolExecutor threadPoolExecutor = - (ThreadPoolExecutor) threadPoolField.get(hystrixThreadPoolDefault); - hystrixConsumeExecutor.put(key, threadPoolExecutor); - } - } - } - } catch (Exception ex) { - log.error("Failed to get Hystrix thread pool.", ex); - } - } - - private boolean compareThreadPoolAdapterCacheConfigs(List newThreadPoolAdapterCacheConfigs, - List oldThreadPoolAdapterCacheConfigs) { - boolean registerFlag = false; - Map> newThreadPoolAdapterCacheConfigMap = - newThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( - ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); - Map> oldThreadPoolAdapterCacheConfigMap = - oldThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( - ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); - for (Map.Entry> entry : newThreadPoolAdapterCacheConfigMap.entrySet()) { - String key = entry.getKey(); - List newValue = entry.getValue(); - List oldValue = oldThreadPoolAdapterCacheConfigMap.get(key); - if (oldValue == null) { - registerFlag = true; - break; - } else { - if (newValue.size() != oldValue.size()) { - registerFlag = true; - break; - } - } - } - return registerFlag; - } - - /** - * Hystrix Thread Pool Refresh Task - */ - class HystrixThreadPoolRefreshTask implements Runnable { - - private ScheduledExecutorService scheduler; - - private int taskIntervalSeconds; - - HystrixThreadPoolRefreshTask(ScheduledExecutorService scheduler, int taskIntervalSeconds) { - this.scheduler = scheduler; - this.taskIntervalSeconds = taskIntervalSeconds; - } - - @Override - public void run() { - try { - hystrixThreadPoolRefresh(); - } finally { - if (!scheduler.isShutdown()) { - scheduler.schedule(this, taskIntervalSeconds, TimeUnit.MILLISECONDS); - } - } - } - } - - /** - * Thread Pool Adapter Register Task - */ - class ThreadPoolAdapterRegisterTask implements Runnable { - - private ScheduledExecutorService scheduler; - - private int taskIntervalSeconds; - - Map threadPoolAdapterMap; - - ThreadPoolAdapterRegisterAction threadPoolAdapterRegisterAction; - - private List cacheConfigList = new ArrayList<>(); - - ThreadPoolAdapterRegisterTask(ScheduledExecutorService scheduler, int taskIntervalSeconds, - Map threadPoolAdapterMap, - ThreadPoolAdapterRegisterAction threadPoolAdapterRegisterAction) { - this.scheduler = scheduler; - this.taskIntervalSeconds = taskIntervalSeconds; - this.threadPoolAdapterMap = threadPoolAdapterMap; - this.threadPoolAdapterRegisterAction = threadPoolAdapterRegisterAction; - } - - @Override - public void run() { - try { - List newThreadPoolAdapterCacheConfigs = threadPoolAdapterRegisterAction.getThreadPoolAdapterCacheConfigs(threadPoolAdapterMap); - boolean registerFlag = compareThreadPoolAdapterCacheConfigs(newThreadPoolAdapterCacheConfigs, cacheConfigList); - cacheConfigList = newThreadPoolAdapterCacheConfigs; - if (registerFlag) { - threadPoolAdapterRegisterAction.doRegister(cacheConfigList); - } - } catch (Exception ex) { - log.error("Register task error.", ex); - } finally { - if (!scheduler.isShutdown()) { - scheduler.schedule(this, taskIntervalSeconds, TimeUnit.MILLISECONDS); - } - } - } - } -} diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter4Config.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter4Config.java new file mode 100644 index 00000000..5a06e167 --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter4Config.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.adapter.hystrix; + +/** + * hystrix thread-pool adapter for hippo4j config. + */ +public class HystrixThreadPoolAdapter4Config extends AbstractHystrixThreadPoolAdapter { + + public HystrixThreadPoolAdapter4Config(ThreadPoolAdapterScheduler threadPoolAdapterScheduler) { + super(threadPoolAdapterScheduler); + } +} diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter4Server.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter4Server.java new file mode 100644 index 00000000..0c8ec370 --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter4Server.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.adapter.hystrix; + +import cn.hippo4j.adapter.base.ThreadPoolAdapter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; +import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterRegisterAction; +import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.common.toolkit.CollectionUtil; +import com.netflix.hystrix.HystrixThreadPool; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.ApplicationListener; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; + +/** + * hystrix thread-pool adapter for hippo4j server. + */ +@Slf4j +public class HystrixThreadPoolAdapter4Server extends AbstractHystrixThreadPoolAdapter { + + public HystrixThreadPoolAdapter4Server(ThreadPoolAdapterScheduler threadPoolAdapterScheduler) { + super(threadPoolAdapterScheduler); + } + + @Override + public void onApplicationEvent(ApplicationStartedEvent event) { + super.onApplicationEvent(event); + ScheduledExecutorService scheduler = threadPoolAdapterScheduler.getScheduler(); + int taskIntervalSeconds = threadPoolAdapterScheduler.getTaskIntervalSeconds(); + // Periodically refresh registration. + ThreadPoolAdapterRegisterAction threadPoolAdapterRegisterAction = ApplicationContextHolder.getBean(ThreadPoolAdapterRegisterAction.class); + Map beansOfType = ApplicationContextHolder.getBeansOfType(this.getClass()); + Map map = new HashMap<>(beansOfType); + ThreadPoolAdapterRegisterTask threadPoolAdapterRegisterTask = new ThreadPoolAdapterRegisterTask(scheduler, taskIntervalSeconds, map, threadPoolAdapterRegisterAction); + scheduler.schedule(threadPoolAdapterRegisterTask, threadPoolAdapterScheduler.getTaskIntervalSeconds(), TimeUnit.SECONDS); + } + + /** + * Thread Pool Adapter Register Task + */ + static class ThreadPoolAdapterRegisterTask implements Runnable { + + private final ScheduledExecutorService scheduler; + + private final int taskIntervalSeconds; + + Map threadPoolAdapterMap; + + ThreadPoolAdapterRegisterAction threadPoolAdapterRegisterAction; + + private List cacheConfigList = new ArrayList<>(); + + ThreadPoolAdapterRegisterTask(ScheduledExecutorService scheduler, int taskIntervalSeconds, + Map threadPoolAdapterMap, + ThreadPoolAdapterRegisterAction threadPoolAdapterRegisterAction) { + this.scheduler = scheduler; + this.taskIntervalSeconds = taskIntervalSeconds; + this.threadPoolAdapterMap = threadPoolAdapterMap; + this.threadPoolAdapterRegisterAction = threadPoolAdapterRegisterAction; + } + + @Override + public void run() { + try { + List newThreadPoolAdapterCacheConfigs = threadPoolAdapterRegisterAction.getThreadPoolAdapterCacheConfigs(threadPoolAdapterMap); + boolean registerFlag = compareThreadPoolAdapterCacheConfigs(newThreadPoolAdapterCacheConfigs, cacheConfigList); + cacheConfigList = newThreadPoolAdapterCacheConfigs; + if (registerFlag) { + threadPoolAdapterRegisterAction.doRegister(cacheConfigList); + } + } catch (Exception ex) { + log.error("Register task error.", ex); + } finally { + if (!scheduler.isShutdown()) { + scheduler.schedule(this, taskIntervalSeconds, TimeUnit.MILLISECONDS); + } + } + } + + private boolean compareThreadPoolAdapterCacheConfigs(List newThreadPoolAdapterCacheConfigs, + List oldThreadPoolAdapterCacheConfigs) { + boolean registerFlag = false; + Map> newThreadPoolAdapterCacheConfigMap = + newThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( + ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); + Map> oldThreadPoolAdapterCacheConfigMap = + oldThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( + ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); + for (Map.Entry> entry : newThreadPoolAdapterCacheConfigMap.entrySet()) { + String key = entry.getKey(); + List newValue = entry.getValue(); + List oldValue = oldThreadPoolAdapterCacheConfigMap.get(key); + if (oldValue == null) { + registerFlag = true; + break; + } else { + if (newValue.size() != oldValue.size()) { + registerFlag = true; + break; + } + } + } + return registerFlag; + } + } +} diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 62467060..5b76bc9f 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -56,6 +56,12 @@ import org.springframework.core.annotation.Order; /** * Dynamic thread-pool auto-configuration. + * + *

NOTE: + * {@code cn.hippo4j.config.springboot.starter.config.DynamicThreadPoolAutoConfiguration} is used in the + * hippo4j-spring-boot-starter-adapter-hystrix module to determine the condition, see + * {@code cn.hippo4j.springboot.starter.adapter.hystrix.HystrixAdapterAutoConfiguration}, please + * note the subsequent modification. */ @Configuration @AllArgsConstructor diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java index d7e3e5a8..f8492a04 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java @@ -17,10 +17,11 @@ package cn.hippo4j.springboot.starter.adapter.hystrix; -import cn.hippo4j.adapter.hystrix.HystrixThreadPoolAdapter; +import cn.hippo4j.adapter.hystrix.HystrixThreadPoolAdapter4Config; +import cn.hippo4j.adapter.hystrix.HystrixThreadPoolAdapter4Server; import cn.hippo4j.adapter.hystrix.ThreadPoolAdapterScheduler; import cn.hippo4j.common.config.ApplicationContextHolder; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -46,7 +47,14 @@ public class HystrixAdapterAutoConfiguration { } @Bean - public HystrixThreadPoolAdapter hystrixThreadPoolAdapter(ThreadPoolAdapterScheduler threadPoolAdapterScheduler) { - return new HystrixThreadPoolAdapter(threadPoolAdapterScheduler); + @ConditionalOnClass(name = "cn.hippo4j.springboot.starter.config.DynamicThreadPoolAutoConfiguration") + public HystrixThreadPoolAdapter4Server hystrixThreadPoolAdapter4Server(ThreadPoolAdapterScheduler threadPoolAdapterScheduler) { + return new HystrixThreadPoolAdapter4Server(threadPoolAdapterScheduler); + } + + @Bean + @ConditionalOnClass(name = "cn.hippo4j.config.springboot.starter.config.DynamicThreadPoolAutoConfiguration") + public HystrixThreadPoolAdapter4Config hystrixThreadPoolAdapter4Config(ThreadPoolAdapterScheduler threadPoolAdapterScheduler) { + return new HystrixThreadPoolAdapter4Config(threadPoolAdapterScheduler); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 2650fb11..1b0cb995 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -78,6 +78,12 @@ import org.springframework.core.env.ConfigurableEnvironment; /** * Dynamic thread-pool auto-configuration. + * + *

NOTE: + * {@code cn.hippo4j.springboot.starter.config.DynamicThreadPoolAutoConfiguration} is used in the + * hippo4j-spring-boot-starter-adapter-hystrix module to determine the condition, see + * {@code cn.hippo4j.springboot.starter.adapter.hystrix.HystrixAdapterAutoConfiguration}, please + * note the subsequent modification. */ @Configuration @AllArgsConstructor