Merge pull request #512 from mabaiwan/develop

Improve the dynamic registration thread pool logic
pull/515/head
小马哥 2 years ago committed by GitHub
commit f6eaecca75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,6 +17,7 @@
package cn.hippo4j.common.model; package cn.hippo4j.common.model;
import com.fasterxml.jackson.annotation.JsonAlias;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
@ -50,6 +51,7 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
/** /**
* Thread-pool id * Thread-pool id
*/ */
@JsonAlias("threadPoolId")
private String tpId; private String tpId;
/** /**
@ -112,6 +114,7 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
/** /**
* Liveness alarm * Liveness alarm
*/ */
@JsonAlias("activeAlarm")
private Integer livenessAlarm; private Integer livenessAlarm;
/** /**

@ -17,6 +17,7 @@
package cn.hippo4j.common.model.register; package cn.hippo4j.common.model.register;
import com.fasterxml.jackson.annotation.JsonAlias;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
@ -65,7 +66,7 @@ public class DynamicThreadPoolRegisterParameter {
/** /**
* Keep alive time * Keep alive time
*/ */
private Integer keepAliveTime; private Long keepAliveTime;
/** /**
* Rejected type * Rejected type
@ -83,12 +84,27 @@ public class DynamicThreadPoolRegisterParameter {
private Integer capacityAlarm; private Integer capacityAlarm;
/** /**
* Liveness alarm * Active alarm
*/ */
private Integer livenessAlarm; @JsonAlias("livenessAlarm")
private Integer activeAlarm;
/** /**
* Allow core thread timeout * Allow core thread timeout
*/ */
private Integer allowCoreThreadTimeOut; private Boolean allowCoreThreadTimeOut;
/**
* Thread name prefix
*/
private String threadNamePrefix;
/**
* Execute timeout
*/
private Long executeTimeOut;
public Integer getAllowCoreThreadTimeOut() {
return this.allowCoreThreadTimeOut ? 1 : 0;
}
} }

@ -17,6 +17,8 @@
package cn.hippo4j.common.model.register; package cn.hippo4j.common.model.register;
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNotifyParameter;
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
@ -50,4 +52,14 @@ public class DynamicThreadPoolRegisterWrapper {
* Dynamic thread-pool register parameter * Dynamic thread-pool register parameter
*/ */
private DynamicThreadPoolRegisterParameter dynamicThreadPoolRegisterParameter; private DynamicThreadPoolRegisterParameter dynamicThreadPoolRegisterParameter;
/**
* Dynamic thread-pool core notify parameter
*/
private DynamicThreadPoolRegisterCoreNotifyParameter dynamicThreadPoolRegisterCoreNotifyParameter;
/**
* Dynamic thread-pool server notify parameter
*/
private DynamicThreadPoolRegisterServerNotifyParameter dynamicThreadPoolRegisterServerNotifyParameter;
} }

@ -0,0 +1,41 @@
package cn.hippo4j.common.model.register.notify;
import lombok.*;
/**
* Dynamic thread-pool register core notify parameter.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolRegisterCoreNotifyParameter {
/**
* Whether to enable thread pool running alarm
*/
@NonNull
private Boolean alarm;
/**
* Active alarm
*/
@NonNull
private Integer activeAlarm;
/**
* Capacity alarm
*/
@NonNull
private Integer capacityAlarm;
/**
* Interval
*/
private Integer interval;
/**
* Receive
*/
private String receives;
}

@ -0,0 +1,46 @@
package cn.hippo4j.common.model.register.notify;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Dynamic thread-pool register server notify parameter.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolRegisterServerNotifyParameter {
/**
* Thread-pool id
*/
private String threadPoolId;
/**
* Platform
*/
private String platform;
/**
* Config type
*/
private String type;
/**
* Secret key
*/
private String secretKey;
/**
* Interval
*/
private Integer interval;
/**
* Receives
*/
private String receives;
}

