diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/ThreadPoolParameterInfo.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/ThreadPoolParameterInfo.java index 8a6a5ef8..f113d602 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/model/ThreadPoolParameterInfo.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/ThreadPoolParameterInfo.java @@ -17,6 +17,7 @@ package cn.hippo4j.common.model; +import com.fasterxml.jackson.annotation.JsonAlias; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -50,6 +51,7 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl /** * Thread-pool id */ + @JsonAlias("threadPoolId") private String tpId; /** @@ -112,6 +114,7 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl /** * Liveness alarm */ + @JsonAlias("activeAlarm") private Integer livenessAlarm; /** diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java index 85d8bd9b..5cbfb3cf 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java @@ -17,6 +17,7 @@ package cn.hippo4j.common.model.register; +import com.fasterxml.jackson.annotation.JsonAlias; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -65,7 +66,7 @@ public class DynamicThreadPoolRegisterParameter { /** * Keep alive time */ - private Integer keepAliveTime; + private Long keepAliveTime; /** * Rejected type @@ -83,12 +84,27 @@ public class DynamicThreadPoolRegisterParameter { private Integer capacityAlarm; /** - * Liveness alarm + * Active alarm */ - private Integer livenessAlarm; + @JsonAlias("livenessAlarm") + private Integer activeAlarm; /** * Allow core thread timeout */ - private Integer allowCoreThreadTimeOut; + private Boolean allowCoreThreadTimeOut; + + /** + * Thread name prefix + */ + private String threadNamePrefix; + + /** + * Execute timeout + */ + private Long executeTimeOut; + + public Integer getAllowCoreThreadTimeOut() { + return this.allowCoreThreadTimeOut ? 1 : 0; + } } diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java index cdbd455f..421302aa 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java @@ -17,6 +17,8 @@ package cn.hippo4j.common.model.register; +import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNotifyParameter; +import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -50,4 +52,14 @@ public class DynamicThreadPoolRegisterWrapper { * Dynamic thread-pool register parameter */ private DynamicThreadPoolRegisterParameter dynamicThreadPoolRegisterParameter; + + /** + * Dynamic thread-pool core notify parameter + */ + private DynamicThreadPoolRegisterCoreNotifyParameter dynamicThreadPoolRegisterCoreNotifyParameter; + + /** + * Dynamic thread-pool server notify parameter + */ + private DynamicThreadPoolRegisterServerNotifyParameter dynamicThreadPoolRegisterServerNotifyParameter; } diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/notify/DynamicThreadPoolRegisterCoreNotifyParameter.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/notify/DynamicThreadPoolRegisterCoreNotifyParameter.java new file mode 100644 index 00000000..ca595243 --- /dev/null +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/notify/DynamicThreadPoolRegisterCoreNotifyParameter.java @@ -0,0 +1,41 @@ +package cn.hippo4j.common.model.register.notify; + +import lombok.*; + +/** + * Dynamic thread-pool register core notify parameter. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DynamicThreadPoolRegisterCoreNotifyParameter { + + /** + * Whether to enable thread pool running alarm + */ + @NonNull + private Boolean alarm; + + /** + * Active alarm + */ + @NonNull + private Integer activeAlarm; + + /** + * Capacity alarm + */ + @NonNull + private Integer capacityAlarm; + + /** + * Interval + */ + private Integer interval; + + /** + * Receive + */ + private String receives; +} diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/notify/DynamicThreadPoolRegisterServerNotifyParameter.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/notify/DynamicThreadPoolRegisterServerNotifyParameter.java new file mode 100644 index 00000000..ce6f72d5 --- /dev/null +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/notify/DynamicThreadPoolRegisterServerNotifyParameter.java @@ -0,0 +1,46 @@ +package cn.hippo4j.common.model.register.notify; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Dynamic thread-pool register server notify parameter. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DynamicThreadPoolRegisterServerNotifyParameter { + + /** + * Thread-pool id + */ + private String threadPoolId; + + /** + * Platform + */ + private String platform; + + /** + * Config type + */ + private String type; + + /** + * Secret key + */ + private String secretKey; + + /** + * Interval + */ + private Integer interval; + + /** + * Receives + */ + private String receives; +} diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java index 636b105e..02e521d8 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java @@ -139,11 +139,7 @@ public class ConfigServiceImpl implements ConfigService { @Override public void register(DynamicThreadPoolRegisterWrapper registerWrapper) { - DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); - ConfigAllInfo configAllInfo = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ConfigAllInfo.class); - configAllInfo.setTenantId(registerWrapper.getTenantId()); - configAllInfo.setItemId(registerWrapper.getItemId()); - configAllInfo.setTpId(registerParameter.getThreadPoolId()); + ConfigAllInfo configAllInfo = parseConfigAllInfo(registerWrapper); TenantService tenantService = ApplicationContextHolder.getBean(TenantService.class); ItemService itemService = ApplicationContextHolder.getBean(ItemService.class); Assert.isTrue(tenantService.getTenantByTenantId(registerWrapper.getTenantId()) != null, "Tenant does not exist"); @@ -157,6 +153,16 @@ public class ConfigServiceImpl implements ConfigService { } } + private ConfigAllInfo parseConfigAllInfo(DynamicThreadPoolRegisterWrapper registerWrapper) { + DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); + ConfigAllInfo configAllInfo = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ConfigAllInfo.class); + configAllInfo.setTenantId(registerWrapper.getTenantId()); + configAllInfo.setItemId(registerWrapper.getItemId()); + configAllInfo.setTpId(registerParameter.getThreadPoolId()); + configAllInfo.setAllowCoreThreadTimeOut(registerParameter.getAllowCoreThreadTimeOut()); + return configAllInfo; + } + private void verification(String identify) { if (StringUtil.isNotBlank(identify)) { Map content = getContent(identify); diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java index 17c428b1..1fa8672b 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java @@ -21,7 +21,7 @@ import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.model.ThreadPoolParameter; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; -import cn.hippo4j.core.executor.support.DynamicThreadPoolService; +import cn.hippo4j.core.executor.support.service.DynamicThreadPoolService; import com.google.common.collect.Lists; import java.util.List; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java new file mode 100644 index 00000000..d624a0a3 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java @@ -0,0 +1,34 @@ +package cn.hippo4j.core.executor.support.service; + +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; +import cn.hippo4j.core.executor.support.QueueTypeEnum; +import cn.hippo4j.core.executor.support.RejectedTypeEnum; +import cn.hippo4j.core.executor.support.ThreadPoolBuilder; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Abstract dynamic thread-pool service. + */ +public abstract class AbstractDynamicThreadPoolService implements DynamicThreadPoolService { + + /** + * Build dynamic thread-pool executor. + * + * @param registerParameter + * @return + */ + public ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) { + ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder() + .threadPoolId(registerParameter.getThreadPoolId()) + .corePoolSize(registerParameter.getCorePoolSize()) + .maxPoolNum(registerParameter.getMaximumPoolSize()) + .workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity())) + .threadFactory(registerParameter.getThreadNamePrefix()) + .keepAliveTime(registerParameter.getKeepAliveTime()) + .rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType())) + .dynamicPool() + .build(); + return dynamicThreadPoolExecutor; + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/DynamicThreadPoolService.java similarity index 96% rename from hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java rename to hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/DynamicThreadPoolService.java index b967a08c..dbb6fd5d 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/DynamicThreadPoolService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.core.executor.support; +package cn.hippo4j.core.executor.support.service; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; diff --git a/hippo4j-example/hippo4j-core-apollo-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/apollo/Hippo4jCoreApolloExampleApplication.java b/hippo4j-example/hippo4j-core-apollo-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/apollo/Hippo4jCoreApolloExampleApplication.java index 45fd972c..2a01491f 100644 --- a/hippo4j-example/hippo4j-core-apollo-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/apollo/Hippo4jCoreApolloExampleApplication.java +++ b/hippo4j-example/hippo4j-core-apollo-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/apollo/Hippo4jCoreApolloExampleApplication.java @@ -28,5 +28,4 @@ public class Hippo4jCoreApolloExampleApplication { public static void main(String[] args) { SpringApplication.run(Hippo4jCoreApolloExampleApplication.class, args); } - } diff --git a/hippo4j-example/hippo4j-core-apollo-spring-boot-starter-example/src/main/resources/bootstrap.properties b/hippo4j-example/hippo4j-core-apollo-spring-boot-starter-example/src/main/resources/bootstrap.properties index 454edce8..e4899320 100644 --- a/hippo4j-example/hippo4j-core-apollo-spring-boot-starter-example/src/main/resources/bootstrap.properties +++ b/hippo4j-example/hippo4j-core-apollo-spring-boot-starter-example/src/main/resources/bootstrap.properties @@ -40,9 +40,8 @@ spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy spring.dynamic.thread-pool.executors[0].keep-alive-time=1000 spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume -spring.dynamic.thread-pool.executors[0].notify.is-alarm=true +spring.dynamic.thread-pool.executors[0].notify.alarm=true spring.dynamic.thread-pool.executors[0].notify.active-alarm=80 spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80 spring.dynamic.thread-pool.executors[0].notify.interval=8 -spring.dynamic.thread-pool.executors[0].notify.receives.WECHAT=xxx -spring.dynamic.thread-pool.executors[0].notify.receives.DING=xxx +spring.dynamic.thread-pool.executors[0].notify.receive=chen.ma diff --git a/hippo4j-example/hippo4j-core-nacos-spring-boot-starter-example/src/main/resources/bootstrap.properties b/hippo4j-example/hippo4j-core-nacos-spring-boot-starter-example/src/main/resources/bootstrap.properties index c6c6be70..9266bacc 100644 --- a/hippo4j-example/hippo4j-core-nacos-spring-boot-starter-example/src/main/resources/bootstrap.properties +++ b/hippo4j-example/hippo4j-core-nacos-spring-boot-starter-example/src/main/resources/bootstrap.properties @@ -42,10 +42,8 @@ spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy spring.dynamic.thread-pool.executors[0].keep-alive-time=6691 spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume -spring.dynamic.thread-pool.executors[0].notify.is-alarm=true +spring.dynamic.thread-pool.executors[0].notify.alarm=true spring.dynamic.thread-pool.executors[0].notify.active-alarm=80 spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80 spring.dynamic.thread-pool.executors[0].notify.interval=8 -spring.dynamic.thread-pool.executors[0].notify.receives.WECHAT=xxx -spring.dynamic.thread-pool.executors[0].notify.receives.DING=xxx -spring.dynamic.thread-pool.executors[0].notify.receives.LARK=xxx +spring.dynamic.thread-pool.executors[0].notify.receive=chen.ma \ No newline at end of file diff --git a/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/zookeeper/Hippo4jCoreZookeeperExampleApplication.java b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/zookeeper/Hippo4jCoreZookeeperExampleApplication.java index 4f843676..15b62570 100644 --- a/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/zookeeper/Hippo4jCoreZookeeperExampleApplication.java +++ b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/zookeeper/Hippo4jCoreZookeeperExampleApplication.java @@ -21,10 +21,6 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -/** - * @author Redick01 - * @date 2022/3/14 20:40 - */ @EnableDynamicThreadPool @SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core") public class Hippo4jCoreZookeeperExampleApplication { diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/ThreadPoolConfig.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java similarity index 66% rename from hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/ThreadPoolConfig.java rename to hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java index 14f21059..d76bb3d2 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/ThreadPoolConfig.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java @@ -33,18 +33,15 @@ import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_CONSUM import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_PRODUCE; /** - * Thread pool config. - * - * @author chen.ma - * @date 2021/6/20 17:16 + * Dynamic thread-pool config. */ @Slf4j @Configuration -public class ThreadPoolConfig { +public class DynamicThreadPoolConfig { @Bean @DynamicThreadPool - public ThreadPoolExecutor messageConsumeDynamicThreadPool() { + public Executor messageConsumeTtlDynamicThreadPool() { String threadPoolId = MESSAGE_CONSUME; ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder() .dynamicPool() @@ -55,7 +52,9 @@ public class ThreadPoolConfig { .awaitTerminationMillis(5000L) .taskDecorator(new TaskTraceBuilderHandler()) .build(); - return customExecutor; + // Ali ttl adaptation use case. + Executor ttlExecutor = TtlExecutors.getTtlExecutor(customExecutor); + return ttlExecutor; } @Bean @@ -70,44 +69,10 @@ public class ThreadPoolConfig { .waitForTasksToCompleteOnShutdown(true) .awaitTerminationMillis(5000L) /** - * 上下文传递,测试用例:{@link TaskDecoratorTest} + * Context passing, test cases: {@link TaskDecoratorTest} */ .taskDecorator(new TaskDecoratorTest.ContextCopyingDecorator()) .build(); return produceExecutor; } - - @Bean - @DynamicThreadPool - public Executor messageConsumeTtlDynamicThreadPool() { - String threadPoolId = MESSAGE_CONSUME; - ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder() - .dynamicPool() - .threadFactory(threadPoolId) - .threadPoolId(threadPoolId) - .executeTimeOut(800L) - .waitForTasksToCompleteOnShutdown(true) - .awaitTerminationMillis(5000L) - .taskDecorator(new TaskTraceBuilderHandler()) - .build(); - Executor ttlExecutor = TtlExecutors.getTtlExecutor(customExecutor); - return ttlExecutor; - } - - @Bean - @DynamicThreadPool - public Executor messageConsumeTtlServiceDynamicThreadPool() { - String threadPoolId = MESSAGE_CONSUME; - ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder() - .dynamicPool() - .threadFactory(threadPoolId) - .threadPoolId(threadPoolId) - .executeTimeOut(800L) - .waitForTasksToCompleteOnShutdown(true) - .awaitTerminationMillis(5000L) - .taskDecorator(new TaskTraceBuilderHandler()) - .build(); - Executor ttlExecutor = TtlExecutors.getTtlExecutorService(customExecutor); - return ttlExecutor; - } } diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/constant/GlobalTestConstant.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/constant/GlobalTestConstant.java index f7137460..45ed4808 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/constant/GlobalTestConstant.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/constant/GlobalTestConstant.java @@ -19,9 +19,6 @@ package cn.hippo4j.example.core.constant; /** * Global test variables. - * - * @author chen.ma - * @date 2021/8/15 21:06 */ public class GlobalTestConstant { diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/handler/ErrorLogRejectedExecutionHandler.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/handler/ErrorLogRejectedExecutionHandler.java index 67a63b5f..d2b17531 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/handler/ErrorLogRejectedExecutionHandler.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/handler/ErrorLogRejectedExecutionHandler.java @@ -25,10 +25,7 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** - * 自定义拒绝策略. - * - * @author chen.ma - * @date 2022/1/4 22:19 + * Custom Deny Policy. */ public class ErrorLogRejectedExecutionHandler implements CustomRejectedExecutionHandler { diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/handler/TaskTraceBuilderHandler.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/handler/TaskTraceBuilderHandler.java index e6f89c15..22b3a4aa 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/handler/TaskTraceBuilderHandler.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/handler/TaskTraceBuilderHandler.java @@ -25,9 +25,6 @@ import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE; /** * Task trace builder handler. - * - * @author chen.ma - * @date 2022/3/2 20:46 */ public final class TaskTraceBuilderHandler implements TaskDecorator { @@ -39,7 +36,7 @@ public final class TaskTraceBuilderHandler implements TaskDecorator { MDC.put(EXECUTE_TIMEOUT_TRACE, executeTimeoutTrace); } runnable.run(); - // 此处不用进行清理操作, 统一在线程任务执行后清理 + // There is no need to clean up here, and it will be cleaned up after the thread task is executed. }; return taskRun; } diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/AlarmSendMessageTest.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/AlarmSendMessageTest.java index 394b37cc..5fee289d 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/AlarmSendMessageTest.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/AlarmSendMessageTest.java @@ -31,17 +31,14 @@ import java.util.concurrent.TimeUnit; /** * Test alarm send message. - * - * @author chen.ma - * @date 2021/8/15 21:03 */ @Slf4j @Component public class AlarmSendMessageTest { /** - * 测试报警通知. - * 如果需要运行此单测, 方法上添加 @PostConstruct + * Test alarm notification. + * If you need to run this single test, add @PostConstruct to the method. */ @SuppressWarnings("all") public void alarmSendMessageTest() { diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java index eae3e97e..fe7bcb8a 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java @@ -19,8 +19,11 @@ package cn.hippo4j.example.core.inittest; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; +import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNotifyParameter; +import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter; import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; +import cn.hippo4j.message.enums.NotifyPlatformEnum; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -39,18 +42,37 @@ public class RegisterDynamicThreadPoolTest { String threadPoolId = "register-dynamic-thread-pool"; DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter(); parameterInfo.setThreadPoolId(threadPoolId); + parameterInfo.setThreadNamePrefix(threadPoolId); parameterInfo.setCorePoolSize(3); parameterInfo.setMaximumPoolSize(14); parameterInfo.setQueueType(9); parameterInfo.setCapacity(110); - parameterInfo.setKeepAliveTime(110); + parameterInfo.setKeepAliveTime(110L); parameterInfo.setRejectedType(2); parameterInfo.setIsAlarm(0); parameterInfo.setCapacityAlarm(90); - parameterInfo.setLivenessAlarm(90); - parameterInfo.setAllowCoreThreadTimeOut(0); + parameterInfo.setActiveAlarm(90); + parameterInfo.setAllowCoreThreadTimeOut(Boolean.TRUE); + + // core 模式和 server 模式,各选其一即可 + DynamicThreadPoolRegisterCoreNotifyParameter coreNotifyParameter = DynamicThreadPoolRegisterCoreNotifyParameter.builder() + .activeAlarm(80) + .capacityAlarm(80) + .receives("chen.ma") + .alarm(true) + .interval(5) + .build(); + DynamicThreadPoolRegisterServerNotifyParameter serverNotifyParameter = DynamicThreadPoolRegisterServerNotifyParameter.builder() + .platform(NotifyPlatformEnum.WECHAT.name()) + .secretKey("xxx") + .threadPoolId(threadPoolId) + .interval(5) + .receives("chen.ma") + .build(); DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder() .dynamicThreadPoolRegisterParameter(parameterInfo) + .dynamicThreadPoolRegisterCoreNotifyParameter(coreNotifyParameter) + .dynamicThreadPoolRegisterServerNotifyParameter(serverNotifyParameter) .build(); ThreadPoolExecutor dynamicThreadPool = GlobalThreadPoolManage.dynamicRegister(registerWrapper); log.info("Dynamic registration thread pool parameter details: {}", JSONUtil.toJSONString(dynamicThreadPool)); diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java index 3c48989f..b076da90 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java @@ -25,25 +25,19 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE; /** - * Test run time metrics. - * - * @author chen.ma - * @date 2021/8/15 21:00 + * Run state handler test. */ @Slf4j @Component public class RunStateHandlerTest { @Resource - private ThreadPoolExecutor messageConsumeDynamicThreadPool; + private Executor messageConsumeTtlDynamicThreadPool; @Resource private ThreadPoolExecutor messageProduceDynamicThreadPool; @@ -66,24 +60,22 @@ public class RunStateHandlerTest { @SuppressWarnings("all") public void runStateHandlerTest() { log.info("Test thread pool runtime state interface..."); - - // 启动动态线程池模拟运行任务 - runTask(messageConsumeDynamicThreadPool); - // 启动动态线程池模拟运行任务 + // Start the dynamic thread pool to simulate running tasks + runTask(messageConsumeTtlDynamicThreadPool); runTask(messageProduceDynamicThreadPool); } - private void runTask(ExecutorService executorService) { - // 模拟任务运行 + private void runTask(Executor executor) { + // Simulate task run runStateHandlerTestExecutor.execute(() -> { /** - * 当线程池任务执行超时, 向 MDC 放入 Trace 标识, 报警时打印出来. + * When the execution of the thread pool task times out, the Trace flag is put into the MDC, and it is printed out when an alarm occurs. */ MDC.put(EXECUTE_TIMEOUT_TRACE, "https://github.com/opengoofy/hippo4j 感觉不错来个 Star."); ThreadUtil.sleep(5000); for (int i = 0; i < Integer.MAX_VALUE; i++) { try { - executorService.execute(() -> { + executor.execute(() -> { try { int maxRandom = 10; int temp = 2; @@ -94,12 +86,10 @@ public class RunStateHandlerTest { } else { Thread.sleep(3000); } - } catch (InterruptedException e) { - // ignore + } catch (InterruptedException ignored) { } }); - } catch (Exception ex) { - // ignore + } catch (Exception ignored) { } ThreadUtil.sleep(500); } diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/TaskDecoratorTest.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/TaskDecoratorTest.java index 3de161f1..4e08a080 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/TaskDecoratorTest.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/TaskDecoratorTest.java @@ -32,9 +32,6 @@ import java.util.concurrent.TimeUnit; /** * TaskDecorator test. - * - * @author chen.ma - * @date 2021/11/28 13:01 */ @Slf4j @Component @@ -57,21 +54,17 @@ public class TaskDecoratorTest { new ThreadPoolExecutor.AbortPolicy()); /** - * 测试动态线程池传递 {@link TaskDecorator} - * 如果需要运行此单测, 方法上添加 @PostConstruct + * Test dynamic thread pool passing {@link TaskDecorator} + * If you need to run this single test, add @PostConstruct to the method. */ public void taskDecoratorTest() { taskDecoratorTestExecutor.execute(() -> { - MDC.put(PLACEHOLDER, "查看官网: https://www.hippox.cn"); + MDC.put(PLACEHOLDER, "View the official website: https://www.hippo4j.cn"); ThreadUtil.sleep(5000); DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(GlobalTestConstant.MESSAGE_PRODUCE); ThreadPoolExecutor threadPoolExecutor = poolWrapper.getExecutor(); threadPoolExecutor.execute(() -> { - /** - * 此处打印不为空, taskDecorator 即为生效. - * taskDecorator 配置查看 {@link ThreadPoolConfig#messageConsumeDynamicThreadPool()} - */ - log.info("通过 taskDecorator MDC 传递上下文: {}", MDC.get(PLACEHOLDER)); + log.info("Pass context via taskDecorator MDC: {}", MDC.get(PLACEHOLDER)); }); }); } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java index ca24ff83..d53d8f30 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java @@ -26,9 +26,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** - * @author : wh - * @date : 2022/5/24 10:02 - * @description: + * RabbitMQ thread-pool config. */ @Configuration public class RabbitMQThreadPoolConfig { @@ -36,11 +34,11 @@ public class RabbitMQThreadPoolConfig { @Bean public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - // 指定线程的最大数量 + // Specify the maximum number of threads. executor.setMaxPoolSize(5); - // 指定线程池维护线程的最少数量 + // Specifies the minimum number of thread pool maintenance threads. executor.setCorePoolSize(5); - // 指定等待处理的任务数 + // Specifies the number of tasks waiting to be processed. executor.setQueueCapacity(1000); executor.setThreadNamePrefix("RabbitListenerTaskExecutor-"); return executor; diff --git a/hippo4j-example/hippo4j-spring-boot-starter-es-monitor-example/src/main/resources/application.properties b/hippo4j-example/hippo4j-spring-boot-starter-es-monitor-example/src/main/resources/application.properties index 76febf01..e560d334 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-es-monitor-example/src/main/resources/application.properties +++ b/hippo4j-example/hippo4j-spring-boot-starter-es-monitor-example/src/main/resources/application.properties @@ -35,7 +35,7 @@ spring.dynamic.thread-pool.executors[0].rejected-handler=CallerRunsPolicy spring.dynamic.thread-pool.executors[0].keep-alive-time=1024 spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true spring.dynamic.thread-pool.executors[0].thread-name-prefix=untimely-thread-pool -spring.dynamic.thread-pool.executors[0].notify.is-alarm=true +spring.dynamic.thread-pool.executors[0].notify.alarm=true spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=20 spring.dynamic.thread-pool.executors[0].notify.interval=2 spring.dynamic.thread-pool.executors[0].notify.receive=xxx diff --git a/hippo4j-message/src/main/java/cn/hippo4j/message/service/ThreadPoolNotifyAlarm.java b/hippo4j-message/src/main/java/cn/hippo4j/message/service/ThreadPoolNotifyAlarm.java index 00ae2c63..d0c2f5c9 100644 --- a/hippo4j-message/src/main/java/cn/hippo4j/message/service/ThreadPoolNotifyAlarm.java +++ b/hippo4j-message/src/main/java/cn/hippo4j/message/service/ThreadPoolNotifyAlarm.java @@ -22,8 +22,6 @@ import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; -import java.util.Map; - /** * Thread pool notify alarm. */ @@ -55,16 +53,8 @@ public class ThreadPoolNotifyAlarm { */ private Integer interval; - /** - * Receive - */ - private String receive; - /** * Receives - * - *

