Refactor the code to build the kernel layer (#1322)

pull/1323/head
magestack 1 year ago committed by GitHub
parent 845c32fdca
commit 10b3767df1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -20,13 +20,20 @@ package cn.hippo4j.agent.core.util;
public interface ThreadPoolPropertyKey {
String THREAD_POOL_ID = "threadPoolId";
String CORE_POOL_SIZE = "corePoolSize";
String MAXIMUM_POOL_SIZE = "maximumPoolSize";
String ALLOW_CORE_THREAD_TIME_OUT = "allowCoreThreadTimeOut";
String KEEP_ALIVE_TIME = "keepAliveTime";
String BLOCKING_QUEUE = "blockingQueue";
String QUEUE_CAPACITY = "queueCapacity";
String THREAD_NAME_PREFIX = "threadNamePrefix";
String REJECTED_HANDLER = "rejectedHandler";
String EXECUTE_TIME_OUT = "executeTimeOut";
}

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-plugin</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>hippo4j-agent-adapter-plugins</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

@ -5,11 +5,11 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-plugin</artifactId>
<artifactId>hippo4j-agent-config-mode</artifactId>
<version>${revision}</version>
</parent>
<artifactId>apollo-plugin</artifactId>
<artifactId>hippo4j-agent-config-apollo-plugin</artifactId>
<properties>
<apollo.version>1.9.1</apollo.version>

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-mode</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-agent-config-mode</artifactId>
<packaging>pom</packaging>
<modules>
<module>apollo-plugin</module>
</modules>
</project>

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-plugin</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-agent-mode</artifactId>
<packaging>pom</packaging>
<modules>
<module>config</module>
</modules>
</project>

@ -13,8 +13,9 @@
<packaging>pom</packaging>
<modules>
<module>spring-plugins</module>
<module>thread-pool-plugin</module>
<module>apollo-plugin</module>
<module>threadpool-plugin</module>
<module>mode</module>
<module>adapter-plugins</module>
</modules>
<properties>

@ -9,11 +9,12 @@
<version>${revision}</version>
</parent>
<artifactId>spring-plugins</artifactId>
<artifactId>hippo4j-agent-spring-plugins</artifactId>
<packaging>pom</packaging>
<modules>
<module>spring-boot-1.x-plugin</module>
<module>spring-boot-2.x-plugin</module>
<module>spring-boot-1x-plugin</module>
<module>spring-boot-2x-plugin</module>
<module>spring-plugin-common</module>
</modules>
@ -26,10 +27,9 @@
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>spring-plugin-common</artifactId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

@ -5,11 +5,11 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>spring-plugins</artifactId>
<artifactId>hippo4j-agent-spring-plugins</artifactId>
<version>${revision}</version>
</parent>
<artifactId>spring-boot-1.x-plugin</artifactId>
<artifactId>hippo4j-agent-spring-boot-1x-plugin</artifactId>
<properties>
<spring.boot.version>1.5.22.RELEASE</spring.boot.version>
@ -31,7 +31,7 @@
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>spring-plugin-common</artifactId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<scope>provided</scope>
</dependency>

@ -17,15 +17,15 @@
package cn.hippo4j.agent.plugin.spring.boot.v1;
import cn.hippo4j.agent.plugin.spring.common.support.AbstractDynamicThreadPoolChangeHandlerSpring;
import cn.hippo4j.common.toolkit.MapUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.AbstractConfigThreadPoolDynamicRefresh;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.PropertyValues;
import org.springframework.beans.support.ResourceEditorRegistrar;
import org.springframework.boot.bind.CustomPropertyNamePatternsMatcher;
import org.springframework.boot.bind.RelaxedDataBinder;
import org.springframework.boot.bind.RelaxedNames;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.MutablePropertySources;
@ -34,18 +34,18 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static cn.hippo4j.config.springboot1x.starter.refresher.SpringBoot1xBootstrapConfigPropertiesBinderAdapt.getNames;
import static cn.hippo4j.config.springboot1x.starter.refresher.SpringBoot1xBootstrapConfigPropertiesBinderAdapter.getNames;
/**
* Dynamic thread pool change handler spring 1x
*/
public class DynamicThreadPoolChangeHandlerSpring1x extends AbstractDynamicThreadPoolChangeHandlerSpring {
@RequiredArgsConstructor
public class DynamicThreadPoolChangeHandlerSpring1x extends AbstractConfigThreadPoolDynamicRefresh {
public DynamicThreadPoolChangeHandlerSpring1x(ConfigurableApplicationContext context) {
super(context);
}
private final ConfigurableApplicationContext applicationContext;
protected BootstrapConfigProperties bindProperties(Map<Object, Object> configInfo, ApplicationContext applicationContext) {
@Override
public BootstrapConfigProperties buildBootstrapProperties(Map<Object, Object> configInfo) {
BootstrapConfigProperties bindableCoreProperties = new BootstrapConfigProperties();
if (MapUtil.isEmpty(configInfo)) {
return bindableCoreProperties;
@ -68,5 +68,4 @@ public class DynamicThreadPoolChangeHandlerSpring1x extends AbstractDynamicThrea
dataBinder.bind(propertyValues);
return bindableCoreProperties;
}
}

@ -5,11 +5,11 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>spring-plugins</artifactId>
<artifactId>hippo4j-agent-spring-plugins</artifactId>
<version>${revision}</version>
</parent>
<artifactId>spring-boot-2.x-plugin</artifactId>
<artifactId>hippo4j-agent-spring-boot-2x-plugin</artifactId>
<packaging>jar</packaging>
<properties>
@ -19,7 +19,7 @@
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>spring-plugin-common</artifactId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<scope>provided</scope>
</dependency>
@ -32,8 +32,9 @@
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-config-spring-boot-starter</artifactId>
<artifactId>hippo4j-threadpool-dynamic-mode-config</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

@ -17,27 +17,22 @@
package cn.hippo4j.agent.plugin.spring.boot.v2;
import cn.hippo4j.agent.plugin.spring.common.support.AbstractDynamicThreadPoolChangeHandlerSpring;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.AbstractConfigThreadPoolDynamicRefresh;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import java.util.Map;
/**
* Dynamic thread pool change handler spring 2x
*/
public class DynamicThreadPoolChangeHandlerSpring2x extends AbstractDynamicThreadPoolChangeHandlerSpring {
public class DynamicThreadPoolChangeHandlerSpring2x extends AbstractConfigThreadPoolDynamicRefresh {
public DynamicThreadPoolChangeHandlerSpring2x(ConfigurableApplicationContext context) {
super(context);
}
protected BootstrapConfigProperties bindProperties(Map<Object, Object> configInfo, ApplicationContext applicationContext) {
@Override
public BootstrapConfigProperties buildBootstrapProperties(Map<Object, Object> configInfo) {
BootstrapConfigProperties bindableBootstrapConfigProperties = new BootstrapConfigProperties();
ConfigurationPropertySource sources = new MapConfigurationPropertySource(configInfo);
Binder binder = new Binder(sources);

@ -51,9 +51,8 @@ public class EventPublishingStartedInterceptor implements InstanceMethodsAroundI
return ret;
}
SpringPropertiesLoader.loadSpringProperties(context.getEnvironment());
ThreadPoolDynamicRefresh dynamicRefresh = new DynamicThreadPoolChangeHandlerSpring2x(context);
ThreadPoolDynamicRefresh dynamicRefresh = new DynamicThreadPoolChangeHandlerSpring2x();
dynamicRefresh.registerListener();
return ret;
}

@ -5,11 +5,11 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>spring-plugins</artifactId>
<artifactId>hippo4j-agent-spring-plugins</artifactId>
<version>${revision}</version>
</parent>
<artifactId>spring-plugin-common</artifactId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<dependencies>
<dependency>
@ -17,44 +17,36 @@
<artifactId>spring-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-mode-config</artifactId>
<artifactId>hippo4j-threadpool-dynamic-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

@ -17,20 +17,8 @@
package cn.hippo4j.agent.plugin.spring.common.support;
import cn.hippo4j.agent.core.registry.AgentThreadPoolExecutorHolder;
import cn.hippo4j.agent.core.registry.AgentThreadPoolInstanceRegistry;
import cn.hippo4j.agent.core.util.ThreadPoolPropertyKey;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigFileTypeEnum;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigParserHandler;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigFile;
import com.ctrip.framework.apollo.ConfigService;
@ -38,13 +26,10 @@ import com.ctrip.framework.apollo.core.enums.ConfigFileFormat;
import com.ctrip.framework.apollo.model.ConfigChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import java.util.*;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static cn.hippo4j.agent.core.conf.Constants.SPRING_BOOT_CONFIG_PREFIX;
@ -55,14 +40,7 @@ public abstract class AbstractDynamicThreadPoolChangeHandlerSpring implements Th
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDynamicThreadPoolChangeHandlerSpring.class);
private final ConfigurableApplicationContext applicationContext;
public AbstractDynamicThreadPoolChangeHandlerSpring(ConfigurableApplicationContext context) {
this.applicationContext = context;
}
public void registerListener() {
List<String> apolloNamespaces = SpringBootConfig.Spring.Dynamic.Thread_Pool.Apollo.NAMESPACE;
String namespace = apolloNamespaces.get(0);
String configFileType = SpringBootConfig.Spring.Dynamic.Thread_Pool.CONFIG_FILE_TYPE;
@ -78,179 +56,49 @@ public abstract class AbstractDynamicThreadPoolChangeHandlerSpring implements Th
String newValue = change.getNewValue();
newChangeValueMap.put(each, newValue);
});
dynamicRefresh(configFile.getContent(), newChangeValueMap, applicationContext);
dynamicRefresh(configFile.getContent(), newChangeValueMap);
};
config.addChangeListener(configChangeListener);
LOGGER.info("[Hippo4j-Agent] Dynamic thread pool refresher, add apollo listener success. namespace: {}", namespace);
}
private void dynamicRefresh(String configContent, Map<String, Object> newValueChangeMap, ApplicationContext context) {
public void dynamicRefresh(String configContent, Map<String, Object> newValueChangeMap) {
try {
String configFileType = SpringBootConfig.Spring.Dynamic.Thread_Pool.CONFIG_FILE_TYPE;
Map<Object, Object> afterConfigMap = ConfigParserHandler.getInstance().parseConfig(configContent,
ConfigFileTypeEnum.of(configFileType));
if (CollectionUtil.isNotEmpty(newValueChangeMap)) {
Optional.ofNullable(afterConfigMap).ifPresent(each -> each.putAll(newValueChangeMap));
}
// String configFileType = SpringBootConfig.Spring.Dynamic.Thread_Pool.CONFIG_FILE_TYPE;
//
// Map<Object, Object> afterConfigMap = ConfigParserHandler.getInstance().parseConfig(configContent,
// ConfigFileTypeEnum.of(configFileType));
// if (CollectionUtil.isNotEmpty(newValueChangeMap)) {
// Optional.ofNullable(afterConfigMap).ifPresent(each -> each.putAll(newValueChangeMap));
// }
// TODO
/*
* BootstrapConfigProperties afterConfigProperties = bindProperties(afterConfigMap, context);
*
* List<ExecutorProperties> executors = afterConfigProperties.getExecutors(); for (ExecutorProperties afterProperties : executors) { String threadPoolId =
* afterProperties.getThreadPoolId(); AgentThreadPoolExecutorHolder holder = AgentThreadPoolInstanceRegistry.getInstance().getHolder(threadPoolId); if (holder.isEmpty() ||
* holder.getExecutor() == null) { continue; } ExecutorProperties beforeProperties = convert(holder.getProperties());
*
* if (!checkConsistency(threadPoolId, beforeProperties, afterProperties)) { continue; }
*
* dynamicRefreshPool(beforeProperties, afterProperties); holder.setProperties(failDefaultExecutorProperties(beforeProperties, afterProperties)); // do refresh.
* ChangeParameterNotifyRequest changeRequest = buildChangeRequest(beforeProperties, afterProperties); LOGGER.info(CHANGE_THREAD_POOL_TEXT, threadPoolId, String.format(CHANGE_DELIMITER,
* beforeProperties.getCorePoolSize(), changeRequest.getNowCorePoolSize()), String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), changeRequest.getNowMaximumPoolSize()),
* String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), changeRequest.getNowQueueCapacity()), String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(),
* changeRequest.getNowKeepAliveTime()), String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), changeRequest.getNowExecuteTimeOut()), String.format(CHANGE_DELIMITER,
* beforeProperties.getRejectedHandler(), changeRequest.getNowRejectedName()), String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(),
* changeRequest.getNowAllowsCoreThreadTimeOut())); }
*/
// BootstrapConfigProperties afterConfigProperties = bindProperties(afterConfigMap, context);
//
// List<ExecutorProperties> executors = afterConfigProperties.getExecutors();
// for (ExecutorProperties afterProperties : executors) {
// String threadPoolId =
// afterProperties.getThreadPoolId();
// AgentThreadPoolExecutorHolder holder = AgentThreadPoolInstanceRegistry.getInstance().getHolder(threadPoolId);
// if (holder.isEmpty() ||
// holder.getExecutor() == null) {
// continue;
// }
// ExecutorProperties beforeProperties = convert(holder.getProperties());
// if (!checkConsistency(threadPoolId, beforeProperties, afterProperties)) {
// continue;
// }
// dynamicRefreshPool(beforeProperties, afterProperties);
// holder.setProperties(failDefaultExecutorProperties(beforeProperties, afterProperties)); // do refresh.
// ChangeParameterNotifyRequest changeRequest = buildChangeRequest(beforeProperties, afterProperties);
// LOGGER.info(CHANGE_THREAD_POOL_TEXT, threadPoolId, String.format(CHANGE_DELIMITER,
// beforeProperties.getCorePoolSize(), changeRequest.getNowCorePoolSize()), String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), changeRequest.getNowMaximumPoolSize()),
// String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), changeRequest.getNowQueueCapacity()), String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(),
// changeRequest.getNowKeepAliveTime()), String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), changeRequest.getNowExecuteTimeOut()), String.format(CHANGE_DELIMITER,
// beforeProperties.getRejectedHandler(), changeRequest.getNowRejectedName()), String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(),
// changeRequest.getNowAllowsCoreThreadTimeOut()));
// }
} catch (Exception ex) {
LOGGER.error("[Hippo4j-Agent] config mode dynamic refresh failed.", ex);
}
}
/**
* Dynamic refresh pool.
*/
private void dynamicRefreshPool(ExecutorProperties beforeProperties, ExecutorProperties afterProperties) {
AgentThreadPoolExecutorHolder holder = AgentThreadPoolInstanceRegistry.getInstance().getHolder(afterProperties.getThreadPoolId());
ThreadPoolExecutor executor = holder.getExecutor();
if (afterProperties.getMaximumPoolSize() != null && afterProperties.getCorePoolSize() != null) {
ThreadPoolExecutorUtil.safeSetPoolSize(executor, afterProperties.getCorePoolSize(), afterProperties.getMaximumPoolSize());
} else {
if (afterProperties.getMaximumPoolSize() != null) {
executor.setMaximumPoolSize(afterProperties.getMaximumPoolSize());
}
if (afterProperties.getCorePoolSize() != null) {
executor.setCorePoolSize(afterProperties.getCorePoolSize());
}
}
if (afterProperties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), afterProperties.getAllowCoreThreadTimeOut())) {
executor.allowCoreThreadTimeOut(afterProperties.getAllowCoreThreadTimeOut());
}
if (afterProperties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), afterProperties.getExecuteTimeOut())) {
if (executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(afterProperties.getExecuteTimeOut());
}
}
if (afterProperties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), afterProperties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(afterProperties.getRejectedHandler());
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (afterProperties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), afterProperties.getKeepAliveTime())) {
executor.setKeepAliveTime(afterProperties.getKeepAliveTime(), TimeUnit.SECONDS);
}
if (afterProperties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), afterProperties.getQueueCapacity())
&& Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())) {
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) {
ResizableCapacityLinkedBlockingQueue<?> queue = (ResizableCapacityLinkedBlockingQueue<?>) executor.getQueue();
queue.setCapacity(afterProperties.getQueueCapacity());
} else {
LOGGER.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName());
}
}
}
/**
* Fail default executor properties.
*
* @param beforeProperties old properties
* @param afterProperties new properties
* @return executor properties
*/
private Properties failDefaultExecutorProperties(ExecutorProperties beforeProperties, ExecutorProperties afterProperties) {
return convert(ExecutorProperties.builder()
.corePoolSize(Optional.ofNullable(afterProperties.getCorePoolSize()).orElse(beforeProperties.getCorePoolSize()))
.maximumPoolSize(Optional.ofNullable(afterProperties.getMaximumPoolSize()).orElse(beforeProperties.getMaximumPoolSize()))
.blockingQueue(afterProperties.getBlockingQueue())
.queueCapacity(Optional.ofNullable(afterProperties.getQueueCapacity()).orElse(beforeProperties.getQueueCapacity()))
.keepAliveTime(Optional.ofNullable(afterProperties.getKeepAliveTime()).orElse(beforeProperties.getKeepAliveTime()))
.executeTimeOut(Optional.ofNullable(afterProperties.getExecuteTimeOut()).orElse(beforeProperties.getExecuteTimeOut()))
.rejectedHandler(Optional.ofNullable(afterProperties.getRejectedHandler()).orElse(beforeProperties.getRejectedHandler()))
.allowCoreThreadTimeOut(Optional.ofNullable(afterProperties.getAllowCoreThreadTimeOut()).orElse(beforeProperties.getAllowCoreThreadTimeOut()))
.threadPoolId(beforeProperties.getThreadPoolId())
.build());
}
private ExecutorProperties convert(Properties properties) {
return ExecutorProperties.builder()
.threadPoolId((String) properties.get(ThreadPoolPropertyKey.THREAD_POOL_ID))
.corePoolSize((Integer) properties.get(ThreadPoolPropertyKey.CORE_POOL_SIZE))
.maximumPoolSize((Integer) properties.get(ThreadPoolPropertyKey.MAXIMUM_POOL_SIZE))
.allowCoreThreadTimeOut((Boolean) properties.get(ThreadPoolPropertyKey.ALLOW_CORE_THREAD_TIME_OUT))
.keepAliveTime((Long) properties.get(ThreadPoolPropertyKey.KEEP_ALIVE_TIME))
.blockingQueue((String) properties.get(ThreadPoolPropertyKey.BLOCKING_QUEUE))
.queueCapacity((Integer) properties.get(ThreadPoolPropertyKey.QUEUE_CAPACITY))
.threadNamePrefix((String) properties.get(ThreadPoolPropertyKey.THREAD_NAME_PREFIX))
.rejectedHandler((String) properties.get(ThreadPoolPropertyKey.REJECTED_HANDLER))
.executeTimeOut((Long) properties.get(ThreadPoolPropertyKey.EXECUTE_TIME_OUT))
.build();
}
private Properties convert(ExecutorProperties executorProperties) {
Properties properties = new Properties();
Optional.ofNullable(executorProperties.getCorePoolSize()).ifPresent(v -> properties.put(ThreadPoolPropertyKey.CORE_POOL_SIZE, v));
Optional.ofNullable(executorProperties.getMaximumPoolSize()).ifPresent(v -> properties.put(ThreadPoolPropertyKey.MAXIMUM_POOL_SIZE, v));
Optional.ofNullable(executorProperties.getBlockingQueue()).ifPresent(v -> properties.put(ThreadPoolPropertyKey.BLOCKING_QUEUE, v));
Optional.ofNullable(executorProperties.getQueueCapacity()).ifPresent(v -> properties.put(ThreadPoolPropertyKey.QUEUE_CAPACITY, v));
Optional.ofNullable(executorProperties.getKeepAliveTime()).ifPresent(v -> properties.put(ThreadPoolPropertyKey.KEEP_ALIVE_TIME, v));
Optional.ofNullable(executorProperties.getExecuteTimeOut()).ifPresent(v -> properties.put(ThreadPoolPropertyKey.EXECUTE_TIME_OUT, v));
Optional.ofNullable(executorProperties.getRejectedHandler()).ifPresent(v -> properties.put(ThreadPoolPropertyKey.REJECTED_HANDLER, v));
Optional.ofNullable(executorProperties.getAllowCoreThreadTimeOut()).ifPresent(v -> properties.put(ThreadPoolPropertyKey.ALLOW_CORE_THREAD_TIME_OUT, v));
Optional.ofNullable(executorProperties.getThreadPoolId()).ifPresent(v -> properties.put(ThreadPoolPropertyKey.THREAD_POOL_ID, v));
return properties;
}
/**
* Construct change parameter notify request instance.
*
* @param beforeProperties old properties
* @param afterProperties new properties
* @return instance
*/
/*
* private ChangeParameterNotifyRequest buildChangeRequest(ExecutorProperties beforeProperties, ExecutorProperties afterProperties) { ChangeParameterNotifyRequest changeParameterNotifyRequest =
* ChangeParameterNotifyRequest.builder() .beforeCorePoolSize(beforeProperties.getCorePoolSize()) .beforeMaximumPoolSize(beforeProperties.getMaximumPoolSize())
* .beforeAllowsCoreThreadTimeOut(beforeProperties.getAllowCoreThreadTimeOut()) .beforeKeepAliveTime(beforeProperties.getKeepAliveTime()) .beforeQueueCapacity(beforeProperties.getQueueCapacity())
* .beforeRejectedName(beforeProperties.getRejectedHandler()) .beforeExecuteTimeOut(beforeProperties.getExecuteTimeOut()) .blockingQueueName(afterProperties.getBlockingQueue())
* .nowCorePoolSize(Optional.ofNullable(afterProperties.getCorePoolSize()).orElse(beforeProperties.getCorePoolSize()))
* .nowMaximumPoolSize(Optional.ofNullable(afterProperties.getMaximumPoolSize()).orElse(beforeProperties.getMaximumPoolSize()))
* .nowAllowsCoreThreadTimeOut(Optional.ofNullable(afterProperties.getAllowCoreThreadTimeOut()).orElse(beforeProperties.getAllowCoreThreadTimeOut()))
* .nowKeepAliveTime(Optional.ofNullable(afterProperties.getKeepAliveTime()).orElse(beforeProperties.getKeepAliveTime()))
* .nowQueueCapacity(Optional.ofNullable(afterProperties.getQueueCapacity()).orElse(beforeProperties.getQueueCapacity()))
* .nowRejectedName(Optional.ofNullable(afterProperties.getRejectedHandler()).orElse(beforeProperties.getRejectedHandler()))
* .nowExecuteTimeOut(Optional.ofNullable(afterProperties.getExecuteTimeOut()).orElse(beforeProperties.getExecuteTimeOut())) .build();
* changeParameterNotifyRequest.setThreadPoolId(beforeProperties.getThreadPoolId()); return changeParameterNotifyRequest; }
*/
/**
* Check consistency.
*
* @param threadPoolId
* @param afterProperties
*/
private boolean checkConsistency(String threadPoolId, ExecutorProperties beforeProperties, ExecutorProperties afterProperties) {
AgentThreadPoolExecutorHolder holder = AgentThreadPoolInstanceRegistry.getInstance().getHolder(threadPoolId);
if (holder.isEmpty() || holder.getExecutor() == null) {
return false;
}
ThreadPoolExecutor executor = holder.getExecutor();
return (afterProperties.getCorePoolSize() != null && !Objects.equals(beforeProperties.getCorePoolSize(), afterProperties.getCorePoolSize()))
|| (afterProperties.getMaximumPoolSize() != null && !Objects.equals(beforeProperties.getMaximumPoolSize(), afterProperties.getMaximumPoolSize()))
|| (afterProperties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), afterProperties.getAllowCoreThreadTimeOut()))
|| (afterProperties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), afterProperties.getExecuteTimeOut()))
|| (afterProperties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), afterProperties.getKeepAliveTime()))
|| (afterProperties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), afterProperties.getRejectedHandler()))
||
((afterProperties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), afterProperties.getQueueCapacity())
&& Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())));
}
}

