开发对接 Nacos 动态调参功能. #103

pull/117/head
chen.ma 2 years ago
parent 34e11f6ae8
commit a8fdfa649c

@ -0,0 +1,48 @@
package cn.hippo4j.core.starter.common;
/**
* Config file type enum.
*
* @author chen.ma
* @date 2022/2/26 18:12
*/
public enum ConfigFileTypeEnum {
/**
* PROPERTIES
*/
PROPERTIES {
@Override
public String type() {
return "properties";
}
},
/***
* YML
*/
YML {
@Override
public String type() {
return "yml";
}
},
/***
* YAML
*/
YAML {
@Override
public String type() {
return "yaml";
}
};
/**
* Type.
*
* @return
*/
public abstract String type();
}

@ -5,6 +5,7 @@ import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.List;
import java.util.Map;
/**
* Bootstrap properties.
@ -20,27 +21,37 @@ public class BootstrapCoreProperties {
public static final String PREFIX = "spring.dynamic.thread-pool";
/**
* Enabled banner
* Enabled banner.
*/
private Boolean enableBanner;
/***
* Enabled collect
* Enabled collect.
*/
private Boolean enabledCollect;
/**
* Check state interval
* Check state interval.
*/
private String checkStateInterval;
/**
* Config file type.
*/
private String configFileType;
/**
* Nacos config.
*/
private Map<String, String> nacos;
/**
* Notify platforms.
*/
private List<NotifyPlatformProperties> notifyPlatforms;
/**
* Executors
* Executors.
*/
private List<ExecutorProperties> executors;