@ -139,11 +139,7 @@ public class ConfigServiceImpl implements ConfigService {
@Override @Override
public void register(DynamicThreadPoolRegisterWrapper registerWrapper) { public void register(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); ConfigAllInfo configAllInfo = parseConfigAllInfo(registerWrapper);
ConfigAllInfo configAllInfo = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ConfigAllInfo.class);
configAllInfo.setTenantId(registerWrapper.getTenantId());
configAllInfo.setItemId(registerWrapper.getItemId());
configAllInfo.setTpId(registerParameter.getThreadPoolId());
TenantService tenantService = ApplicationContextHolder.getBean(TenantService.class); TenantService tenantService = ApplicationContextHolder.getBean(TenantService.class);
ItemService itemService = ApplicationContextHolder.getBean(ItemService.class); ItemService itemService = ApplicationContextHolder.getBean(ItemService.class);
Assert.isTrue(tenantService.getTenantByTenantId(registerWrapper.getTenantId()) != null, "Tenant does not exist"); Assert.isTrue(tenantService.getTenantByTenantId(registerWrapper.getTenantId()) != null, "Tenant does not exist");
@ -157,6 +153,16 @@ public class ConfigServiceImpl implements ConfigService {
} }
} }
private ConfigAllInfo parseConfigAllInfo(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
ConfigAllInfo configAllInfo = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ConfigAllInfo.class);
configAllInfo.setTenantId(registerWrapper.getTenantId());
configAllInfo.setItemId(registerWrapper.getItemId());
configAllInfo.setTpId(registerParameter.getThreadPoolId());
configAllInfo.setAllowCoreThreadTimeOut(registerParameter.getAllowCoreThreadTimeOut());
return configAllInfo;
}
private void verification(String identify) { private void verification(String identify) {
if (StringUtil.isNotBlank(identify)) { if (StringUtil.isNotBlank(identify)) {
Map content = getContent(identify); Map content = getContent(identify);

@ -21,7 +21,7 @@ import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.ThreadPoolParameter; import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService; import cn.hippo4j.core.executor.support.service.DynamicThreadPoolService;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.util.List; import java.util.List;

@ -0,0 +1,34 @@
package cn.hippo4j.core.executor.support.service;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Abstract dynamic thread-pool service.
*/
public abstract class AbstractDynamicThreadPoolService implements DynamicThreadPoolService {
/**
* Build dynamic thread-pool executor.
*
* @param registerParameter
* @return
*/
public ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) {
ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder()
.threadPoolId(registerParameter.getThreadPoolId())
.corePoolSize(registerParameter.getCorePoolSize())
.maxPoolNum(registerParameter.getMaximumPoolSize())
.workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity()))
.threadFactory(registerParameter.getThreadNamePrefix())
.keepAliveTime(registerParameter.getKeepAliveTime())
.rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType()))
.dynamicPool()
.build();
return dynamicThreadPoolExecutor;
}
}

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.core.executor.support; package cn.hippo4j.core.executor.support.service;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;

@ -28,5 +28,4 @@ public class Hippo4jCoreApolloExampleApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(Hippo4jCoreApolloExampleApplication.class, args); SpringApplication.run(Hippo4jCoreApolloExampleApplication.class, args);
} }
} }

@ -40,9 +40,8 @@ spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy
spring.dynamic.thread-pool.executors[0].keep-alive-time=1000 spring.dynamic.thread-pool.executors[0].keep-alive-time=1000
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true
spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume
spring.dynamic.thread-pool.executors[0].notify.is-alarm=true spring.dynamic.thread-pool.executors[0].notify.alarm=true
spring.dynamic.thread-pool.executors[0].notify.active-alarm=80 spring.dynamic.thread-pool.executors[0].notify.active-alarm=80
spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80 spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80
spring.dynamic.thread-pool.executors[0].notify.interval=8 spring.dynamic.thread-pool.executors[0].notify.interval=8
spring.dynamic.thread-pool.executors[0].notify.receives.WECHAT=xxx spring.dynamic.thread-pool.executors[0].notify.receive=chen.ma
spring.dynamic.thread-pool.executors[0].notify.receives.DING=xxx

@ -42,10 +42,8 @@ spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy
spring.dynamic.thread-pool.executors[0].keep-alive-time=6691 spring.dynamic.thread-pool.executors[0].keep-alive-time=6691
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true
spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume
spring.dynamic.thread-pool.executors[0].notify.is-alarm=true spring.dynamic.thread-pool.executors[0].notify.alarm=true
spring.dynamic.thread-pool.executors[0].notify.active-alarm=80 spring.dynamic.thread-pool.executors[0].notify.active-alarm=80
spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80 spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80
spring.dynamic.thread-pool.executors[0].notify.interval=8 spring.dynamic.thread-pool.executors[0].notify.interval=8
spring.dynamic.thread-pool.executors[0].notify.receives.WECHAT=xxx spring.dynamic.thread-pool.executors[0].notify.receive=chen.ma
spring.dynamic.thread-pool.executors[0].notify.receives.DING=xxx
spring.dynamic.thread-pool.executors[0].notify.receives.LARK=xxx

@ -21,10 +21,6 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Redick01
* @date 2022/3/14 20:40
*/
@EnableDynamicThreadPool @EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core") @SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class Hippo4jCoreZookeeperExampleApplication { public class Hippo4jCoreZookeeperExampleApplication {