@ -17,14 +17,12 @@
package cn.hippo4j.agent.plugin.spring.common.support;
import cn.hippo4j.agent.core.registry.AgentThreadPoolInstanceRegistry;
import cn.hippo4j.agent.core.util.ReflectUtil;
import cn.hippo4j.agent.core.util.ThreadPoolPropertyKey;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.executor.ThreadPoolInstanceRegistry;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.BooleanUtil;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
@ -32,10 +30,8 @@ import org.springframework.context.ApplicationContext;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Spring thread pool register support
@ -45,7 +41,7 @@ public class SpringThreadPoolRegisterSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringThreadPoolRegisterSupport.class);
public static void registerThreadPoolInstances(ApplicationContext context) {
Map<ThreadPoolExecutor, Class<?>> earlyConstructMap = AgentThreadPoolInstanceRegistry.getInstance().earlyConstructMap;
Map<ThreadPoolExecutor, Class<?>> earlyConstructMap = ThreadPoolInstanceRegistry.getInstance().earlyConstructMap;
for (Map.Entry<ThreadPoolExecutor, Class<?>> entry : earlyConstructMap.entrySet()) {
ThreadPoolExecutor enhancedInstance = entry.getKey();
Class<?> declaredClass = entry.getValue();
@ -69,11 +65,12 @@ public class SpringThreadPoolRegisterSupport {
String beanName = entry.getKey();
Executor bean = entry.getValue();
ThreadPoolExecutor executor = null;
if (DynamicThreadPoolAdapterChoose.match(bean)) {
executor = DynamicThreadPoolAdapterChoose.unwrap(bean);
} else {
executor = (ThreadPoolExecutor) bean;
}
//
// if (DynamicThreadPoolAdapterChoose.match(bean)) {
// executor = DynamicThreadPoolAdapterChoose.unwrap(bean);
// } else {
// executor = (ThreadPoolExecutor) bean;
// }
if (executor == null) {
LOGGER.warn("[Hippo4j-Agent] Thread pool is null, ignore bean registration. beanName={}, beanClass={}", beanName, bean.getClass().getName());
} else {
@ -87,19 +84,15 @@ public class SpringThreadPoolRegisterSupport {
if (executor == null) {
return;
}
// build parameter properties.
Properties properties = new Properties();
properties.put(ThreadPoolPropertyKey.THREAD_POOL_ID, threadPoolId);
properties.put(ThreadPoolPropertyKey.CORE_POOL_SIZE, executor.getCorePoolSize());
properties.put(ThreadPoolPropertyKey.MAXIMUM_POOL_SIZE, executor.getMaximumPoolSize());
properties.put(ThreadPoolPropertyKey.ALLOW_CORE_THREAD_TIME_OUT, BooleanUtil.toBoolean(String.valueOf(executor.allowsCoreThreadTimeOut())));
properties.put(ThreadPoolPropertyKey.KEEP_ALIVE_TIME, executor.getKeepAliveTime(TimeUnit.MILLISECONDS));
properties.put(ThreadPoolPropertyKey.BLOCKING_QUEUE, BlockingQueueTypeEnum.getBlockingQueueTypeEnumByName(executor.getQueue().getClass().getSimpleName()).getName());
properties.put(ThreadPoolPropertyKey.QUEUE_CAPACITY, executor.getQueue().remainingCapacity());
properties.put(ThreadPoolPropertyKey.THREAD_NAME_PREFIX, threadPoolId);
properties.put(ThreadPoolPropertyKey.REJECTED_HANDLER, RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()).getName());
properties.put(ThreadPoolPropertyKey.EXECUTE_TIME_OUT, Constants.EXECUTE_TIME_OUT);
// register executor.
AgentThreadPoolInstanceRegistry.getInstance().putHolder(threadPoolId, executor, properties);
ExecutorProperties executorProperties = ExecutorProperties.builder()
.threadPoolId(threadPoolId)
.corePoolSize(executor.getCorePoolSize())
.maximumPoolSize(executor.getMaximumPoolSize())
.allowCoreThreadTimeOut(BooleanUtil.toBoolean(String.valueOf(executor.allowsCoreThreadTimeOut())))
.blockingQueue(BlockingQueueTypeEnum.getBlockingQueueTypeEnumByName(executor.getQueue().getClass().getSimpleName()).getName())
.queueCapacity(executor.getQueue().remainingCapacity())
.rejectedHandler(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()).getName())
.build();
ThreadPoolInstanceRegistry.getInstance().putHolder(threadPoolId, executor, executorProperties);
}
}

@ -9,7 +9,7 @@
<version>${revision}</version>
</parent>
<artifactId>thread-pool-plugin</artifactId>
<artifactId>hippo4j-agent-threadpool-plugin</artifactId>
<dependencies>
<dependency>

@ -22,7 +22,7 @@ import cn.hippo4j.agent.core.logging.api.ILog;
import cn.hippo4j.agent.core.logging.api.LogManager;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import cn.hippo4j.agent.core.registry.AgentThreadPoolInstanceRegistry;
import cn.hippo4j.common.executor.ThreadPoolInstanceRegistry;
import cn.hippo4j.agent.core.util.CollectionUtil;
import cn.hippo4j.agent.core.util.StringUtil;
import java.util.ArrayList;
@ -52,7 +52,7 @@ public class ThreadPoolExecutorConstructorMethodInterceptor implements InstanceC
StackTraceElement declaredClassStackTraceElement = stackTraceElements.get(0);
String declaredClassName = declaredClassStackTraceElement.getClassName();
Class<?> declaredClass = Thread.currentThread().getContextClassLoader().loadClass(declaredClassName);
AgentThreadPoolInstanceRegistry.getInstance().earlyConstructMap.put((ThreadPoolExecutor) objInst, declaredClass);
ThreadPoolInstanceRegistry.getInstance().earlyConstructMap.put((ThreadPoolExecutor) objInst, declaredClass);
}
private List<StackTraceElement> getStackTraceElements() {

@ -15,35 +15,30 @@
* limitations under the License.
*/
package cn.hippo4j.agent.core.registry;
package cn.hippo4j.common.executor;
import cn.hippo4j.agent.core.logging.api.ILog;
import cn.hippo4j.agent.core.logging.api.LogManager;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;
@Data
public class AgentThreadPoolExecutorHolder {
@NoArgsConstructor
public class ThreadPoolExecutorHolder {
private static final ILog LOGGER = LogManager.getLogger(AgentThreadPoolExecutorHolder.class);
public static final ThreadPoolExecutorHolder EMPTY = new ThreadPoolExecutorHolder();
public static final AgentThreadPoolExecutorHolder EMPTY = new AgentThreadPoolExecutorHolder();
private String executorName;
private String threadPoolId;
private ThreadPoolExecutor executor;
private Properties properties;
public AgentThreadPoolExecutorHolder() {
}
private ExecutorProperties executorProperties;
public AgentThreadPoolExecutorHolder(String executorName, ThreadPoolExecutor executor, Properties properties) {
this.executorName = executorName;
public ThreadPoolExecutorHolder(String threadPoolId, ThreadPoolExecutor executor, ExecutorProperties executorProperties) {
this.threadPoolId = threadPoolId;
this.executor = executor;
this.properties = properties;
this.executorProperties = executorProperties;
}
public boolean isEmpty() {

@ -15,51 +15,47 @@
* limitations under the License.
*/
package cn.hippo4j.agent.core.registry;
package cn.hippo4j.common.executor;
import cn.hippo4j.agent.core.logging.api.ILog;
import cn.hippo4j.agent.core.logging.api.LogManager;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
public class AgentThreadPoolInstanceRegistry {
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ThreadPoolInstanceRegistry {
private static final ILog LOGGER = LogManager.getLogger(AgentThreadPoolInstanceRegistry.class);
private final Map<String, AgentThreadPoolExecutorHolder> holderMap = new ConcurrentHashMap<>();
private final Map<String, ThreadPoolExecutorHolder> holderMap = new ConcurrentHashMap<>();
public final Map<ThreadPoolExecutor, Class<?>> earlyConstructMap = new ConcurrentHashMap<>();
private volatile static AgentThreadPoolInstanceRegistry INSTANCE;
private AgentThreadPoolInstanceRegistry() {
}
private volatile static ThreadPoolInstanceRegistry INSTANCE;
public static AgentThreadPoolInstanceRegistry getInstance() {
public static ThreadPoolInstanceRegistry getInstance() {
if (INSTANCE == null) {
synchronized (AgentThreadPoolInstanceRegistry.class) {
synchronized (ThreadPoolInstanceRegistry.class) {
if (INSTANCE == null) {
INSTANCE = new AgentThreadPoolInstanceRegistry();
INSTANCE = new ThreadPoolInstanceRegistry();
}
}
}
return INSTANCE;
}
public Map<String, AgentThreadPoolExecutorHolder> getHolderMap() {
public Map<String, ThreadPoolExecutorHolder> getHolderMap() {
return holderMap;
}
public void putHolder(String executorName, ThreadPoolExecutor executor, Properties properties) {
AgentThreadPoolExecutorHolder holder = new AgentThreadPoolExecutorHolder(executorName, executor, properties);
public void putHolder(String executorName, ThreadPoolExecutor executor, ExecutorProperties executorProperties) {
ThreadPoolExecutorHolder holder = new ThreadPoolExecutorHolder(executorName, executor, executorProperties);
holderMap.put(executorName, holder);
}
public AgentThreadPoolExecutorHolder getHolder(String executorName) {
return Optional.ofNullable(holderMap.get(executorName)).orElse(AgentThreadPoolExecutorHolder.EMPTY);
public ThreadPoolExecutorHolder getHolder(String executorName) {
return Optional.ofNullable(holderMap.get(executorName)).orElse(ThreadPoolExecutorHolder.EMPTY);
}
}

@ -20,7 +20,6 @@ package cn.hippo4j.common.executor.support;
import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry;
import lombok.Getter;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;

@ -140,6 +140,11 @@ public class AbstractSubjectCenter {
/**
* Clear config cache.
*/
CLEAR_CONFIG_CACHE
CLEAR_CONFIG_CACHE,
/**
* Thread-pool dynamic refresh.
*/
THREAD_POOL_DYNAMIC_REFRESH
}
}

@ -24,8 +24,6 @@ public interface BootstrapPropertiesInterface {
/**
* Get enable.
*
* @return
*/
default Boolean getEnable() {
return null;
@ -33,8 +31,6 @@ public interface BootstrapPropertiesInterface {
/**
* Get username.
*
* @return
*/
default String getUsername() {
return null;
@ -42,8 +38,6 @@ public interface BootstrapPropertiesInterface {
/**
* Get password.
*
* @return
*/
default String getPassword() {
return null;
@ -51,8 +45,6 @@ public interface BootstrapPropertiesInterface {
/**
* Get namespace.
*
* @return
*/
default String getNamespace() {
return null;
@ -60,8 +52,6 @@ public interface BootstrapPropertiesInterface {
/**
* Get item id.
*
* @return
*/
default String getItemId() {
return null;
@ -69,8 +59,6 @@ public interface BootstrapPropertiesInterface {
/**
* Get server addr.
*
* @return
*/
default String getServerAddr() {
return null;
@ -78,8 +66,6 @@ public interface BootstrapPropertiesInterface {
/**
* Get banner.
*
* @return
*/
default Boolean getBanner() {
return null;

@ -30,6 +30,16 @@ public interface ThreadPoolDynamicRefresh {
default void registerListener() {
}
/**
* Build thread-pool bootstrap properties.
*
* @param configInfo changed configuration properties
* @return bootstrap properties
*/
default BootstrapPropertiesInterface buildBootstrapProperties(Map<Object, Object> configInfo) {
return null;
}
/**
* Dynamic refresh of configuration center data changes.
*

@ -10,4 +10,17 @@
</parent>
<artifactId>hippo4j-threadpool-dynamic-core</artifactId>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-infra-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.support;
package cn.hippo4j.threadpool.dynamic.core.executor.manage;
import cn.hippo4j.common.model.executor.ExecutorProperties;
@ -24,8 +24,9 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* Global core thread-pool manage.
* TODO
*/
public class GlobalCoreThreadPoolManage {
public class GlobalConfigThreadPoolManage {
private static final Map<String, ExecutorProperties> EXECUTOR_PROPERTIES = new ConcurrentHashMap<>();

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.threadpool.dynamic.core.executor.manage;
import cn.hippo4j.common.model.ThreadPoolParameter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Global thread-pool manage.
*/
public class GlobalThreadPoolManage {
/**
* Dynamic thread pool parameter container.
*/
private static final Map<String, ThreadPoolParameter> POOL_PARAMETER = new ConcurrentHashMap();
/**
* Dynamic thread pool wrapper.
*/
private static final Map<String, ThreadPoolExecutor> EXECUTOR_MAP = new ConcurrentHashMap();
/**
* Get the dynamic thread pool class.
*
* @param threadPoolId thread-pool id
* @return dynamic thread-pool
*/
public static ThreadPoolExecutor getExecutorService(String threadPoolId) {
return EXECUTOR_MAP.get(threadPoolId);
}
/**
* Get dynamic thread pool parameters.
*
* @param threadPoolId thread-pool id
* @return thread-pool parameter
*/
public static ThreadPoolParameter getPoolParameter(String threadPoolId) {
return POOL_PARAMETER.get(threadPoolId);
}
/**
* Register dynamic thread pool parameters.
*
* @param threadPoolId thread-pool id
* @param threadPoolParameter thread-pool parameter
*/
public static void registerPoolParameter(String threadPoolId, ThreadPoolParameter threadPoolParameter) {
POOL_PARAMETER.put(threadPoolId, threadPoolParameter);
}
}

@ -17,5 +17,10 @@
<artifactId>hippo4j-threadpool-dynamic-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

@ -30,6 +30,10 @@ public class YamlConfigParser extends AbstractConfigParser {
@Override
public Map<Object, Object> doParse(String content) {
// TODO
/*
* if (StringUtils.isEmpty(content)) { return new HashMap<>(1); } YamlPropertiesFactoryBean yamlPropertiesFactoryBean = new YamlPropertiesFactoryBean();
* yamlPropertiesFactoryBean.setResources(new ByteArrayResource(content.getBytes())); return yamlPropertiesFactoryBean.getObject();
*/
return null;
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.config;
package cn.hippo4j.threadpool.dynamic.mode.config.properties;
import lombok.Data;

@ -15,14 +15,13 @@
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.config;
package cn.hippo4j.threadpool.dynamic.mode.config.properties;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.config.springboot.starter.parser.ConfigFileTypeEnum;
import cn.hippo4j.threadpool.dynamic.api.BootstrapPropertiesInterface;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigFileTypeEnum;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.ArrayList;
import java.util.List;
@ -33,7 +32,6 @@ import java.util.Map;
*/
@Getter
@Setter
@ConfigurationProperties(prefix = BootstrapConfigProperties.PREFIX)
public class BootstrapConfigProperties implements BootstrapPropertiesInterface {
public static final String PREFIX = "spring.dynamic.thread-pool";
@ -79,7 +77,8 @@ public class BootstrapConfigProperties implements BootstrapPropertiesInterface {
private Map<String, String> etcd;
/**
* web config
* Web config
*
* @since 1.5.0
*/
private WebExecutorProperties web;

@ -15,10 +15,8 @@
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.config;
package cn.hippo4j.threadpool.dynamic.mode.config.properties;
import cn.hippo4j.monitor.base.MonitorThreadPoolTypeEnum;
import cn.hippo4j.monitor.base.MonitorTypeEnum;
import lombok.Data;
/**
@ -35,12 +33,14 @@ public class MonitorProperties {
/**
* Type of collection thread pool running data. eg: log,micrometer. Multiple can be used at the same time, default micrometer.
*/
private String collectTypes = MonitorTypeEnum.MICROMETER.toString().toLowerCase();
// TODO
private String collectTypes = "micrometer";
/**
* Monitor the type of thread pool. eg: dynamic,web,adapter. Can be configured arbitrarily, default dynamic.
*/
private String threadPoolTypes = MonitorThreadPoolTypeEnum.DYNAMIC.toString().toLowerCase();
// TODO
private String threadPoolTypes = "dynamic";
/**
* Delay starting data acquisition task. unit: ms

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.config;
package cn.hippo4j.threadpool.dynamic.mode.config.properties;
import lombok.Data;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.config;
package cn.hippo4j.threadpool.dynamic.mode.config.properties;
import cn.hippo4j.common.model.executor.ExecutorNotifyProperties;
import cn.hippo4j.common.api.IExecutorProperties;

@ -17,7 +17,9 @@
package cn.hippo4j.threadpool.dynamic.mode.config.refresher;
import cn.hippo4j.common.extension.design.AbstractSubjectCenter;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.threadpool.dynamic.api.BootstrapPropertiesInterface;
import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigParserHandler;
import lombok.extern.slf4j.Slf4j;
@ -43,8 +45,9 @@ public abstract class AbstractConfigThreadPoolDynamicRefresh implements ThreadPo
if (CollectionUtil.isNotEmpty(newValueChangeMap)) {
Optional.ofNullable(configInfo).ifPresent(each -> each.putAll(newValueChangeMap));
}
// BootstrapConfigProperties binderCoreProperties = bootstrapConfigPropertiesBinderAdapt.bootstrapCorePropertiesBinder(configInfo, bootstrapConfigProperties);
BootstrapPropertiesInterface bootstrapProperties = buildBootstrapProperties(configInfo);
// publishDynamicThreadPoolEvent(binderCoreProperties);
AbstractSubjectCenter.notify("", null);
} catch (Exception ex) {
log.error("Hippo4j config mode dynamic refresh failed.", ex);
}

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.threadpool.dynamic.mode.config.refresher.event;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.extension.design.Observer;
import cn.hippo4j.common.extension.design.ObserverMessage;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.threadpool.dynamic.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.core.executor.manage.GlobalConfigThreadPoolManage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
/**
* Dynamic thread-pool refresh listener.
*/
@Slf4j
@RequiredArgsConstructor
public class DynamicThreadPoolRefreshListener implements Observer<BootstrapConfigProperties> {
@Override
public void accept(ObserverMessage<BootstrapConfigProperties> observerMessage) {
BootstrapConfigProperties bindableConfigProperties = observerMessage.message();
List<ExecutorProperties> executors = bindableConfigProperties.getExecutors();
for (ExecutorProperties properties : executors) {
String threadPoolId = properties.getThreadPoolId();
dynamicRefreshPool(threadPoolId, properties);
ExecutorProperties beforeProperties = GlobalConfigThreadPoolManage.getProperties(properties.getThreadPoolId());
log.info(CHANGE_THREAD_POOL_TEXT,
threadPoolId,
String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), properties.getCorePoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), properties.getQueueCapacity()),
String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()),
String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), properties.getRejectedHandler()),
String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()));
}
}
/**
* Dynamic refresh pool.
*
* @param threadPoolId
* @param properties
*/
private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) {
ExecutorProperties beforeProperties = GlobalConfigThreadPoolManage.getProperties(properties.getThreadPoolId());
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId);
if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) {
ThreadPoolExecutorUtil.safeSetPoolSize(executor, properties.getCorePoolSize(), properties.getMaximumPoolSize());
} else {
if (properties.getMaximumPoolSize() != null) {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
}
if (properties.getCorePoolSize() != null) {
executor.setCorePoolSize(properties.getCorePoolSize());
}
}
if (properties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) {
executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
}
// TODO
if (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) {
// if (executor instanceof DynamicThreadPoolExecutor) {
// ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
// }
}
if (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(properties.getRejectedHandler());
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {
executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS);
}
if (properties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())) {
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) {
ResizableCapacityLinkedBlockingQueue<?> queue = (ResizableCapacityLinkedBlockingQueue<?>) executor.getQueue();
queue.setCapacity(properties.getQueueCapacity());
} else {
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName());
}
}
}
}

@ -17,7 +17,7 @@
package cn.hippo4j.config.springboot1x.starter.config;
import cn.hippo4j.config.springboot1x.starter.refresher.SpringBoot1xBootstrapConfigPropertiesBinderAdapt;
import cn.hippo4j.config.springboot1x.starter.refresher.SpringBoot1xBootstrapConfigPropertiesBinderAdapter;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.BootstrapConfigPropertiesBinderAdapter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.bind.RelaxedDataBinder;
@ -31,6 +31,6 @@ public class ConfigHandlerAutoConfiguration {
@Bean
@ConditionalOnClass(RelaxedDataBinder.class)
public BootstrapConfigPropertiesBinderAdapter bootstrapConfigPropertiesBinderAdapter() {
return new SpringBoot1xBootstrapConfigPropertiesBinderAdapt();
return new SpringBoot1xBootstrapConfigPropertiesBinderAdapter();
}
}

@ -17,7 +17,7 @@
package cn.hippo4j.config.springboot1x.starter.refresher;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.api.BootstrapPropertiesInterface;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.BootstrapConfigPropertiesBinderAdapter;
import org.springframework.beans.BeanUtils;
@ -41,7 +41,7 @@ import java.util.Set;
/**
* Bootstrap core properties binder adapt.
*/
public class SpringBoot1xBootstrapConfigPropertiesBinderAdapt implements ApplicationContextAware, BootstrapConfigPropertiesBinderAdapter {
public class SpringBoot1xBootstrapConfigPropertiesBinderAdapter implements ApplicationContextAware, BootstrapConfigPropertiesBinderAdapter {
private ApplicationContext applicationContext;

@ -18,6 +18,7 @@
package cn.hippo4j.config.springboot.starter.config;
import cn.hippo4j.config.springboot.starter.refresher.*;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.BootstrapConfigPropertiesBinderAdapter;
import com.alibaba.cloud.nacos.NacosConfigProperties;
import com.alibaba.nacos.api.config.ConfigService;
@ -58,7 +59,7 @@ public class ConfigHandlerConfiguration {
@Bean
@ConditionalOnMissingBean
public BootstrapConfigPropertiesBinderAdapter bootstrapConfigPropertiesBinderAdapter() {
return new DefaultBootstrapConfigPropertiesBinderAdapt();
return new DefaultBootstrapConfigPropertiesBinderAdapter();
}
/**

@ -42,6 +42,7 @@ import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.message.service.WebThreadPoolConfigChangeHandler;
import cn.hippo4j.springboot.starter.adapter.web.WebAdapterConfiguration;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import lombok.AllArgsConstructor;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
@ -69,7 +70,7 @@ import org.springframework.core.annotation.Order;
@AllArgsConstructor
@ConditionalOnBean(MarkerConfiguration.Marker.class)
@ConditionalOnProperty(prefix = BootstrapConfigProperties.PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@EnableConfigurationProperties(BootstrapConfigProperties.class)
@EnableConfigurationProperties(SpringBootstrapConfigProperties.class)
@Import(ConfigHandlerConfiguration.class)
@ImportAutoConfiguration({WebAdapterConfiguration.class, UtilAutoConfiguration.class, MessageConfiguration.class})
public class DynamicThreadPoolAutoConfiguration {

@ -15,15 +15,14 @@
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.parser;
package cn.hippo4j.config.springboot.starter.config;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* Abstract config parser
* Spring bootstrap core properties.
*/
public abstract class AbstractConfigParser implements ConfigParser {
@Override
public boolean supports(ConfigFileTypeEnum type) {
return getConfigFileTypes().contains(type);
}
@ConfigurationProperties(prefix = BootstrapConfigProperties.PREFIX)
public class SpringBootstrapConfigProperties extends BootstrapConfigProperties {
}

@ -29,8 +29,8 @@ import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.common.executor.ThreadFactoryBuilder;
import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.MonitorProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.MonitorProperties;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.monitor.base.DynamicThreadPoolMonitor;
import cn.hippo4j.monitor.base.ThreadPoolMonitor;

@ -23,9 +23,9 @@ import cn.hippo4j.common.model.executor.ExecutorNotifyProperties;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.NotifyPlatformProperties;
import cn.hippo4j.config.springboot.starter.config.WebExecutorProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.NotifyPlatformProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.WebExecutorProperties;
import cn.hippo4j.message.api.NotifyConfigBuilder;
import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.service.AlarmControlHandler;

@ -1,72 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.parser;
import lombok.Getter;
/**
* Config file type enum
*/
@Getter
public enum ConfigFileTypeEnum {
/**
* PROPERTIES
*/
PROPERTIES("properties"),
/**
* XML
*/
XML("xml"),
/**
* JSON
*/
JSON("json"),
/**
* YML
*/
YML("yml"),
/**
* YAML
*/
YAML("yaml"),
/**
* TXT
*/
TXT("txt");
private final String value;
ConfigFileTypeEnum(String value) {
this.value = value;
}
public static ConfigFileTypeEnum of(String value) {
for (ConfigFileTypeEnum typeEnum : ConfigFileTypeEnum.values()) {
if (typeEnum.value.equals(value)) {
return typeEnum;
}
}
return PROPERTIES;
}
}

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.parser;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* Config parser.
*/
public interface ConfigParser {
/**
* Supports.
*
* @param type
* @return
*/
boolean supports(ConfigFileTypeEnum type);
/**
* Do parse.
*
* @param content
* @return
* @throws IOException
*/
Map<Object, Object> doParse(String content) throws IOException;
/**
* Get config file types.
*
* @return
*/
List<ConfigFileTypeEnum> getConfigFileTypes();
}

@ -1,63 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.parser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
/**
* Config parser handler.
*/
public final class ConfigParserHandler {
private static final List<ConfigParser> PARSERS = new ArrayList<>();
private ConfigParserHandler() {
ServiceLoader<ConfigParser> loader = ServiceLoader.load(ConfigParser.class);
for (ConfigParser configParser : loader) {
PARSERS.add(configParser);
}
PARSERS.add(new PropertiesConfigParser());
PARSERS.add(new YamlConfigParser());
}
public Map<Object, Object> parseConfig(String content, ConfigFileTypeEnum type) throws IOException {
for (ConfigParser parser : PARSERS) {
if (parser.supports(type)) {
return parser.doParse(content);
}
}
return Collections.emptyMap();
}
public static ConfigParserHandler getInstance() {
return ConfigParserHandlerHolder.INSTANCE;
}
/**
* Config Parser Handler Holder
*/
private static class ConfigParserHandlerHolder {
private static final ConfigParserHandler INSTANCE = new ConfigParserHandler();
}
}

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.parser;
import cn.hippo4j.common.toolkit.CollectionUtil;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* Properties config parser.
*/
public class PropertiesConfigParser extends AbstractConfigParser {
@Override
public Map<Object, Object> doParse(String content) throws IOException {
Properties properties = new Properties();
properties.load(new StringReader(content));
return properties;
}
@Override
public List<ConfigFileTypeEnum> getConfigFileTypes() {
return CollectionUtil.newArrayList(ConfigFileTypeEnum.PROPERTIES);
}
}

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.parser;
import cn.hippo4j.common.toolkit.CollectionUtil;
import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Yaml config parser.
*/
public class YamlConfigParser extends AbstractConfigParser {
@Override
public Map<Object, Object> doParse(String content) {
if (StringUtils.isEmpty(content)) {
return new HashMap<>(1);
}
YamlPropertiesFactoryBean yamlPropertiesFactoryBean = new YamlPropertiesFactoryBean();
yamlPropertiesFactoryBean.setResources(new ByteArrayResource(content.getBytes()));
return yamlPropertiesFactoryBean.getObject();
}
@Override
public List<ConfigFileTypeEnum> getConfigFileTypes() {
return CollectionUtil.newArrayList(ConfigFileTypeEnum.YML, ConfigFileTypeEnum.YAML);
}
}

@ -18,13 +18,13 @@
package cn.hippo4j.config.springboot.starter.refresher;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.parser.ConfigParserHandler;
import cn.hippo4j.config.springboot.starter.refresher.event.Hippo4jConfigDynamicRefreshEvent;
import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.threadpool.dynamic.api.BootstrapPropertiesInterface;
import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigParserHandler;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.BootstrapConfigPropertiesBinderAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;

@ -17,7 +17,7 @@
package cn.hippo4j.config.springboot.starter.refresher;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigFile;

@ -17,7 +17,7 @@
package cn.hippo4j.config.springboot.starter.refresher;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.api.BootstrapPropertiesInterface;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.BootstrapConfigPropertiesBinderAdapter;
import org.springframework.boot.context.properties.bind.Bindable;
@ -30,7 +30,7 @@ import java.util.Map;
/**
* Bootstrap core properties binder adapt.
*/
public class DefaultBootstrapConfigPropertiesBinderAdapt implements BootstrapConfigPropertiesBinderAdapter {
public class DefaultBootstrapConfigPropertiesBinderAdapter implements BootstrapConfigPropertiesBinderAdapter {
/**
* Bootstrap core properties binder.

@ -19,7 +19,7 @@ package cn.hippo4j.config.springboot.starter.refresher;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;

@ -17,7 +17,7 @@
package cn.hippo4j.config.springboot.starter.refresher;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.core.config.ApplicationContextHolder;
import com.alibaba.cloud.nacos.NacosConfigProperties;
import com.alibaba.nacos.api.config.ConfigService;

@ -17,7 +17,7 @@
package cn.hippo4j.config.springboot.starter.refresher;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import com.alibaba.nacos.api.annotation.NacosInjected;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;

@ -17,7 +17,7 @@
package cn.hippo4j.config.springboot.starter.refresher;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.message.service.GlobalNotifyAlarmManage;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import lombok.extern.slf4j.Slf4j;

@ -23,7 +23,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.config.springboot.starter.config.AdapterExecutorProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.AdapterExecutorProperties;
import cn.hippo4j.config.springboot.starter.support.DynamicThreadPoolAdapterRegister;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@ -23,10 +23,10 @@ import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.config.springboot.starter.notify.ConfigModeNotifyConfigBuilder;
import cn.hippo4j.config.springboot.starter.support.GlobalCoreThreadPoolManage;
import cn.hippo4j.threadpool.dynamic.core.executor.manage.GlobalConfigThreadPoolManage;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.message.dto.NotifyConfigDTO;
@ -87,8 +87,8 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
continue;
}
dynamicRefreshPool(threadPoolId, properties);
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
GlobalCoreThreadPoolManage.refresh(threadPoolId, failDefaultExecutorProperties(beforeProperties, properties));
ExecutorProperties beforeProperties = GlobalConfigThreadPoolManage.getProperties(properties.getThreadPoolId());
GlobalConfigThreadPoolManage.refresh(threadPoolId, failDefaultExecutorProperties(beforeProperties, properties));
ChangeParameterNotifyRequest changeRequest = buildChangeRequest(beforeProperties, properties);
log.info(CHANGE_THREAD_POOL_TEXT,
threadPoolId,
@ -215,7 +215,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
* @param properties
*/
private boolean checkConsistency(String threadPoolId, ExecutorProperties properties) {
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
ExecutorProperties beforeProperties = GlobalConfigThreadPoolManage.getProperties(properties.getThreadPoolId());
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutor(threadPoolId);
if (executor == null) {
return false;
@ -239,7 +239,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
* @param properties
*/
private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) {
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
ExecutorProperties beforeProperties = GlobalConfigThreadPoolManage.getProperties(properties.getThreadPoolId());
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) {
ThreadPoolExecutorUtil.safeSetPoolSize(executor, properties.getCorePoolSize(), properties.getMaximumPoolSize());

@ -17,7 +17,7 @@
package cn.hippo4j.config.springboot.starter.refresher.event;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;

@ -18,7 +18,7 @@
package cn.hippo4j.config.springboot.starter.refresher.event;
import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.config.springboot.starter.notify.ConfigModeNotifyConfigBuilder;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;

@ -23,8 +23,8 @@ import cn.hippo4j.common.api.ThreadPoolConfigChange;
import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.WebExecutorProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.WebExecutorProperties;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;

@ -17,8 +17,8 @@
package cn.hippo4j.config.springboot.starter.support;
import cn.hippo4j.config.springboot.starter.config.AdapterExecutorProperties;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.AdapterExecutorProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;

@ -29,6 +29,7 @@ import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService;
import cn.hippo4j.message.service.GlobalNotifyAlarmManage;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.threadpool.dynamic.core.executor.manage.GlobalConfigThreadPoolManage;
import java.util.concurrent.ThreadPoolExecutor;
@ -48,7 +49,7 @@ public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolSer
.build();
GlobalThreadPoolManage.registerPool(threadPoolId, dynamicThreadPoolWrapper);
ExecutorProperties executorProperties = buildExecutorProperties(registerWrapper);
GlobalCoreThreadPoolManage.register(threadPoolId, executorProperties);
GlobalConfigThreadPoolManage.register(threadPoolId, executorProperties);
DynamicThreadPoolRegisterCoreNotifyParameter notifyParameter = registerWrapper.getConfigNotify();
ThreadPoolNotifyAlarm notifyAlarm = new ThreadPoolNotifyAlarm(true, registerParameter.getActiveAlarm(), registerParameter.getCapacityAlarm());
notifyAlarm.setReceives(notifyParameter.getReceives());

@ -23,7 +23,7 @@ import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.common.model.executor.ExecutorNotifyProperties;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.core.executor.DynamicThreadPool;
@ -35,6 +35,7 @@ import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.message.service.GlobalNotifyAlarmManage;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.threadpool.dynamic.core.executor.manage.GlobalConfigThreadPoolManage;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
@ -85,11 +86,12 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
log.error("Failed to create dynamic thread pool in annotation mode.", ex);
return bean;
}
DynamicThreadPoolExecutor dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean);
ThreadPoolExecutor dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean);
if (dynamicThreadPoolExecutor == null) {
dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
}
DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor);
// TODO
DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(((DynamicThreadPoolExecutor) dynamicThreadPoolExecutor).getThreadPoolId(), dynamicThreadPoolExecutor);
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(wrap);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
return DynamicThreadPoolAdapterChoose.match(bean) ? bean : remoteThreadPoolExecutor;
@ -128,7 +130,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
}
GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrapper.getThreadPoolId(), dynamicThreadPoolWrapper);
GlobalCoreThreadPoolManage.register(
GlobalConfigThreadPoolManage.register(
threadPoolId,
executorProperties == null
? buildDefaultExecutorProperties(threadPoolId, executor)

@ -71,7 +71,7 @@ public class AdaptedThreadPoolDestroyPostProcessor implements DestructionAwareBe
@Override
public void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException {
Optional.ofNullable(DynamicThreadPoolAdapterChoose.unwrap(bean))
.map(DynamicThreadPoolExecutor::getThreadPoolId)
.map(each -> ((DynamicThreadPoolExecutor) each).getThreadPoolId())
// the internal thread pool is also managed by spring, no manual destruction required
.filter(applicationContext::containsBeanDefinition)
.map(GlobalThreadPoolManage::getExecutorService)

@ -96,12 +96,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
log.error("Failed to create dynamic thread pool in annotation mode.", ex);
return bean;
}
DynamicThreadPoolExecutor dynamicThreadPoolExecutor;
dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean);
ThreadPoolExecutor dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean);
if ((dynamicThreadPoolExecutor) == null) {
dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
}
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor);
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(((DynamicThreadPoolExecutor) dynamicThreadPoolExecutor).getThreadPoolId(), dynamicThreadPoolExecutor);
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
subscribeConfig(dynamicThreadPoolWrapper);

@ -66,7 +66,7 @@ public class AbstractBuildThreadPoolTemplate {
Assert.notNull(initParam);
ThreadPoolExecutor executorService;
try {
executorService = new ThreadPoolExecutorTemplate(initParam.getCorePoolNum(),
executorService = new ThreadPoolExecutor(initParam.getCorePoolNum(),
initParam.getMaximumPoolSize(),
initParam.getKeepAliveTime(),
initParam.getTimeUnit(),
@ -80,41 +80,6 @@ public class AbstractBuildThreadPoolTemplate {
return executorService;
}
/**
* Build a fast-consuming task thread pool.
*
* @return fast thread-pool executor
*/
public static ThreadPoolExecutor buildFastPool() {
ThreadPoolInitParam initParam = initParam();
return buildFastPool(initParam);
}
/**
* Build a fast-consuming task thread pool.
*
* @param initParam init param
* @return fast thread-pool executor
*/
public static ThreadPoolExecutor buildFastPool(ThreadPoolInitParam initParam) {
TaskQueue<Runnable> taskQueue = new TaskQueue(initParam.getCapacity());
FastThreadPoolExecutor fastThreadPoolExecutor;
try {
fastThreadPoolExecutor = new FastThreadPoolExecutor(initParam.getCorePoolNum(),
initParam.getMaximumPoolSize(),
initParam.getKeepAliveTime(),
initParam.getTimeUnit(),
taskQueue,
initParam.getThreadFactory(),
initParam.rejectedExecutionHandler);
} catch (IllegalArgumentException ex) {
throw new IllegalArgumentException("Error creating thread pool parameter.", ex);
}
taskQueue.setExecutor(fastThreadPoolExecutor);
fastThreadPoolExecutor.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut);
return fastThreadPoolExecutor;
}
/**
* Build a dynamic monitor thread-pool.
*

@ -1,177 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor.support;
import cn.hippo4j.core.executor.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Dynamic executor configuration support.
*
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin} to get thread-pool shutdown support
*/
@Deprecated
@Slf4j
public abstract class AbstractDynamicExecutorSupport extends ThreadPoolExecutor implements InitializingBean, DisposableBean {
private String threadPoolId;
private ExecutorService executor;
private long awaitTerminationMillis;
private boolean waitForTasksToCompleteOnShutdown;
public AbstractDynamicExecutorSupport(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
boolean waitForTasksToCompleteOnShutdown,
long awaitTerminationMillis,
BlockingQueue<Runnable> workQueue,
String threadPoolId,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.threadPoolId = threadPoolId;
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
this.awaitTerminationMillis = awaitTerminationMillis;
}
/**
* Create the target {@link java.util.concurrent.ExecutorService} instance.
* Called by {@code afterPropertiesSet}.
*
* @return a new ExecutorService instance
* @see #afterPropertiesSet()
*/
protected abstract ExecutorService initializeExecutor();
/**
* Calls {@code initialize()} after the container applied all property values.
*
* @see #initialize()
*/
@Override
public void afterPropertiesSet() {
initialize();
}
/**
* Calls {@code shutdown} when the BeanFactory destroys.
* the task executor instance.
*
* @see #shutdown()
*/
@Override
public void destroy() {
shutdownSupport();
}
/**
* Set up the ExecutorService.
*/
public void initialize() {
if (log.isInfoEnabled()) {
log.info("Initializing ExecutorService" + (this.threadPoolId != null ? " '" + this.threadPoolId + "'" : ""));
}
this.executor = initializeExecutor();
}
/**
* Set support param.
*
* @param awaitTerminationMillis
* @param waitForTasksToCompleteOnShutdown
*/
public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) {
this.awaitTerminationMillis = awaitTerminationMillis;
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
}
/**
* Perform a shutdown on the underlying ExecutorService.
*
* @see java.util.concurrent.ExecutorService#shutdown()
* @see java.util.concurrent.ExecutorService#shutdownNow()
*/
public void shutdownSupport() {
if (log.isInfoEnabled()) {
log.info("Shutting down ExecutorService" + (this.threadPoolId != null ? " '" + this.threadPoolId + "'" : ""));
}
if (this.executor != null) {
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
} else {
for (Runnable remainingTask : this.executor.shutdownNow()) {
cancelRemainingTask(remainingTask);
}
}
awaitTerminationIfNecessary(this.executor);
}
}
/**
* Cancel the given remaining task which never commended execution,
* as returned from {@link ExecutorService#shutdownNow()}.
*
* @param task the task to cancel (typically a {@link RunnableFuture})
* @see #shutdown()
* @see RunnableFuture#cancel(boolean)
* @since 5.0.5
*/
protected void cancelRemainingTask(Runnable task) {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
}
/**
* Wait for the executor to terminate, according to the value of the.
*/
private void awaitTerminationIfNecessary(ExecutorService executor) {
if (this.awaitTerminationMillis > 0) {
try {
if (!executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS)) {
if (log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor"
+ (this.threadPoolId != null ? " '" + this.threadPoolId + "'" : "") + " to terminate.");
}
}
} catch (InterruptedException ex) {
if (log.isWarnEnabled()) {
log.warn("Interrupted while waiting for executor"
+ (this.threadPoolId != null ? " '" + this.threadPoolId + "'" : "") + " to terminate.");
}
Thread.currentThread().interrupt();
}
}
}
}

@ -1,84 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor.support;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Fast thread-pool executor.
*/
@Slf4j
public class FastThreadPoolExecutor extends ThreadPoolExecutorTemplate {
public FastThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* Statistics on the number of tasks submitted by the fast consumption thread pool
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
/**
* Get submitted task count.
*
* @return submitted task count
*/
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("The blocking queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Exception t) {
submittedTaskCount.decrementAndGet();
throw t;
}
}
}

@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor.support;
import lombok.Setter;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
* Task queue.
*/
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = -2635853580887179627L;
@Setter
private FastThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
@Override
public boolean offer(Runnable runnable) {
int currentPoolThreadSize = executor.getPoolSize();
// If a core thread is idle, add the task to the blocking queue, and the core thread will process the task.
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// The current number of threads in the thread pool is less than the maximum number of threads, and returns false.
// According to the thread pool source code, non-core threads will be created.
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// If the current thread pool number is greater than the maximum number of threads, the task is added to the blocking queue.
return super.offer(runnable);
}
/**
* Retry offer.
*
* @param runnable submit thread pool task
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return
* @throws InterruptedException
*/
public boolean retryOffer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Actuator closed!");
}
return super.offer(runnable, timeout, unit);
}
}

@ -411,16 +411,6 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return AbstractBuildThreadPoolTemplate.buildPool(buildInitParam(builder));
}
/**
* Build a fast thread-pool with {@code builder}.
*
* @param builder thread-pool builder
* @return fast thread-pool executor
*/
private static ThreadPoolExecutor buildFastPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildFastPool(buildInitParam(builder));
}
/**
* Build a dynamic thread-pool with {@code builder}.
*
@ -474,9 +464,6 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
@Override
public ThreadPoolExecutor build() {
if (isDynamicPool) {
return buildDynamicPool(this);
}
return isFastPool ? buildFastPool(this) : buildPool(this);
return isDynamicPool ? buildDynamicPool(this) : buildPool(this);
}
}

@ -1,105 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.toolkit.ArrayUtil;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* ThreadPool executor template.
*/
public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor {
public ThreadPoolExecutorTemplate(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
public void execute(final Runnable command) {
super.execute(wrap(command, clientTrace()));
}
@Override
public Future<?> submit(final Runnable task) {
return super.submit(wrap(task, clientTrace()));
}
@Override
public <T> Future<T> submit(final Callable<T> task) {
return super.submit(wrap(task, clientTrace()));
}
/**
* Client trace.
*
* @return exception
*/
private Exception clientTrace() {
return new Exception("Tread task root stack trace.");
}
/**
* Wrapping thread pool tasks.
*
* @param task task
* @param clientStack client stack
* @return wrapped runnable
*/
private Runnable wrap(final Runnable task, final Exception clientStack) {
return () -> {
try {
task.run();
} catch (Exception e) {
e.setStackTrace(ArrayUtil.addAll(clientStack.getStackTrace(), e.getStackTrace()));
throw e;
}
};
}
/**
* Wrapping thread pool tasks.
*
* @param task task
* @param clientStack client stack
* @param <T> computed result
* @return wrapped runnable
*/
private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack) {
return () -> {
try {
return task.call();
} catch (Exception e) {
e.setStackTrace(ArrayUtil.addAll(clientStack.getStackTrace(), e.getStackTrace()));
throw e;
}
};
}
}

@ -17,9 +17,8 @@
package cn.hippo4j.core.executor.support.adpter;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Dynamic thread pool adapter.
@ -42,7 +41,7 @@ public interface DynamicThreadPoolAdapter {
* of dynamic thread pools
* @return get the real dynamic thread pool instance
*/
DynamicThreadPoolExecutor unwrap(Object executor);
ThreadPoolExecutor unwrap(Object executor);
/**
* If the {@link DynamicThreadPoolAdapter#match(Object)} conditions are met,

@ -19,14 +19,13 @@ package cn.hippo4j.core.executor.support.adpter;
import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.support.spi.DynamicThreadPoolAdapterSPI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Dynamic thread pool adapter choose.
@ -61,7 +60,7 @@ public class DynamicThreadPoolAdapterChoose {
* of dynamic thread pools
* @return get the real dynamic thread pool instance
*/
public static DynamicThreadPoolExecutor unwrap(Object executor) {
public static ThreadPoolExecutor unwrap(Object executor) {
Optional<DynamicThreadPoolAdapter> dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst();
return dynamicThreadPoolAdapterOptional.map(each -> each.unwrap(executor)).orElse(null);
}
@ -83,15 +82,14 @@ public class DynamicThreadPoolAdapterChoose {
* Load SPI customer adapter.
*/
private static void loadCustomerAdapter() {
ServiceLoaderRegistry.register(DynamicThreadPoolAdapterSPI.class);
Collection<DynamicThreadPoolAdapterSPI> instances = ServiceLoaderRegistry.getSingletonServiceInstances(DynamicThreadPoolAdapterSPI.class);
ServiceLoaderRegistry.register(DynamicThreadPoolAdapter.class);
Collection<DynamicThreadPoolAdapter> instances = ServiceLoaderRegistry.getSingletonServiceInstances(DynamicThreadPoolAdapter.class);
if (CollectionUtil.isEmpty(instances)) {
return;
}
for (DynamicThreadPoolAdapterSPI instance : instances) {
DynamicThreadPoolAdapter adapter = instance.adapter();
if (adapter != null) {
DYNAMIC_THREAD_POOL_ADAPTERS.add(adapter);
for (DynamicThreadPoolAdapter instance : instances) {
if (instance != null) {
DYNAMIC_THREAD_POOL_ADAPTERS.add(instance);
}
}
}

@ -35,17 +35,11 @@ import java.util.concurrent.TimeUnit;
public class ThreadPoolTaskExecutorAdapter implements DynamicThreadPoolAdapter {
private static final String EXECUTOR_FIELD_NAME = "threadPoolExecutor";
private static final String WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN = "waitForTasksToCompleteOnShutdown";
private static final String AWAIT_TERMINATION_MILLIS = "awaitTerminationMillis";
private static final String TASK_DECORATOR = "taskDecorator";
private static final String BEAN_NAME = "beanName";
private static final String QUEUE_CAPACITY = "queueCapacity";
private static final String MATCH_CLASS_NAME = "ThreadPoolTaskExecutor";
@Override
@ -55,7 +49,7 @@ public class ThreadPoolTaskExecutorAdapter implements DynamicThreadPoolAdapter {
}
@Override
public DynamicThreadPoolExecutor unwrap(Object executor) {
public ThreadPoolExecutor unwrap(Object executor) {
Object unwrap = ReflectUtil.getFieldValue(executor, EXECUTOR_FIELD_NAME);
if (unwrap == null) {
return null;

@ -25,6 +25,7 @@ import java.lang.reflect.Field;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Zipkin thread local executor adapter.
@ -49,7 +50,7 @@ public class ZipkinExecutorAdapter implements DynamicThreadPoolAdapter {
}
@Override
public DynamicThreadPoolExecutor unwrap(Object executor) {
public ThreadPoolExecutor unwrap(Object executor) {
Object unwrap = doUnwrap(executor);
if (unwrap instanceof DynamicThreadPoolExecutor) {
return (DynamicThreadPoolExecutor) unwrap;

@ -17,8 +17,6 @@
package cn.hippo4j.core.executor.support.spi;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapter;
/**
* Dynamic thread-pool adapter SPI.
*/
@ -32,5 +30,5 @@ public interface DynamicThreadPoolAdapterSPI {
/**
* Adapter.
*/
DynamicThreadPoolAdapter adapter();
// DynamicThreadPoolAdapter adapter();
}

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link AbstractBuildThreadPoolTemplate}
*
* @author dmego
*/
public class AbstractBuildThreadPoolTemplateTest {
@ -73,27 +74,6 @@ public class AbstractBuildThreadPoolTemplateTest {
}
@Test
public void testBuildFastPool() {
ThreadPoolExecutor executor = AbstractBuildThreadPoolTemplate.buildFastPool(initParam);
AtomicInteger count = new AtomicInteger(0);
executor.submit(() -> {
ThreadUtil.sleep(100L);
return count.incrementAndGet();
});
executor.submit(() -> {
ThreadUtil.sleep(100L);
count.incrementAndGet();
});
// waiting for shutdown
executor.shutdown();
while (!executor.isTerminated()) {
}
Assert.assertEquals(2, count.get());
}
@Test
public void testBuildDynamicPool() {
initParam.setWaitForTasksToCompleteOnShutdown(true);

@ -1,98 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor.support;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* test for {@link FastThreadPoolExecutor}
*/
public class FastThreadPoolExecutorTest {
private final static int corePoolSize = 1;
private final static int capacity = 1;
private final TaskQueue<Runnable> taskQueue = new TaskQueue<>(capacity);
private final FastThreadPoolExecutor fastThreadPoolExecutor = new FastThreadPoolExecutor(corePoolSize,
corePoolSize,
10,
TimeUnit.SECONDS,
taskQueue,
Thread::new,
new ThreadPoolExecutor.AbortPolicy());
{
taskQueue.setExecutor(fastThreadPoolExecutor);
}
@Test
void testSubmittedTaskCount() {
fastThreadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException ignored) {
}
});
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
Assertions.assertEquals(1, fastThreadPoolExecutor.getSubmittedTaskCount());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
Assertions.assertEquals(0, fastThreadPoolExecutor.getSubmittedTaskCount());
// exception
int expected = 0;
for (int i = 0; i <= (corePoolSize + capacity); i++) {
expected++;
try {
fastThreadPoolExecutor.execute(() -> {
synchronized (fastThreadPoolExecutor) {
try {
fastThreadPoolExecutor.wait();
} catch (InterruptedException ignored) {
}
}
});
} catch (Exception e) {
expected--;
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
Assertions.assertEquals(expected, fastThreadPoolExecutor.getSubmittedTaskCount());
synchronized (fastThreadPoolExecutor) {
fastThreadPoolExecutor.notifyAll();
}
}
}
Loading…
Cancel
Save