@ -1,7 +1,6 @@
package cn.hippo4j.core.starter.config;
import cn.hippo4j.common.api.NotifyConfigBuilder;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.notify.AlarmControlHandler;
import cn.hippo4j.common.notify.BaseSendMessageServiceImpl;
@ -13,11 +12,12 @@ import cn.hippo4j.common.notify.platform.WeChatSendMessageHandler;
import cn.hippo4j.core.config.UtilAutoConfiguration;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.starter.notify.CoreNotifyConfigBuilder;
import cn.hippo4j.core.starter.refresher.CoreThreadPoolDynamicRefresh;
import cn.hippo4j.core.starter.refresher.ConfigParserHandler;
import cn.hippo4j.core.starter.refresher.NacosCloudRefresherHandler;
import cn.hippo4j.core.starter.refresher.NacosRefresherHandler;
import cn.hippo4j.core.starter.support.DynamicThreadPoolPostProcessor;
import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.nacos.api.config.ConfigService;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@ -86,11 +86,6 @@ public class DynamicThreadPoolCoreAutoConfiguration {
return new WeChatSendMessageHandler();
}
@Bean
public ThreadPoolDynamicRefresh coreThreadPoolDynamicRefresh(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) {
return new CoreThreadPoolDynamicRefresh(threadPoolNotifyAlarmHandler);
}
@Bean
public DynamicThreadPoolPostProcessor dynamicThreadPoolPostProcessor(ApplicationContextHolder hippo4JApplicationContextHolder) {
return new DynamicThreadPoolPostProcessor(bootstrapCoreProperties);
@ -98,14 +93,25 @@ public class DynamicThreadPoolCoreAutoConfiguration {
@Bean
@ConditionalOnMissingClass(NACOS_KEY)
public NacosRefresherHandler nacosRefresherHandler() {
return new NacosRefresherHandler();
public NacosRefresherHandler nacosRefresherHandler(ConfigService configService,
ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler,
ConfigParserHandler configParserHandler,
BootstrapCoreProperties bootstrapCoreProperties) {
return new NacosRefresherHandler(configService, threadPoolNotifyAlarmHandler, configParserHandler, bootstrapCoreProperties);
}
@Bean
@ConditionalOnClass(name = NACOS_KEY)
public NacosCloudRefresherHandler nacosCloudRefresherHandler(NacosConfigManager nacosConfigManager) {
return new NacosCloudRefresherHandler(nacosConfigManager);
public NacosCloudRefresherHandler nacosCloudRefresherHandler(NacosConfigManager nacosConfigManager,
ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler,
ConfigParserHandler configParserHandler,
BootstrapCoreProperties bootstrapCoreProperties) {
return new NacosCloudRefresherHandler(nacosConfigManager, threadPoolNotifyAlarmHandler, configParserHandler, bootstrapCoreProperties);
}
@Bean
public ConfigParserHandler configParserHandler() {
return new ConfigParserHandler();
}
}

@ -0,0 +1,188 @@
package cn.hippo4j.core.starter.refresher;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.core.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.starter.config.ExecutorProperties;
import cn.hippo4j.core.starter.support.GlobalCoreThreadPoolManage;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static cn.hippo4j.core.starter.config.BootstrapCoreProperties.PREFIX;
/**
* Abstract core thread pool dynamic refresh.
*
* @author chen.ma
* @date 2022/2/26 12:42
*/
@Slf4j
@AllArgsConstructor
public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh {
private final ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler;
private final ConfigParserHandler configParserHandler;
protected final BootstrapCoreProperties bootstrapCoreProperties;
protected final ExecutorService dynamicRefreshExecutorService = ThreadPoolBuilder.builder()
.threadFactory("client.dynamic.refresh")
.singlePool()
.build();
@Override
public void dynamicRefresh(String content) {
Map<Object, Object> configInfo = configParserHandler.parseConfig(content, bootstrapCoreProperties.getConfigFileType());
ConfigurationPropertySource sources = new MapConfigurationPropertySource(configInfo);
Binder binder = new Binder(sources);
BootstrapCoreProperties bindableCoreProperties = binder.bind(PREFIX, Bindable.ofInstance(bootstrapCoreProperties)).get();
List<ExecutorProperties> executors = bindableCoreProperties.getExecutors();
for (ExecutorProperties properties : executors) {
String threadPoolId = properties.getThreadPoolId();
if (!checkConsistency(threadPoolId, properties)) {
continue;
}
dynamicRefreshPool(threadPoolId, properties);
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
ChangeParameterNotifyRequest changeRequest = new ChangeParameterNotifyRequest();
changeRequest.setBeforeCorePoolSize(beforeProperties.getCorePoolSize());
changeRequest.setBeforeMaximumPoolSize(beforeProperties.getMaximumPoolSize());
changeRequest.setBeforeAllowsCoreThreadTimeOut(beforeProperties.getAllowCoreThreadTimeOut());
changeRequest.setBeforeKeepAliveTime(beforeProperties.getKeepAliveTime());
changeRequest.setBlockingQueueName(beforeProperties.getBlockingQueue());
changeRequest.setBeforeQueueCapacity(beforeProperties.getQueueCapacity());
changeRequest.setBeforeRejectedName(beforeProperties.getRejectedHandler());
changeRequest.setThreadPoolId(beforeProperties.getThreadPoolId());
changeRequest.setNowCorePoolSize(properties.getCorePoolSize());
changeRequest.setNowMaximumPoolSize(properties.getMaximumPoolSize());
changeRequest.setNowAllowsCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
changeRequest.setNowKeepAliveTime(properties.getKeepAliveTime());
changeRequest.setNowQueueCapacity(properties.getQueueCapacity());
changeRequest.setNowRejectedName(properties.getRejectedHandler());
GlobalCoreThreadPoolManage.refresh(threadPoolId, properties);
log.info(
"[🔥 {}] Changed thread pool. " +
"\n coreSize :: [{}]" +
"\n maxSize :: [{}]" +
"\n queueType :: [{}]" +
"\n capacity :: [{}]" +
"\n keepAliveTime :: [{}]" +
"\n rejectedType :: [{}]" +
"\n allowCoreThreadTimeOut :: [{}]",
threadPoolId.toUpperCase(),
String.format("%s => %s", beforeProperties.getCorePoolSize(), properties.getCorePoolSize()),
String.format("%s => %s", beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()),
String.format("%s => %s", beforeProperties.getBlockingQueue(), properties.getBlockingQueue()),
String.format("%s => %s", beforeProperties.getQueueCapacity(), properties.getQueueCapacity()),
String.format("%s => %s", beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()),
String.format("%s => %s", beforeProperties.getRejectedHandler(), properties.getRejectedHandler()),
String.format("%s => %s", beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())
);
try {
threadPoolNotifyAlarmHandler.sendPoolConfigChange(changeRequest);
} catch (Throwable ex) {
log.error("Failed to send change notice. Message :: {}", ex.getMessage());
}
}
}
/**
* Check consistency.
*
* @param threadPoolId
* @param properties
*/
private boolean checkConsistency(String threadPoolId, ExecutorProperties properties) {
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
boolean result = !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize())
|| !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize())
|| !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())
|| !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())
|| !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())
||
(
!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName())
);
return result;
}
/**
* Dynamic refresh pool.
*
* @param threadPoolId
* @param properties
*/
private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) {
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
if (!Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize())) {
executor.setCorePoolSize(properties.getCorePoolSize());
}
if (!Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize())) {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
}
if (!Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) {
executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
}
if (!Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedTypeEnum.createPolicy(properties.getRejectedHandler());
if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
AtomicLong rejectCount = dynamicExecutor.getRejectCount();
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount);
}
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (!Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {
executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS);
}
if (!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName())) {
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockIngQueue) {
ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
queue.setCapacity(properties.getQueueCapacity());
} else {
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type :: {}", executor.getQueue().getClass().getSimpleName());
}
}
}
}