Do not enable this configuration for the time being, it may be useful if you develop mailboxes in the future. */ - @Deprecated - private Map receives; + private String receives; } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/BootstrapCoreProperties.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/BootstrapCoreProperties.java index 93696763..12faa712 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/BootstrapCoreProperties.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/BootstrapCoreProperties.java @@ -133,9 +133,9 @@ public class BootstrapCoreProperties implements BootstrapPropertiesInterface { private Integer alarmInterval; /** - * Receive. + * Receives. */ - private String receive; + private String receives; /** * Executors. diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java index ec672a43..0aa3940d 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java @@ -28,8 +28,9 @@ import cn.hippo4j.core.springboot.starter.refresher.event.AdapterExecutorsListen import cn.hippo4j.core.springboot.starter.refresher.event.ExecutorsListener; import cn.hippo4j.core.springboot.starter.refresher.event.PlatformsListener; import cn.hippo4j.core.springboot.starter.refresher.event.WebExecutorListener; +import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolAdapterRegister; +import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolConfigService; import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolPostProcessor; -import cn.hippo4j.core.springboot.starter.support.ThreadPoolAdapterRegister; import cn.hippo4j.message.api.NotifyConfigBuilder; import cn.hippo4j.message.config.MessageConfiguration; import cn.hippo4j.message.service.AlarmControlHandler; @@ -115,12 +116,17 @@ public class DynamicThreadPoolCoreAutoConfiguration { } @Bean - public ThreadPoolAdapterRegister threadPoolAdapterRegister() { - return new ThreadPoolAdapterRegister(bootstrapCoreProperties); + public DynamicThreadPoolAdapterRegister threadPoolAdapterRegister() { + return new DynamicThreadPoolAdapterRegister(bootstrapCoreProperties); } @Bean public DynamicThreadPoolBannerHandler threadPoolBannerHandler() { return new DynamicThreadPoolBannerHandler(bootstrapCoreProperties); } + + @Bean + public DynamicThreadPoolConfigService dynamicThreadPoolConfigService() { + return new DynamicThreadPoolConfigService(); + } } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/ExecutorProperties.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/ExecutorProperties.java index dbcf4973..60d013fa 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/ExecutorProperties.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/ExecutorProperties.java @@ -18,18 +18,20 @@ package cn.hippo4j.core.springboot.starter.config; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; -import com.google.common.collect.Maps; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.experimental.Accessors; -import java.util.Map; -import java.util.Objects; - /** * Executor properties. */ @Data +@Builder @Accessors(chain = true) +@NoArgsConstructor +@AllArgsConstructor public class ExecutorProperties { /** @@ -86,8 +88,4 @@ public class ExecutorProperties { * Notify */ private ThreadPoolNotifyAlarm notify; - - public Map receives() { - return Objects.isNull(this.notify) || this.notify.getReceives() == null ? Maps.newHashMap() : this.notify.getReceives(); - } } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java index b829885e..230e8d6c 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java @@ -121,20 +121,12 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { private String buildReceive(ExecutorProperties executor, NotifyPlatformProperties platformProperties) { String receive; if (executor.getNotify() != null) { - receive = executor.getNotify().getReceive(); + receive = executor.getNotify().getReceives(); if (StrUtil.isBlank(receive)) { - receive = bootstrapCoreProperties.getReceive(); - if (StrUtil.isBlank(receive)) { - Map receives = executor.receives(); - receive = receives.get(platformProperties.getPlatform()); - } + receive = bootstrapCoreProperties.getReceives(); } } else { - receive = bootstrapCoreProperties.getReceive(); - if (StrUtil.isBlank(receive)) { - Map receives = executor.receives(); - receive = receives.get(platformProperties.getPlatform()); - } + receive = bootstrapCoreProperties.getReceives(); } return receive; } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/BootstrapCorePropertiesBinderAdapt.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/BootstrapCorePropertiesBinderAdapt.java index f8bc5311..ea655781 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/BootstrapCorePropertiesBinderAdapt.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/BootstrapCorePropertiesBinderAdapt.java @@ -22,7 +22,6 @@ import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; import cn.hippo4j.core.springboot.starter.config.NotifyPlatformProperties; -import cn.hippo4j.message.enums.NotifyPlatformEnum; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.copier.CopyOptions; @@ -82,8 +81,8 @@ public class BootstrapCorePropertiesBinderAdapt { boolean containFlag = key != null && StringUtil.isNotBlank((String) key) && (((String) key).indexOf(PREFIX + ".executors") != -1 - || ((String) key).indexOf(PREFIX + ".notify-platforms") != -1 - || ((String) key).indexOf(PREFIX + ".notifyPlatforms") != -1); + || ((String) key).indexOf(PREFIX + ".notify-platforms") != -1 + || ((String) key).indexOf(PREFIX + ".notifyPlatforms") != -1); if (containFlag) { String targetKey = key.toString().replace(PREFIX + ".", ""); targetMap.put(targetKey, val); @@ -138,14 +137,7 @@ public class BootstrapCorePropertiesBinderAdapt { if (executorProperties != null) { if (CollectionUtil.isNotEmpty(notifySingleMap)) { ThreadPoolNotifyAlarm alarm = BeanUtil.mapToBean(notifySingleMap, ThreadPoolNotifyAlarm.class, true, CopyOptions.create()); - Map notifyReceivesMap = Maps.newHashMap(); - for (NotifyPlatformEnum value : NotifyPlatformEnum.values()) { - Object receives = targetMap.get("executors[" + i + "].notify.receives." + value.name()); - if (receives != null && StringUtil.isNotBlank((String) receives)) { - notifyReceivesMap.put(value.name(), (String) receives); - } - } - alarm.setReceives(notifyReceivesMap); + alarm.setReceives(alarm.getReceives()); executorProperties.setNotify(alarm); } executorPropertiesList.add(executorProperties); diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/ZookeeperRefresherHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/ZookeeperRefresherHandler.java index 1f7cd9ab..25f875e2 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/ZookeeperRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/ZookeeperRefresherHandler.java @@ -114,7 +114,7 @@ public class ZookeeperRefresherHandler extends AbstractCoreThreadPoolDynamicRefr executorProperties.getNotify().getCapacityAlarm(), executorProperties.getNotify().getActiveAlarm()); threadPoolNotifyAlarm.setInterval(executorProperties.getNotify().getInterval()); - threadPoolNotifyAlarm.setReceives(executorProperties.receives()); + threadPoolNotifyAlarm.setReceives(executorProperties.getNotify().getReceives()); GlobalNotifyAlarmManage.put(executorProperties.getThreadPoolId(), threadPoolNotifyAlarm); }); } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/AdapterExecutorsListener.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/AdapterExecutorsListener.java index 2bbcb0f1..217f1288 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/AdapterExecutorsListener.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/AdapterExecutorsListener.java @@ -33,7 +33,7 @@ import java.util.Objects; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; import static cn.hippo4j.core.springboot.starter.refresher.event.Hippo4jCoreDynamicRefreshEventOrder.ADAPTER_EXECUTORS_LISTENER; -import static cn.hippo4j.core.springboot.starter.support.ThreadPoolAdapterRegister.ADAPTER_EXECUTORS_MAP; +import static cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolAdapterRegister.ADAPTER_EXECUTORS_MAP; /** * Adapter executors listener. diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolAdapterRegister.java similarity index 94% rename from hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/ThreadPoolAdapterRegister.java rename to hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolAdapterRegister.java index f9204fc8..dad7a70f 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/ThreadPoolAdapterRegister.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolAdapterRegister.java @@ -32,11 +32,11 @@ import java.util.Map; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; /** - * Thread-pool adapter register. + * Dynamic thread-pool adapter register. */ @Slf4j @AllArgsConstructor -public class ThreadPoolAdapterRegister implements ApplicationRunner { +public class DynamicThreadPoolAdapterRegister implements ApplicationRunner { private final BootstrapCoreProperties bootstrapCoreProperties; diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolConfigService.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolConfigService.java new file mode 100644 index 00000000..84fce801 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolConfigService.java @@ -0,0 +1,60 @@ +package cn.hippo4j.core.springboot.starter.support; + +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; +import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNotifyParameter; +import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; +import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; +import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; +import cn.hippo4j.core.executor.support.QueueTypeEnum; +import cn.hippo4j.core.executor.support.RejectedTypeEnum; +import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService; +import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; +import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Dynamic thread-pool config service. + */ +public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService { + + @Override + public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) { + DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); + String threadPoolId = registerParameter.getThreadPoolId(); + ThreadPoolExecutor dynamicThreadPoolExecutor = buildDynamicThreadPoolExecutor(registerParameter); + DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder() + .threadPoolId(threadPoolId) + .executor(dynamicThreadPoolExecutor) + .build(); + // Register pool. + GlobalThreadPoolManage.registerPool(threadPoolId, dynamicThreadPoolWrapper); + ExecutorProperties executorProperties = buildExecutorProperties(registerWrapper); + // Register properties. + GlobalCoreThreadPoolManage.register(threadPoolId, executorProperties); + DynamicThreadPoolRegisterCoreNotifyParameter notifyParameter = registerWrapper.getDynamicThreadPoolRegisterCoreNotifyParameter(); + ThreadPoolNotifyAlarm notifyAlarm = new ThreadPoolNotifyAlarm(true, notifyParameter.getActiveAlarm(), notifyParameter.getCapacityAlarm()); + notifyAlarm.setReceives(notifyParameter.getReceives()); + notifyAlarm.setInterval(notifyParameter.getInterval()); + // Register notify. + GlobalNotifyAlarmManage.put(threadPoolId, notifyAlarm); + return dynamicThreadPoolExecutor; + } + + private ExecutorProperties buildExecutorProperties(DynamicThreadPoolRegisterWrapper registerWrapper) { + DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); + ExecutorProperties executorProperties = ExecutorProperties.builder() + .corePoolSize(registerParameter.getCorePoolSize()) + .maximumPoolSize(registerParameter.getMaximumPoolSize()) + .allowCoreThreadTimeOut(registerParameter.getAllowCoreThreadTimeOut()) + .keepAliveTime(registerParameter.getKeepAliveTime()) + .blockingQueue(QueueTypeEnum.getBlockingQueueNameByType(registerParameter.getQueueType())) + .threadNamePrefix(registerParameter.getThreadNamePrefix()) + .rejectedHandler(RejectedTypeEnum.getRejectedNameByType(registerParameter.getRejectedType())) + .executeTimeOut(registerParameter.getExecuteTimeOut()) + .threadPoolId(registerParameter.getThreadPoolId()) + .build(); + return executorProperties; + } +} diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 5a44a5ae..ee9c3be4 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -147,10 +147,10 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { int interval = Optional.ofNullable(notify) .map(each -> each.getInterval()).orElseGet(() -> bootstrapCoreProperties.getAlarmInterval() != null ? bootstrapCoreProperties.getAlarmInterval() : 5); String receive = Optional.ofNullable(notify) - .map(each -> each.getReceive()).orElseGet(() -> bootstrapCoreProperties.getReceive() != null ? bootstrapCoreProperties.getReceive() : null); + .map(each -> each.getReceives()).orElseGet(() -> bootstrapCoreProperties.getReceives() != null ? bootstrapCoreProperties.getReceives() : null); ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm); threadPoolNotifyAlarm.setInterval(interval); - threadPoolNotifyAlarm.setReceive(receive); + threadPoolNotifyAlarm.setReceives(receive); GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).getTaskDecorator(); ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 24f40176..6d750d47 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -26,7 +26,7 @@ import cn.hippo4j.core.config.UtilAutoConfiguration; import cn.hippo4j.core.enable.MarkerConfiguration; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; -import cn.hippo4j.core.executor.support.DynamicThreadPoolService; +import cn.hippo4j.core.executor.support.service.DynamicThreadPoolService; import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.inet.InetUtils; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java index c000520a..83b96ace 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java @@ -26,10 +26,7 @@ import cn.hippo4j.common.web.base.Result; import cn.hippo4j.common.web.exception.ServiceException; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.core.executor.support.DynamicThreadPoolService; -import cn.hippo4j.core.executor.support.QueueTypeEnum; -import cn.hippo4j.core.executor.support.RejectedTypeEnum; -import cn.hippo4j.core.executor.support.ThreadPoolBuilder; +import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService; import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; import cn.hippo4j.springboot.starter.remote.HttpAgent; @@ -46,7 +43,7 @@ import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_ */ @Slf4j @RequiredArgsConstructor -public class DynamicThreadPoolConfigService implements DynamicThreadPoolService, ApplicationListener { +public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService implements ApplicationListener { private final HttpAgent httpAgent; @@ -56,11 +53,6 @@ public class DynamicThreadPoolConfigService implements DynamicThreadPoolService, private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig; - @Override - public void onApplicationEvent(ApplicationCompleteEvent event) { - clientWorker.notifyApplicationComplete(); - } - @Override public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) { DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); @@ -88,18 +80,9 @@ public class DynamicThreadPoolConfigService implements DynamicThreadPoolService, return dynamicThreadPoolExecutor; } - private ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) { - ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder() - .threadPoolId(registerParameter.getThreadPoolId()) - .corePoolSize(registerParameter.getCorePoolSize()) - .maxPoolNum(registerParameter.getMaximumPoolSize()) - .workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity())) - .threadFactory(registerParameter.getThreadPoolId()) - .keepAliveTime(registerParameter.getKeepAliveTime()) - .rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType())) - .dynamicPool() - .build(); - return dynamicThreadPoolExecutor; + @Override + public void onApplicationEvent(ApplicationCompleteEvent event) { + clientWorker.notifyApplicationComplete(); } private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) {