From a8fdfa649c03df52fabfdfcca809a8d38fe1603b Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Sat, 26 Feb 2022 23:21:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=80=E5=8F=91=E5=AF=B9=E6=8E=A5=20Nacos=20?= =?UTF-8?q?=E5=8A=A8=E6=80=81=E8=B0=83=E5=8F=82=E5=8A=9F=E8=83=BD.=20#103?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../starter/common/ConfigFileTypeEnum.java | 48 +++++ .../config/BootstrapCoreProperties.java | 19 +- ...ynamicThreadPoolCoreAutoConfiguration.java | 28 ++- .../AbstractCoreThreadPoolDynamicRefresh.java | 188 ++++++++++++++++++ .../refresher/ConfigParserHandler.java | 44 ++++ .../CoreThreadPoolDynamicRefresh.java | 23 --- .../refresher/NacosCloudRefresherHandler.java | 23 ++- .../refresher/NacosRefresherHandler.java | 26 ++- .../DynamicThreadPoolPostProcessor.java | 1 + .../support/GlobalCoreThreadPoolManage.java | 48 +++++ 10 files changed, 395 insertions(+), 53 deletions(-) create mode 100644 hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/common/ConfigFileTypeEnum.java create mode 100644 hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java create mode 100644 hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ConfigParserHandler.java delete mode 100644 hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/CoreThreadPoolDynamicRefresh.java create mode 100644 hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/GlobalCoreThreadPoolManage.java diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/common/ConfigFileTypeEnum.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/common/ConfigFileTypeEnum.java new file mode 100644 index 00000000..8d228651 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/common/ConfigFileTypeEnum.java @@ -0,0 +1,48 @@ +package cn.hippo4j.core.starter.common; + +/** + * Config file type enum. + * + * @author chen.ma + * @date 2022/2/26 18:12 + */ +public enum ConfigFileTypeEnum { + + /** + * PROPERTIES + */ + PROPERTIES { + @Override + public String type() { + return "properties"; + } + }, + + /*** + * YML + */ + YML { + @Override + public String type() { + return "yml"; + } + }, + + /*** + * YAML + */ + YAML { + @Override + public String type() { + return "yaml"; + } + }; + + /** + * Type. + * + * @return + */ + public abstract String type(); + +} diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/BootstrapCoreProperties.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/BootstrapCoreProperties.java index 6c2a7843..bd7233b5 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/BootstrapCoreProperties.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/BootstrapCoreProperties.java @@ -5,6 +5,7 @@ import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import java.util.List; +import java.util.Map; /** * Bootstrap properties. @@ -20,27 +21,37 @@ public class BootstrapCoreProperties { public static final String PREFIX = "spring.dynamic.thread-pool"; /** - * Enabled banner + * Enabled banner. */ private Boolean enableBanner; /*** - * Enabled collect + * Enabled collect. */ private Boolean enabledCollect; /** - * Check state interval + * Check state interval. */ private String checkStateInterval; + /** + * Config file type. + */ + private String configFileType; + + /** + * Nacos config. + */ + private Map nacos; + /** * Notify platforms. */ private List notifyPlatforms; /** - * Executors + * Executors. */ private List executors; diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/DynamicThreadPoolCoreAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/DynamicThreadPoolCoreAutoConfiguration.java index b7513991..7cfe0162 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/DynamicThreadPoolCoreAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/DynamicThreadPoolCoreAutoConfiguration.java @@ -1,7 +1,6 @@ package cn.hippo4j.core.starter.config; import cn.hippo4j.common.api.NotifyConfigBuilder; -import cn.hippo4j.common.api.ThreadPoolDynamicRefresh; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.notify.AlarmControlHandler; import cn.hippo4j.common.notify.BaseSendMessageServiceImpl; @@ -13,11 +12,12 @@ import cn.hippo4j.common.notify.platform.WeChatSendMessageHandler; import cn.hippo4j.core.config.UtilAutoConfiguration; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.starter.notify.CoreNotifyConfigBuilder; -import cn.hippo4j.core.starter.refresher.CoreThreadPoolDynamicRefresh; +import cn.hippo4j.core.starter.refresher.ConfigParserHandler; import cn.hippo4j.core.starter.refresher.NacosCloudRefresherHandler; import cn.hippo4j.core.starter.refresher.NacosRefresherHandler; import cn.hippo4j.core.starter.support.DynamicThreadPoolPostProcessor; import com.alibaba.cloud.nacos.NacosConfigManager; +import com.alibaba.nacos.api.config.ConfigService; import lombok.AllArgsConstructor; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -86,11 +86,6 @@ public class DynamicThreadPoolCoreAutoConfiguration { return new WeChatSendMessageHandler(); } - @Bean - public ThreadPoolDynamicRefresh coreThreadPoolDynamicRefresh(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) { - return new CoreThreadPoolDynamicRefresh(threadPoolNotifyAlarmHandler); - } - @Bean public DynamicThreadPoolPostProcessor dynamicThreadPoolPostProcessor(ApplicationContextHolder hippo4JApplicationContextHolder) { return new DynamicThreadPoolPostProcessor(bootstrapCoreProperties); @@ -98,14 +93,25 @@ public class DynamicThreadPoolCoreAutoConfiguration { @Bean @ConditionalOnMissingClass(NACOS_KEY) - public NacosRefresherHandler nacosRefresherHandler() { - return new NacosRefresherHandler(); + public NacosRefresherHandler nacosRefresherHandler(ConfigService configService, + ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler, + ConfigParserHandler configParserHandler, + BootstrapCoreProperties bootstrapCoreProperties) { + return new NacosRefresherHandler(configService, threadPoolNotifyAlarmHandler, configParserHandler, bootstrapCoreProperties); } @Bean @ConditionalOnClass(name = NACOS_KEY) - public NacosCloudRefresherHandler nacosCloudRefresherHandler(NacosConfigManager nacosConfigManager) { - return new NacosCloudRefresherHandler(nacosConfigManager); + public NacosCloudRefresherHandler nacosCloudRefresherHandler(NacosConfigManager nacosConfigManager, + ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler, + ConfigParserHandler configParserHandler, + BootstrapCoreProperties bootstrapCoreProperties) { + return new NacosCloudRefresherHandler(nacosConfigManager, threadPoolNotifyAlarmHandler, configParserHandler, bootstrapCoreProperties); + } + + @Bean + public ConfigParserHandler configParserHandler() { + return new ConfigParserHandler(); } } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java new file mode 100644 index 00000000..03465998 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java @@ -0,0 +1,188 @@ +package cn.hippo4j.core.starter.refresher; + +import cn.hippo4j.common.api.ThreadPoolDynamicRefresh; +import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest; +import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; +import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; +import cn.hippo4j.core.executor.support.*; +import cn.hippo4j.core.proxy.RejectedProxyUtil; +import cn.hippo4j.core.starter.config.BootstrapCoreProperties; +import cn.hippo4j.core.starter.config.ExecutorProperties; +import cn.hippo4j.core.starter.support.GlobalCoreThreadPoolManage; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.properties.bind.Bindable; +import org.springframework.boot.context.properties.bind.Binder; +import org.springframework.boot.context.properties.source.ConfigurationPropertySource; +import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static cn.hippo4j.core.starter.config.BootstrapCoreProperties.PREFIX; + +/** + * Abstract core thread pool dynamic refresh. + * + * @author chen.ma + * @date 2022/2/26 12:42 + */ +@Slf4j +@AllArgsConstructor +public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh { + + private final ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler; + + private final ConfigParserHandler configParserHandler; + + protected final BootstrapCoreProperties bootstrapCoreProperties; + + protected final ExecutorService dynamicRefreshExecutorService = ThreadPoolBuilder.builder() + .threadFactory("client.dynamic.refresh") + .singlePool() + .build(); + + @Override + public void dynamicRefresh(String content) { + Map configInfo = configParserHandler.parseConfig(content, bootstrapCoreProperties.getConfigFileType()); + + ConfigurationPropertySource sources = new MapConfigurationPropertySource(configInfo); + Binder binder = new Binder(sources); + BootstrapCoreProperties bindableCoreProperties = binder.bind(PREFIX, Bindable.ofInstance(bootstrapCoreProperties)).get(); + + List executors = bindableCoreProperties.getExecutors(); + for (ExecutorProperties properties : executors) { + String threadPoolId = properties.getThreadPoolId(); + if (!checkConsistency(threadPoolId, properties)) { + continue; + } + + dynamicRefreshPool(threadPoolId, properties); + + ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); + ChangeParameterNotifyRequest changeRequest = new ChangeParameterNotifyRequest(); + changeRequest.setBeforeCorePoolSize(beforeProperties.getCorePoolSize()); + changeRequest.setBeforeMaximumPoolSize(beforeProperties.getMaximumPoolSize()); + changeRequest.setBeforeAllowsCoreThreadTimeOut(beforeProperties.getAllowCoreThreadTimeOut()); + changeRequest.setBeforeKeepAliveTime(beforeProperties.getKeepAliveTime()); + changeRequest.setBlockingQueueName(beforeProperties.getBlockingQueue()); + changeRequest.setBeforeQueueCapacity(beforeProperties.getQueueCapacity()); + changeRequest.setBeforeRejectedName(beforeProperties.getRejectedHandler()); + changeRequest.setThreadPoolId(beforeProperties.getThreadPoolId()); + + changeRequest.setNowCorePoolSize(properties.getCorePoolSize()); + changeRequest.setNowMaximumPoolSize(properties.getMaximumPoolSize()); + changeRequest.setNowAllowsCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut()); + changeRequest.setNowKeepAliveTime(properties.getKeepAliveTime()); + changeRequest.setNowQueueCapacity(properties.getQueueCapacity()); + changeRequest.setNowRejectedName(properties.getRejectedHandler()); + + GlobalCoreThreadPoolManage.refresh(threadPoolId, properties); + log.info( + "[🔥 {}] Changed thread pool. " + + "\n coreSize :: [{}]" + + "\n maxSize :: [{}]" + + "\n queueType :: [{}]" + + "\n capacity :: [{}]" + + "\n keepAliveTime :: [{}]" + + "\n rejectedType :: [{}]" + + "\n allowCoreThreadTimeOut :: [{}]", + threadPoolId.toUpperCase(), + String.format("%s => %s", beforeProperties.getCorePoolSize(), properties.getCorePoolSize()), + String.format("%s => %s", beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()), + String.format("%s => %s", beforeProperties.getBlockingQueue(), properties.getBlockingQueue()), + String.format("%s => %s", beforeProperties.getQueueCapacity(), properties.getQueueCapacity()), + String.format("%s => %s", beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()), + String.format("%s => %s", beforeProperties.getRejectedHandler(), properties.getRejectedHandler()), + String.format("%s => %s", beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()) + ); + + try { + threadPoolNotifyAlarmHandler.sendPoolConfigChange(changeRequest); + } catch (Throwable ex) { + log.error("Failed to send change notice. Message :: {}", ex.getMessage()); + } + } + } + + /** + * Check consistency. + * + * @param threadPoolId + * @param properties + */ + private boolean checkConsistency(String threadPoolId, ExecutorProperties properties) { + ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); + ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor(); + + boolean result = !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize()) + || !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()) + || !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()) + || !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()) + || !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler()) + || + ( + !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) + && Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName()) + ); + + return result; + } + + /** + * Dynamic refresh pool. + * + * @param threadPoolId + * @param properties + */ + private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) { + ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); + + ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor(); + if (!Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize())) { + executor.setCorePoolSize(properties.getCorePoolSize()); + } + + if (!Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize())) { + executor.setMaximumPoolSize(properties.getMaximumPoolSize()); + } + + if (!Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) { + executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut()); + } + + if (!Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) { + RejectedExecutionHandler rejectedExecutionHandler = RejectedTypeEnum.createPolicy(properties.getRejectedHandler()); + if (executor instanceof AbstractDynamicExecutorSupport) { + DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor; + dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler); + AtomicLong rejectCount = dynamicExecutor.getRejectCount(); + rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount); + } + + executor.setRejectedExecutionHandler(rejectedExecutionHandler); + } + + if (!Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) { + executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS); + } + + if (!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) + && Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName())) { + if (executor.getQueue() instanceof ResizableCapacityLinkedBlockIngQueue) { + ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue(); + queue.setCapacity(properties.getQueueCapacity()); + } else { + log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type :: {}", executor.getQueue().getClass().getSimpleName()); + } + } + } + +} diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ConfigParserHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ConfigParserHandler.java new file mode 100644 index 00000000..320579fe --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ConfigParserHandler.java @@ -0,0 +1,44 @@ +package cn.hippo4j.core.starter.refresher; + +import cn.hippo4j.common.toolkit.StringUtil; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.springframework.beans.factory.config.YamlPropertiesFactoryBean; +import org.springframework.core.io.ByteArrayResource; + +import java.util.List; +import java.util.Map; + +/** + * Config parser service. + * + * @author chen.ma + * @date 2022/2/26 17:33 + */ +public class ConfigParserHandler { + + private final List yamlList = Lists.newArrayList("yaml", "yml"); + + /** + * Parse config. + * + * @param content + * @param configFileType + * @return + */ + public Map parseConfig(String content, String configFileType) { + Map resultMap = Maps.newHashMap(); + if (StringUtil.isBlank(content)) { + return resultMap; + } + + if (yamlList.contains(configFileType)) { + YamlPropertiesFactoryBean yamlPropertiesFactoryBean = new YamlPropertiesFactoryBean(); + yamlPropertiesFactoryBean.setResources(new ByteArrayResource(content.getBytes())); + resultMap = yamlPropertiesFactoryBean.getObject(); + } + + return resultMap; + } + +} diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/CoreThreadPoolDynamicRefresh.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/CoreThreadPoolDynamicRefresh.java deleted file mode 100644 index 381c7213..00000000 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/CoreThreadPoolDynamicRefresh.java +++ /dev/null @@ -1,23 +0,0 @@ -package cn.hippo4j.core.starter.refresher; - -import cn.hippo4j.common.api.ThreadPoolDynamicRefresh; -import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; -import lombok.AllArgsConstructor; - -/** - * Core thread pool dynamic refresh. - * - * @author chen.ma - * @date 2022/2/26 12:32 - */ -@AllArgsConstructor -public class CoreThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh { - - private final ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler; - - @Override - public void dynamicRefresh(String content) { - - } - -} diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosCloudRefresherHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosCloudRefresherHandler.java index 390802ec..d226c632 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosCloudRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosCloudRefresherHandler.java @@ -1,13 +1,14 @@ package cn.hippo4j.core.starter.refresher; +import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import cn.hippo4j.core.starter.config.BootstrapCoreProperties; import com.alibaba.cloud.nacos.NacosConfigManager; import com.alibaba.nacos.api.config.listener.Listener; -import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; +import java.util.Map; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; /** * Nacos cloud refresher handler. @@ -16,24 +17,32 @@ import java.util.concurrent.Executors; * @date 2022/2/26 11:21 */ @Slf4j -@AllArgsConstructor -public class NacosCloudRefresherHandler implements InitializingBean, Listener { +public class NacosCloudRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh implements InitializingBean, Listener { private final NacosConfigManager nacosConfigManager; + public NacosCloudRefresherHandler(NacosConfigManager nacosConfigManager, + ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler, + ConfigParserHandler configParserHandler, + BootstrapCoreProperties bootstrapCoreProperties) { + super(threadPoolNotifyAlarmHandler, configParserHandler, bootstrapCoreProperties); + this.nacosConfigManager = nacosConfigManager; + } + @Override public void afterPropertiesSet() throws Exception { - nacosConfigManager.getConfigService().addListener("hippo4j-nacos.yaml", "DEFAULT_GROUP", this); + Map nacosConfig = bootstrapCoreProperties.getNacos(); + nacosConfigManager.getConfigService().addListener(nacosConfig.get("data-id"), nacosConfig.get("group"), this); } @Override public Executor getExecutor() { - return Executors.newSingleThreadExecutor(); + return dynamicRefreshExecutorService; } @Override public void receiveConfigInfo(String configInfo) { - log.info("Config :: {}", configInfo); + dynamicRefresh(configInfo); } } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosRefresherHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosRefresherHandler.java index e057dc2b..db4fe271 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosRefresherHandler.java @@ -1,13 +1,14 @@ package cn.hippo4j.core.starter.refresher; +import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import cn.hippo4j.core.starter.config.BootstrapCoreProperties; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.Listener; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; +import java.util.Map; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; /** * Nacos refresher handler. @@ -16,23 +17,32 @@ import java.util.concurrent.Executors; * @date 2022/2/26 00:10 */ @Slf4j -public class NacosRefresherHandler implements InitializingBean, Listener { +public class NacosRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh implements InitializingBean, Listener { - @Autowired(required = false) - private ConfigService configService; + private final ConfigService configService; + + public NacosRefresherHandler(ConfigService configService, + ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler, + ConfigParserHandler configParserHandler, + BootstrapCoreProperties bootstrapCoreProperties) { + super(threadPoolNotifyAlarmHandler, configParserHandler, bootstrapCoreProperties); + this.configService = configService; + } @Override public void afterPropertiesSet() throws Exception { - configService.addListener("hippo4j-nacos.yaml", "DEFAULT_GROUP", this); + Map nacosConfig = bootstrapCoreProperties.getNacos(); + configService.addListener(nacosConfig.get("data-id"), nacosConfig.get("group"), this); } @Override public Executor getExecutor() { - return Executors.newSingleThreadExecutor(); + return dynamicRefreshExecutorService; } @Override public void receiveConfigInfo(String configInfo) { - log.info("Config :: {}", configInfo); + dynamicRefresh(configInfo); } + } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/DynamicThreadPoolPostProcessor.java index 3fc3d7a3..58badff4 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/DynamicThreadPoolPostProcessor.java @@ -139,6 +139,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { } GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrap.getTpId(), dynamicThreadPoolWrap); + GlobalCoreThreadPoolManage.register(threadPoolId, executorProperties); } return newDynamicPoolExecutor; diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/GlobalCoreThreadPoolManage.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/GlobalCoreThreadPoolManage.java new file mode 100644 index 00000000..e388485c --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/GlobalCoreThreadPoolManage.java @@ -0,0 +1,48 @@ +package cn.hippo4j.core.starter.support; + +import cn.hippo4j.core.starter.config.ExecutorProperties; +import com.google.common.collect.Maps; + +import java.util.Map; + +/** + * Global core thread pool manage. + * + * @author chen.ma + * @date 2022/2/26 19:47 + */ +public class GlobalCoreThreadPoolManage { + + private static final Map EXECUTOR_PROPERTIES = Maps.newConcurrentMap(); + + /** + * Get properties. + * + * @param threadPoolId + * @return + */ + public static ExecutorProperties getProperties(String threadPoolId) { + return EXECUTOR_PROPERTIES.get(threadPoolId); + } + + /** + * Register. + * + * @param threadPoolId + * @param executorProperties + */ + public static void register(String threadPoolId, ExecutorProperties executorProperties) { + EXECUTOR_PROPERTIES.put(threadPoolId, executorProperties); + } + + /** + * Refresh. + * + * @param threadPoolId + * @param executorProperties + */ + public static void refresh(String threadPoolId, ExecutorProperties executorProperties) { + EXECUTOR_PROPERTIES.put(threadPoolId, executorProperties); + } + +}