@ -0,0 +1,44 @@
package cn.hippo4j.core.starter.refresher;
import cn.hippo4j.common.toolkit.StringUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
import org.springframework.core.io.ByteArrayResource;
import java.util.List;
import java.util.Map;
/**
* Config parser service.
*
* @author chen.ma
* @date 2022/2/26 17:33
*/
public class ConfigParserHandler {
private final List<String> yamlList = Lists.newArrayList("yaml", "yml");
/**
* Parse config.
*
* @param content
* @param configFileType
* @return
*/
public Map<Object, Object> parseConfig(String content, String configFileType) {
Map<Object, Object> resultMap = Maps.newHashMap();
if (StringUtil.isBlank(content)) {
return resultMap;
}
if (yamlList.contains(configFileType)) {
YamlPropertiesFactoryBean yamlPropertiesFactoryBean = new YamlPropertiesFactoryBean();
yamlPropertiesFactoryBean.setResources(new ByteArrayResource(content.getBytes()));
resultMap = yamlPropertiesFactoryBean.getObject();
}
return resultMap;
}
}

@ -1,23 +0,0 @@
package cn.hippo4j.core.starter.refresher;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import lombok.AllArgsConstructor;
/**
* Core thread pool dynamic refresh.
*
* @author chen.ma
* @date 2022/2/26 12:32
*/
@AllArgsConstructor
public class CoreThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh {
private final ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler;
@Override
public void dynamicRefresh(String content) {
}
}