@ -33,18 +33,15 @@ import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_CONSUM
import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_PRODUCE; import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_PRODUCE;
/** /**
* Thread pool config. * Dynamic thread-pool config.
*
* @author chen.ma
* @date 2021/6/20 17:16
*/ */
@Slf4j @Slf4j
@Configuration @Configuration
public class ThreadPoolConfig { public class DynamicThreadPoolConfig {
@Bean @Bean
@DynamicThreadPool @DynamicThreadPool
public ThreadPoolExecutor messageConsumeDynamicThreadPool() { public Executor messageConsumeTtlDynamicThreadPool() {
String threadPoolId = MESSAGE_CONSUME; String threadPoolId = MESSAGE_CONSUME;
ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder() ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder()
.dynamicPool() .dynamicPool()
@ -55,7 +52,9 @@ public class ThreadPoolConfig {
.awaitTerminationMillis(5000L) .awaitTerminationMillis(5000L)
.taskDecorator(new TaskTraceBuilderHandler()) .taskDecorator(new TaskTraceBuilderHandler())
.build(); .build();
return customExecutor; // Ali ttl adaptation use case.
Executor ttlExecutor = TtlExecutors.getTtlExecutor(customExecutor);
return ttlExecutor;
} }
@Bean @Bean
@ -70,44 +69,10 @@ public class ThreadPoolConfig {
.waitForTasksToCompleteOnShutdown(true) .waitForTasksToCompleteOnShutdown(true)
.awaitTerminationMillis(5000L) .awaitTerminationMillis(5000L)
/** /**
* {@link TaskDecoratorTest} * Context passing, test cases: {@link TaskDecoratorTest}
*/ */
.taskDecorator(new TaskDecoratorTest.ContextCopyingDecorator()) .taskDecorator(new TaskDecoratorTest.ContextCopyingDecorator())
.build(); .build();
return produceExecutor; return produceExecutor;
} }
@Bean
@DynamicThreadPool
public Executor messageConsumeTtlDynamicThreadPool() {
String threadPoolId = MESSAGE_CONSUME;
ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.executeTimeOut(800L)
.waitForTasksToCompleteOnShutdown(true)
.awaitTerminationMillis(5000L)
.taskDecorator(new TaskTraceBuilderHandler())
.build();
Executor ttlExecutor = TtlExecutors.getTtlExecutor(customExecutor);
return ttlExecutor;
}
@Bean
@DynamicThreadPool
public Executor messageConsumeTtlServiceDynamicThreadPool() {
String threadPoolId = MESSAGE_CONSUME;
ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.executeTimeOut(800L)
.waitForTasksToCompleteOnShutdown(true)
.awaitTerminationMillis(5000L)
.taskDecorator(new TaskTraceBuilderHandler())
.build();
Executor ttlExecutor = TtlExecutors.getTtlExecutorService(customExecutor);
return ttlExecutor;
}
} }

