From dd4e534b8c38d4cffd3d12b3e7990969c8951e03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E9=A9=AC=E5=93=A5?= Date: Thu, 11 Aug 2022 21:03:33 +0800 Subject: [PATCH] Optimize the dynamic registration thread pool (#526) * Migration code package location * Add condition to update if notification exists * hippo4j alarm notification initialization * Optimize the dynamic registration thread pool --- .../DynamicThreadPoolRegisterParameter.java | 6 ++- .../AbstractDynamicThreadPoolService.java | 4 +- .../RegisterDynamicThreadPoolTest.java | 40 ++++++++++--------- .../core/inittest/RunStateHandlerTest.java | 7 +++- 4 files changed, 34 insertions(+), 23 deletions(-) diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java index 5cbfb3cf..812ba82f 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java @@ -76,7 +76,7 @@ public class DynamicThreadPoolRegisterParameter { /** * Is alarm */ - private Integer isAlarm; + private Boolean isAlarm; /** * Capacity alarm @@ -104,6 +104,10 @@ public class DynamicThreadPoolRegisterParameter { */ private Long executeTimeOut; + public Integer getIsAlarm() { + return this.isAlarm ? 1 : 0; + } + public Integer getAllowCoreThreadTimeOut() { return this.allowCoreThreadTimeOut ? 1 : 0; } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java index be89d576..5f54d5a9 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java @@ -23,6 +23,7 @@ import cn.hippo4j.core.executor.support.RejectedTypeEnum; import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Abstract dynamic thread-pool service. @@ -42,7 +43,8 @@ public abstract class AbstractDynamicThreadPoolService implements DynamicThreadP .maxPoolNum(registerParameter.getMaximumPoolSize()) .workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity())) .threadFactory(registerParameter.getThreadNamePrefix()) - .keepAliveTime(registerParameter.getKeepAliveTime()) + .keepAliveTime(registerParameter.getKeepAliveTime(), TimeUnit.SECONDS) + .executeTimeOut(registerParameter.getExecuteTimeOut()) .rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType())) .dynamicPool() .build(); diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java index d66be339..c541a6f2 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java @@ -23,36 +23,37 @@ import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNoti import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter; import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; +import cn.hippo4j.core.executor.support.QueueTypeEnum; +import cn.hippo4j.core.executor.support.RejectedTypeEnum; import cn.hippo4j.message.enums.NotifyPlatformEnum; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; import java.util.concurrent.ThreadPoolExecutor; /** * Register dynamic thread-pool test. */ @Slf4j -@Component public class RegisterDynamicThreadPoolTest { - @PostConstruct - public void registerDynamicThreadPool() { - String threadPoolId = "register-dynamic-thread-pool"; - DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter(); - parameterInfo.setThreadPoolId(threadPoolId); - parameterInfo.setThreadNamePrefix(threadPoolId); - parameterInfo.setCorePoolSize(3); - parameterInfo.setMaximumPoolSize(14); - parameterInfo.setQueueType(9); - parameterInfo.setCapacity(110); - parameterInfo.setKeepAliveTime(110L); - parameterInfo.setRejectedType(2); - parameterInfo.setIsAlarm(0); - parameterInfo.setCapacityAlarm(90); - parameterInfo.setActiveAlarm(90); - parameterInfo.setAllowCoreThreadTimeOut(Boolean.TRUE); + public static ThreadPoolExecutor registerDynamicThreadPool(String threadPoolId) { + DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder() + .corePoolSize(3) + .maximumPoolSize(10) + .queueType(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.type) + .capacity(110) + // TimeUnit.SECONDS + .keepAliveTime(100L) + // TimeUnit.MILLISECONDS + .executeTimeOut(800L) + .rejectedType(RejectedTypeEnum.ABORT_POLICY.type) + .isAlarm(true) + .capacityAlarm(80) + .activeAlarm(80) + .threadPoolId(threadPoolId) + .threadNamePrefix(threadPoolId) + .allowCoreThreadTimeOut(true) + .build(); // Core mode and server mode, you can choose one of them. DynamicThreadPoolRegisterCoreNotifyParameter coreNotifyParameter = DynamicThreadPoolRegisterCoreNotifyParameter.builder() .activeAlarm(80) @@ -74,5 +75,6 @@ public class RegisterDynamicThreadPoolTest { .build(); ThreadPoolExecutor dynamicThreadPool = GlobalThreadPoolManage.dynamicRegister(registerWrapper); log.info("Dynamic registration thread pool parameter details: {}", JSONUtil.toJSONString(dynamicThreadPool)); + return dynamicThreadPool; } } diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java index b076da90..c4791508 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java @@ -43,8 +43,8 @@ public class RunStateHandlerTest { private ThreadPoolExecutor messageProduceDynamicThreadPool; private final ThreadPoolExecutor runStateHandlerTestExecutor = new ThreadPoolExecutor( - 2, - 2, + 3, + 3, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), @@ -63,6 +63,9 @@ public class RunStateHandlerTest { // Start the dynamic thread pool to simulate running tasks runTask(messageConsumeTtlDynamicThreadPool); runTask(messageProduceDynamicThreadPool); + // Dynamically register thread pool + ThreadPoolExecutor registerDynamicThreadPool = RegisterDynamicThreadPoolTest.registerDynamicThreadPool("register-dynamic-thread-pool"); + runTask(registerDynamicThreadPool); } private void runTask(Executor executor) {