@ -1,13 +1,14 @@
package cn.hippo4j.core.starter.refresher;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.starter.config.BootstrapCoreProperties;
import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.nacos.api.config.listener.Listener;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* Nacos cloud refresher handler.
@ -16,24 +17,32 @@ import java.util.concurrent.Executors;
* @date 2022/2/26 11:21
*/
@Slf4j
@AllArgsConstructor
public class NacosCloudRefresherHandler implements InitializingBean, Listener {
public class NacosCloudRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh implements InitializingBean, Listener {
private final NacosConfigManager nacosConfigManager;
public NacosCloudRefresherHandler(NacosConfigManager nacosConfigManager,
ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler,
ConfigParserHandler configParserHandler,
BootstrapCoreProperties bootstrapCoreProperties) {
super(threadPoolNotifyAlarmHandler, configParserHandler, bootstrapCoreProperties);
this.nacosConfigManager = nacosConfigManager;
}
@Override
public void afterPropertiesSet() throws Exception {
nacosConfigManager.getConfigService().addListener("hippo4j-nacos.yaml", "DEFAULT_GROUP", this);
Map<String, String> nacosConfig = bootstrapCoreProperties.getNacos();
nacosConfigManager.getConfigService().addListener(nacosConfig.get("data-id"), nacosConfig.get("group"), this);
}
@Override
public Executor getExecutor() {
return Executors.newSingleThreadExecutor();
return dynamicRefreshExecutorService;
}
@Override
public void receiveConfigInfo(String configInfo) {
log.info("Config :: {}", configInfo);
dynamicRefresh(configInfo);
}
}

@ -1,13 +1,14 @@
package cn.hippo4j.core.starter.refresher;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.starter.config.BootstrapCoreProperties;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* Nacos refresher handler.
@ -16,23 +17,32 @@ import java.util.concurrent.Executors;
* @date 2022/2/26 00:10
*/
@Slf4j
public class NacosRefresherHandler implements InitializingBean, Listener {
public class NacosRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh implements InitializingBean, Listener {
@Autowired(required = false)
private ConfigService configService;
private final ConfigService configService;
public NacosRefresherHandler(ConfigService configService,
ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler,
ConfigParserHandler configParserHandler,
BootstrapCoreProperties bootstrapCoreProperties) {
super(threadPoolNotifyAlarmHandler, configParserHandler, bootstrapCoreProperties);
this.configService = configService;
}
@Override
public void afterPropertiesSet() throws Exception {
configService.addListener("hippo4j-nacos.yaml", "DEFAULT_GROUP", this);
Map<String, String> nacosConfig = bootstrapCoreProperties.getNacos();
configService.addListener(nacosConfig.get("data-id"), nacosConfig.get("group"), this);
}
@Override
public Executor getExecutor() {
return Executors.newSingleThreadExecutor();
return dynamicRefreshExecutorService;
}
@Override
public void receiveConfigInfo(String configInfo) {
log.info("Config :: {}", configInfo);
dynamicRefresh(configInfo);
}
}

@ -139,6 +139,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
}
GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrap.getTpId(), dynamicThreadPoolWrap);
GlobalCoreThreadPoolManage.register(threadPoolId, executorProperties);
}
return newDynamicPoolExecutor;

@ -0,0 +1,48 @@
package cn.hippo4j.core.starter.support;
import cn.hippo4j.core.starter.config.ExecutorProperties;
import com.google.common.collect.Maps;
import java.util.Map;
/**
* Global core thread pool manage.
*
* @author chen.ma
* @date 2022/2/26 19:47
*/
public class GlobalCoreThreadPoolManage {
private static final Map<String, ExecutorProperties> EXECUTOR_PROPERTIES = Maps.newConcurrentMap();
/**
* Get properties.
*
* @param threadPoolId
* @return
*/
public static ExecutorProperties getProperties(String threadPoolId) {
return EXECUTOR_PROPERTIES.get(threadPoolId);
}
/**
* Register.
*
* @param threadPoolId
* @param executorProperties
*/
public static void register(String threadPoolId, ExecutorProperties executorProperties) {
EXECUTOR_PROPERTIES.put(threadPoolId, executorProperties);
}
/**
* Refresh.
*
* @param threadPoolId
* @param executorProperties
*/
public static void refresh(String threadPoolId, ExecutorProperties executorProperties) {
EXECUTOR_PROPERTIES.put(threadPoolId, executorProperties);
}
}
Loading…
Cancel
Save