@ -19,9 +19,6 @@ package cn.hippo4j.example.core.constant;
/** /**
* Global test variables. * Global test variables.
*
* @author chen.ma
* @date 2021/8/15 21:06
*/ */
public class GlobalTestConstant { public class GlobalTestConstant {

@ -25,10 +25,7 @@ import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
/** /**
* . * Custom Deny Policy.
*
* @author chen.ma
* @date 2022/1/4 22:19
*/ */
public class ErrorLogRejectedExecutionHandler implements CustomRejectedExecutionHandler { public class ErrorLogRejectedExecutionHandler implements CustomRejectedExecutionHandler {

@ -25,9 +25,6 @@ import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE;
/** /**
* Task trace builder handler. * Task trace builder handler.
*
* @author chen.ma
* @date 2022/3/2 20:46
*/ */
public final class TaskTraceBuilderHandler implements TaskDecorator { public final class TaskTraceBuilderHandler implements TaskDecorator {
@ -39,7 +36,7 @@ public final class TaskTraceBuilderHandler implements TaskDecorator {
MDC.put(EXECUTE_TIMEOUT_TRACE, executeTimeoutTrace); MDC.put(EXECUTE_TIMEOUT_TRACE, executeTimeoutTrace);
} }
runnable.run(); runnable.run();
// 此处不用进行清理操作, 统一在线程任务执行后清理 // There is no need to clean up here, and it will be cleaned up after the thread task is executed.
}; };
return taskRun; return taskRun;
} }

@ -31,17 +31,14 @@ import java.util.concurrent.TimeUnit;
/** /**
* Test alarm send message. * Test alarm send message.
*
* @author chen.ma
* @date 2021/8/15 21:03
*/ */
@Slf4j @Slf4j
@Component @Component
public class AlarmSendMessageTest { public class AlarmSendMessageTest {
/** /**
* . * Test alarm notification.
* , @PostConstruct * If you need to run this single test, add @PostConstruct to the method.
*/ */
@SuppressWarnings("all") @SuppressWarnings("all")
public void alarmSendMessageTest() { public void alarmSendMessageTest() {

@ -19,8 +19,11 @@ package cn.hippo4j.example.core.inittest;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNotifyParameter;
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.message.enums.NotifyPlatformEnum;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -39,18 +42,37 @@ public class RegisterDynamicThreadPoolTest {
String threadPoolId = "register-dynamic-thread-pool"; String threadPoolId = "register-dynamic-thread-pool";
DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter(); DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter();
parameterInfo.setThreadPoolId(threadPoolId); parameterInfo.setThreadPoolId(threadPoolId);
parameterInfo.setThreadNamePrefix(threadPoolId);
parameterInfo.setCorePoolSize(3); parameterInfo.setCorePoolSize(3);
parameterInfo.setMaximumPoolSize(14); parameterInfo.setMaximumPoolSize(14);
parameterInfo.setQueueType(9); parameterInfo.setQueueType(9);
parameterInfo.setCapacity(110); parameterInfo.setCapacity(110);
parameterInfo.setKeepAliveTime(110); parameterInfo.setKeepAliveTime(110L);
parameterInfo.setRejectedType(2); parameterInfo.setRejectedType(2);
parameterInfo.setIsAlarm(0); parameterInfo.setIsAlarm(0);
parameterInfo.setCapacityAlarm(90); parameterInfo.setCapacityAlarm(90);
parameterInfo.setLivenessAlarm(90); parameterInfo.setActiveAlarm(90);
parameterInfo.setAllowCoreThreadTimeOut(0); parameterInfo.setAllowCoreThreadTimeOut(Boolean.TRUE);
// core 模式和 server 模式,各选其一即可
DynamicThreadPoolRegisterCoreNotifyParameter coreNotifyParameter = DynamicThreadPoolRegisterCoreNotifyParameter.builder()
.activeAlarm(80)
.capacityAlarm(80)
.receives("chen.ma")
.alarm(true)
.interval(5)
.build();
DynamicThreadPoolRegisterServerNotifyParameter serverNotifyParameter = DynamicThreadPoolRegisterServerNotifyParameter.builder()
.platform(NotifyPlatformEnum.WECHAT.name())
.secretKey("xxx")
.threadPoolId(threadPoolId)
.interval(5)
.receives("chen.ma")
.build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder() DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()
.dynamicThreadPoolRegisterParameter(parameterInfo) .dynamicThreadPoolRegisterParameter(parameterInfo)
.dynamicThreadPoolRegisterCoreNotifyParameter(coreNotifyParameter)
.dynamicThreadPoolRegisterServerNotifyParameter(serverNotifyParameter)
.build(); .build();
ThreadPoolExecutor dynamicThreadPool = GlobalThreadPoolManage.dynamicRegister(registerWrapper); ThreadPoolExecutor dynamicThreadPool = GlobalThreadPoolManage.dynamicRegister(registerWrapper);
log.info("Dynamic registration thread pool parameter details: {}", JSONUtil.toJSONString(dynamicThreadPool)); log.info("Dynamic registration thread pool parameter details: {}", JSONUtil.toJSONString(dynamicThreadPool));

@ -25,25 +25,19 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE; import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE;
/** /**
* Test run time metrics. * Run state handler test.
*
* @author chen.ma
* @date 2021/8/15 21:00
*/ */
@Slf4j @Slf4j
@Component @Component
public class RunStateHandlerTest { public class RunStateHandlerTest {
@Resource @Resource
private ThreadPoolExecutor messageConsumeDynamicThreadPool; private Executor messageConsumeTtlDynamicThreadPool;
@Resource @Resource
private ThreadPoolExecutor messageProduceDynamicThreadPool; private ThreadPoolExecutor messageProduceDynamicThreadPool;
@ -66,24 +60,22 @@ public class RunStateHandlerTest {
@SuppressWarnings("all") @SuppressWarnings("all")
public void runStateHandlerTest() { public void runStateHandlerTest() {
log.info("Test thread pool runtime state interface..."); log.info("Test thread pool runtime state interface...");
// Start the dynamic thread pool to simulate running tasks
// 启动动态线程池模拟运行任务 runTask(messageConsumeTtlDynamicThreadPool);
runTask(messageConsumeDynamicThreadPool);
// 启动动态线程池模拟运行任务
runTask(messageProduceDynamicThreadPool); runTask(messageProduceDynamicThreadPool);
} }
private void runTask(ExecutorService executorService) { private void runTask(Executor executor) {
// 模拟任务运行 // Simulate task run
runStateHandlerTestExecutor.execute(() -> { runStateHandlerTestExecutor.execute(() -> {
/** /**
* 线, MDC Trace , . * When the execution of the thread pool task times out, the Trace flag is put into the MDC, and it is printed out when an alarm occurs.
*/ */
MDC.put(EXECUTE_TIMEOUT_TRACE, "https://github.com/opengoofy/hippo4j 感觉不错来个 Star."); MDC.put(EXECUTE_TIMEOUT_TRACE, "https://github.com/opengoofy/hippo4j 感觉不错来个 Star.");
ThreadUtil.sleep(5000); ThreadUtil.sleep(5000);
for (int i = 0; i < Integer.MAX_VALUE; i++) { for (int i = 0; i < Integer.MAX_VALUE; i++) {
try { try {
executorService.execute(() -> { executor.execute(() -> {
try { try {
int maxRandom = 10; int maxRandom = 10;
int temp = 2; int temp = 2;
@ -94,12 +86,10 @@ public class RunStateHandlerTest {
} else { } else {
Thread.sleep(3000); Thread.sleep(3000);
} }
} catch (InterruptedException e) { } catch (InterruptedException ignored) {
// ignore
} }
}); });
} catch (Exception ex) { } catch (Exception ignored) {
// ignore
} }
ThreadUtil.sleep(500); ThreadUtil.sleep(500);
} }

@ -32,9 +32,6 @@ import java.util.concurrent.TimeUnit;
/** /**
* TaskDecorator test. * TaskDecorator test.
*
* @author chen.ma
* @date 2021/11/28 13:01
*/ */
@Slf4j @Slf4j
@Component @Component
@ -57,21 +54,17 @@ public class TaskDecoratorTest {
new ThreadPoolExecutor.AbortPolicy()); new ThreadPoolExecutor.AbortPolicy());
/** /**
* 线 {@link TaskDecorator} * Test dynamic thread pool passing {@link TaskDecorator}
* , @PostConstruct * If you need to run this single test, add @PostConstruct to the method.
*/ */
public void taskDecoratorTest() { public void taskDecoratorTest() {
taskDecoratorTestExecutor.execute(() -> { taskDecoratorTestExecutor.execute(() -> {
MDC.put(PLACEHOLDER, "查看官网: https://www.hippox.cn"); MDC.put(PLACEHOLDER, "View the official website: https://www.hippo4j.cn");
ThreadUtil.sleep(5000); ThreadUtil.sleep(5000);
DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(GlobalTestConstant.MESSAGE_PRODUCE); DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(GlobalTestConstant.MESSAGE_PRODUCE);
ThreadPoolExecutor threadPoolExecutor = poolWrapper.getExecutor(); ThreadPoolExecutor threadPoolExecutor = poolWrapper.getExecutor();
threadPoolExecutor.execute(() -> { threadPoolExecutor.execute(() -> {
/** log.info("Pass context via taskDecorator MDC: {}", MDC.get(PLACEHOLDER));
* , taskDecorator .
* taskDecorator {@link ThreadPoolConfig#messageConsumeDynamicThreadPool()}
*/
log.info("通过 taskDecorator MDC 传递上下文: {}", MDC.get(PLACEHOLDER));
}); });
}); });
} }

@ -26,9 +26,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/** /**
* @author : wh * RabbitMQ thread-pool config.
* @date : 2022/5/24 10:02
* @description:
*/ */
@Configuration @Configuration
public class RabbitMQThreadPoolConfig { public class RabbitMQThreadPoolConfig {
@ -36,11 +34,11 @@ public class RabbitMQThreadPoolConfig {
@Bean @Bean
public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() { public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 指定线程的最大数量 // Specify the maximum number of threads.
executor.setMaxPoolSize(5); executor.setMaxPoolSize(5);
// 指定线程池维护线程的最少数量 // Specifies the minimum number of thread pool maintenance threads.
executor.setCorePoolSize(5); executor.setCorePoolSize(5);
// 指定等待处理的任务数 // Specifies the number of tasks waiting to be processed.
executor.setQueueCapacity(1000); executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("RabbitListenerTaskExecutor-"); executor.setThreadNamePrefix("RabbitListenerTaskExecutor-");
return executor; return executor;

@ -35,7 +35,7 @@ spring.dynamic.thread-pool.executors[0].rejected-handler=CallerRunsPolicy
spring.dynamic.thread-pool.executors[0].keep-alive-time=1024 spring.dynamic.thread-pool.executors[0].keep-alive-time=1024
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true
spring.dynamic.thread-pool.executors[0].thread-name-prefix=untimely-thread-pool spring.dynamic.thread-pool.executors[0].thread-name-prefix=untimely-thread-pool
spring.dynamic.thread-pool.executors[0].notify.is-alarm=true spring.dynamic.thread-pool.executors[0].notify.alarm=true
spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=20 spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=20
spring.dynamic.thread-pool.executors[0].notify.interval=2 spring.dynamic.thread-pool.executors[0].notify.interval=2
spring.dynamic.thread-pool.executors[0].notify.receive=xxx spring.dynamic.thread-pool.executors[0].notify.receive=xxx

@ -22,8 +22,6 @@ import lombok.NoArgsConstructor;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import java.util.Map;
/** /**
* Thread pool notify alarm. * Thread pool notify alarm.
*/ */
@ -55,16 +53,8 @@ public class ThreadPoolNotifyAlarm {
*/ */
private Integer interval; private Integer interval;
/**
* Receive
*/
private String receive;
/** /**
* Receives * Receives
*
* <p> Do not enable this configuration for the time being, it may be useful if you develop mailboxes in the future.
*/ */
@Deprecated private String receives;
private Map<String, String> receives;
} }

@ -133,9 +133,9 @@ public class BootstrapCoreProperties implements BootstrapPropertiesInterface {
private Integer alarmInterval; private Integer alarmInterval;
/** /**
* Receive. * Receives.
*/ */
private String receive; private String receives;
/** /**
* Executors. * Executors.

@ -28,8 +28,9 @@ import cn.hippo4j.core.springboot.starter.refresher.event.AdapterExecutorsListen
import cn.hippo4j.core.springboot.starter.refresher.event.ExecutorsListener; import cn.hippo4j.core.springboot.starter.refresher.event.ExecutorsListener;
import cn.hippo4j.core.springboot.starter.refresher.event.PlatformsListener; import cn.hippo4j.core.springboot.starter.refresher.event.PlatformsListener;
import cn.hippo4j.core.springboot.starter.refresher.event.WebExecutorListener; import cn.hippo4j.core.springboot.starter.refresher.event.WebExecutorListener;
import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolAdapterRegister;
import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolConfigService;
import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolPostProcessor; import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolPostProcessor;
import cn.hippo4j.core.springboot.starter.support.ThreadPoolAdapterRegister;
import cn.hippo4j.message.api.NotifyConfigBuilder; import cn.hippo4j.message.api.NotifyConfigBuilder;
import cn.hippo4j.message.config.MessageConfiguration; import cn.hippo4j.message.config.MessageConfiguration;
import cn.hippo4j.message.service.AlarmControlHandler; import cn.hippo4j.message.service.AlarmControlHandler;
@ -115,12 +116,17 @@ public class DynamicThreadPoolCoreAutoConfiguration {
} }
@Bean @Bean
public ThreadPoolAdapterRegister threadPoolAdapterRegister() { public DynamicThreadPoolAdapterRegister threadPoolAdapterRegister() {
return new ThreadPoolAdapterRegister(bootstrapCoreProperties); return new DynamicThreadPoolAdapterRegister(bootstrapCoreProperties);
} }
@Bean @Bean
public DynamicThreadPoolBannerHandler threadPoolBannerHandler() { public DynamicThreadPoolBannerHandler threadPoolBannerHandler() {
return new DynamicThreadPoolBannerHandler(bootstrapCoreProperties); return new DynamicThreadPoolBannerHandler(bootstrapCoreProperties);
} }
@Bean
public DynamicThreadPoolConfigService dynamicThreadPoolConfigService() {
return new DynamicThreadPoolConfigService();
}
} }

@ -18,18 +18,20 @@
package cn.hippo4j.core.springboot.starter.config; package cn.hippo4j.core.springboot.starter.config;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import com.google.common.collect.Maps; import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import java.util.Map;
import java.util.Objects;
/** /**
* Executor properties. * Executor properties.
*/ */
@Data @Data
@Builder
@Accessors(chain = true) @Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class ExecutorProperties { public class ExecutorProperties {
/** /**
@ -86,8 +88,4 @@ public class ExecutorProperties {
* Notify * Notify
*/ */
private ThreadPoolNotifyAlarm notify; private ThreadPoolNotifyAlarm notify;
public Map<String, String> receives() {
return Objects.isNull(this.notify) || this.notify.getReceives() == null ? Maps.newHashMap() : this.notify.getReceives();
}
} }

@ -121,20 +121,12 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
private String buildReceive(ExecutorProperties executor, NotifyPlatformProperties platformProperties) { private String buildReceive(ExecutorProperties executor, NotifyPlatformProperties platformProperties) {
String receive; String receive;
if (executor.getNotify() != null) { if (executor.getNotify() != null) {
receive = executor.getNotify().getReceive(); receive = executor.getNotify().getReceives();
if (StrUtil.isBlank(receive)) { if (StrUtil.isBlank(receive)) {
receive = bootstrapCoreProperties.getReceive(); receive = bootstrapCoreProperties.getReceives();
if (StrUtil.isBlank(receive)) {
Map<String, String> receives = executor.receives();
receive = receives.get(platformProperties.getPlatform());
}
} }
} else { } else {
receive = bootstrapCoreProperties.getReceive(); receive = bootstrapCoreProperties.getReceives();
if (StrUtil.isBlank(receive)) {
Map<String, String> receives = executor.receives();
receive = receives.get(platformProperties.getPlatform());
}
} }
return receive; return receive;
} }

@ -22,7 +22,6 @@ import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.core.springboot.starter.config.NotifyPlatformProperties; import cn.hippo4j.core.springboot.starter.config.NotifyPlatformProperties;
import cn.hippo4j.message.enums.NotifyPlatformEnum;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions; import cn.hutool.core.bean.copier.CopyOptions;
@ -82,8 +81,8 @@ public class BootstrapCorePropertiesBinderAdapt {
boolean containFlag = key != null boolean containFlag = key != null
&& StringUtil.isNotBlank((String) key) && StringUtil.isNotBlank((String) key)
&& (((String) key).indexOf(PREFIX + ".executors") != -1 && (((String) key).indexOf(PREFIX + ".executors") != -1
|| ((String) key).indexOf(PREFIX + ".notify-platforms") != -1 || ((String) key).indexOf(PREFIX + ".notify-platforms") != -1
|| ((String) key).indexOf(PREFIX + ".notifyPlatforms") != -1); || ((String) key).indexOf(PREFIX + ".notifyPlatforms") != -1);
if (containFlag) { if (containFlag) {
String targetKey = key.toString().replace(PREFIX + ".", ""); String targetKey = key.toString().replace(PREFIX + ".", "");
targetMap.put(targetKey, val); targetMap.put(targetKey, val);
@ -138,14 +137,7 @@ public class BootstrapCorePropertiesBinderAdapt {
if (executorProperties != null) { if (executorProperties != null) {
if (CollectionUtil.isNotEmpty(notifySingleMap)) { if (CollectionUtil.isNotEmpty(notifySingleMap)) {
ThreadPoolNotifyAlarm alarm = BeanUtil.mapToBean(notifySingleMap, ThreadPoolNotifyAlarm.class, true, CopyOptions.create()); ThreadPoolNotifyAlarm alarm = BeanUtil.mapToBean(notifySingleMap, ThreadPoolNotifyAlarm.class, true, CopyOptions.create());
Map<String, String> notifyReceivesMap = Maps.newHashMap(); alarm.setReceives(alarm.getReceives());
for (NotifyPlatformEnum value : NotifyPlatformEnum.values()) {
Object receives = targetMap.get("executors[" + i + "].notify.receives." + value.name());
if (receives != null && StringUtil.isNotBlank((String) receives)) {
notifyReceivesMap.put(value.name(), (String) receives);
}
}
alarm.setReceives(notifyReceivesMap);
executorProperties.setNotify(alarm); executorProperties.setNotify(alarm);
} }
executorPropertiesList.add(executorProperties); executorPropertiesList.add(executorProperties);

@ -114,7 +114,7 @@ public class ZookeeperRefresherHandler extends AbstractCoreThreadPoolDynamicRefr
executorProperties.getNotify().getCapacityAlarm(), executorProperties.getNotify().getCapacityAlarm(),
executorProperties.getNotify().getActiveAlarm()); executorProperties.getNotify().getActiveAlarm());
threadPoolNotifyAlarm.setInterval(executorProperties.getNotify().getInterval()); threadPoolNotifyAlarm.setInterval(executorProperties.getNotify().getInterval());
threadPoolNotifyAlarm.setReceives(executorProperties.receives()); threadPoolNotifyAlarm.setReceives(executorProperties.getNotify().getReceives());
GlobalNotifyAlarmManage.put(executorProperties.getThreadPoolId(), threadPoolNotifyAlarm); GlobalNotifyAlarmManage.put(executorProperties.getThreadPoolId(), threadPoolNotifyAlarm);
}); });
} }

@ -33,7 +33,7 @@ import java.util.Objects;
import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
import static cn.hippo4j.core.springboot.starter.refresher.event.Hippo4jCoreDynamicRefreshEventOrder.ADAPTER_EXECUTORS_LISTENER; import static cn.hippo4j.core.springboot.starter.refresher.event.Hippo4jCoreDynamicRefreshEventOrder.ADAPTER_EXECUTORS_LISTENER;
import static cn.hippo4j.core.springboot.starter.support.ThreadPoolAdapterRegister.ADAPTER_EXECUTORS_MAP; import static cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolAdapterRegister.ADAPTER_EXECUTORS_MAP;
/** /**
* Adapter executors listener. * Adapter executors listener.

@ -32,11 +32,11 @@ import java.util.Map;
import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
/** /**
* Thread-pool adapter register. * Dynamic thread-pool adapter register.
*/ */
@Slf4j @Slf4j
@AllArgsConstructor @AllArgsConstructor
public class ThreadPoolAdapterRegister implements ApplicationRunner { public class DynamicThreadPoolAdapterRegister implements ApplicationRunner {
private final BootstrapCoreProperties bootstrapCoreProperties; private final BootstrapCoreProperties bootstrapCoreProperties;

@ -0,0 +1,60 @@
package cn.hippo4j.core.springboot.starter.support;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNotifyParameter;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Dynamic thread-pool config service.
*/
public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService {
@Override
public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
String threadPoolId = registerParameter.getThreadPoolId();
ThreadPoolExecutor dynamicThreadPoolExecutor = buildDynamicThreadPoolExecutor(registerParameter);
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder()
.threadPoolId(threadPoolId)
.executor(dynamicThreadPoolExecutor)
.build();
// Register pool.
GlobalThreadPoolManage.registerPool(threadPoolId, dynamicThreadPoolWrapper);
ExecutorProperties executorProperties = buildExecutorProperties(registerWrapper);
// Register properties.
GlobalCoreThreadPoolManage.register(threadPoolId, executorProperties);
DynamicThreadPoolRegisterCoreNotifyParameter notifyParameter = registerWrapper.getDynamicThreadPoolRegisterCoreNotifyParameter();
ThreadPoolNotifyAlarm notifyAlarm = new ThreadPoolNotifyAlarm(true, notifyParameter.getActiveAlarm(), notifyParameter.getCapacityAlarm());
notifyAlarm.setReceives(notifyParameter.getReceives());
notifyAlarm.setInterval(notifyParameter.getInterval());
// Register notify.
GlobalNotifyAlarmManage.put(threadPoolId, notifyAlarm);
return dynamicThreadPoolExecutor;
}
private ExecutorProperties buildExecutorProperties(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
ExecutorProperties executorProperties = ExecutorProperties.builder()
.corePoolSize(registerParameter.getCorePoolSize())
.maximumPoolSize(registerParameter.getMaximumPoolSize())
.allowCoreThreadTimeOut(registerParameter.getAllowCoreThreadTimeOut())
.keepAliveTime(registerParameter.getKeepAliveTime())
.blockingQueue(QueueTypeEnum.getBlockingQueueNameByType(registerParameter.getQueueType()))
.threadNamePrefix(registerParameter.getThreadNamePrefix())
.rejectedHandler(RejectedTypeEnum.getRejectedNameByType(registerParameter.getRejectedType()))
.executeTimeOut(registerParameter.getExecuteTimeOut())
.threadPoolId(registerParameter.getThreadPoolId())
.build();
return executorProperties;
}
}

@ -147,10 +147,10 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
int interval = Optional.ofNullable(notify) int interval = Optional.ofNullable(notify)
.map(each -> each.getInterval()).orElseGet(() -> bootstrapCoreProperties.getAlarmInterval() != null ? bootstrapCoreProperties.getAlarmInterval() : 5); .map(each -> each.getInterval()).orElseGet(() -> bootstrapCoreProperties.getAlarmInterval() != null ? bootstrapCoreProperties.getAlarmInterval() : 5);
String receive = Optional.ofNullable(notify) String receive = Optional.ofNullable(notify)
.map(each -> each.getReceive()).orElseGet(() -> bootstrapCoreProperties.getReceive() != null ? bootstrapCoreProperties.getReceive() : null); .map(each -> each.getReceives()).orElseGet(() -> bootstrapCoreProperties.getReceives() != null ? bootstrapCoreProperties.getReceives() : null);
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm); ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm);
threadPoolNotifyAlarm.setInterval(interval); threadPoolNotifyAlarm.setInterval(interval);
threadPoolNotifyAlarm.setReceive(receive); threadPoolNotifyAlarm.setReceives(receive);
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).getTaskDecorator(); TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);

@ -26,7 +26,7 @@ import cn.hippo4j.core.config.UtilAutoConfiguration;
import cn.hippo4j.core.enable.MarkerConfiguration; import cn.hippo4j.core.enable.MarkerConfiguration;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService; import cn.hippo4j.core.executor.support.service.DynamicThreadPoolService;
import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.core.toolkit.inet.InetUtils;

@ -26,10 +26,7 @@ import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.exception.ServiceException; import cn.hippo4j.common.web.exception.ServiceException;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService; import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
@ -46,7 +43,7 @@ import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_
*/ */
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class DynamicThreadPoolConfigService implements DynamicThreadPoolService, ApplicationListener<ApplicationCompleteEvent> { public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService implements ApplicationListener<ApplicationCompleteEvent> {
private final HttpAgent httpAgent; private final HttpAgent httpAgent;
@ -56,11 +53,6 @@ public class DynamicThreadPoolConfigService implements DynamicThreadPoolService,
private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig; private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig;
@Override
public void onApplicationEvent(ApplicationCompleteEvent event) {
clientWorker.notifyApplicationComplete();
}
@Override @Override
public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) { public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
@ -88,18 +80,9 @@ public class DynamicThreadPoolConfigService implements DynamicThreadPoolService,
return dynamicThreadPoolExecutor; return dynamicThreadPoolExecutor;
} }
private ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) { @Override
ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder() public void onApplicationEvent(ApplicationCompleteEvent event) {
.threadPoolId(registerParameter.getThreadPoolId()) clientWorker.notifyApplicationComplete();
.corePoolSize(registerParameter.getCorePoolSize())
.maxPoolNum(registerParameter.getMaximumPoolSize())
.workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity()))
.threadFactory(registerParameter.getThreadPoolId())
.keepAliveTime(registerParameter.getKeepAliveTime())
.rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType()))
.dynamicPool()
.build();
return dynamicThreadPoolExecutor;
} }
private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) { private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) {

Loading…
Cancel
Save