From 9c1cef1c3b3ea54a4bf13d39dec54fe1896a719b Mon Sep 17 00:00:00 2001 From: shanjianq <49084314+shanjianq@users.noreply.github.com> Date: Sat, 3 Sep 2022 11:50:50 +0800 Subject: [PATCH] create DynamicThreadPool in sever which undefined in server but defined in client when client start (#637) * replace fastjson to JSONUtil * fix issue #558 * fix issue #558 * remove threadPoolNamePrefix and set alarm params Co-authored-by: airoger --- .../support/BlockingQueueTypeEnum.java | 7 ++++++ .../support/RejectedPolicyTypeEnum.java | 7 ++++++ .../DynamicThreadPoolPostProcessor.java | 23 +++++++++++++++++++ 3 files changed, 37 insertions(+) diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/BlockingQueueTypeEnum.java b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/BlockingQueueTypeEnum.java index a2c29693..1d675fa5 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/BlockingQueueTypeEnum.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/BlockingQueueTypeEnum.java @@ -145,4 +145,11 @@ public enum BlockingQueueTypeEnum { .findFirst(); return queueTypeEnum.map(each -> each.name).orElse(""); } + + public static BlockingQueueTypeEnum getBlockingQueueTypeEnumByName(String name) { + Optional queueTypeEnum = Arrays.stream(BlockingQueueTypeEnum.values()) + .filter(each -> each.name.equals(name)) + .findFirst(); + return queueTypeEnum.orElse(LINKED_BLOCKING_QUEUE); + } } diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/RejectedPolicyTypeEnum.java b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/RejectedPolicyTypeEnum.java index 597751fd..1a158816 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/RejectedPolicyTypeEnum.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/RejectedPolicyTypeEnum.java @@ -99,4 +99,11 @@ public enum RejectedPolicyTypeEnum { public static String getRejectedNameByType(int type) { return createPolicy(type).getClass().getSimpleName(); } + + public static RejectedPolicyTypeEnum getRejectedPolicyTypeEnumByName(String name) { + Optional rejectedTypeEnum = Stream.of(RejectedPolicyTypeEnum.values()) + .filter(each -> each.name.equals(name)) + .findFirst(); + return rejectedTypeEnum.orElse(ABORT_POLICY); + } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 8c7e73e1..fe80b71b 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -23,6 +23,8 @@ import cn.hippo4j.common.enums.EnableEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum; import cn.hippo4j.common.model.ThreadPoolParameterInfo; +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.toolkit.BooleanUtil; import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.web.base.Result; @@ -43,12 +45,14 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.core.task.TaskDecorator; +import org.springframework.util.ClassUtils; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -168,6 +172,25 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor); isSubscribe = true; } + } else { + // DynamicThreadPool configuration undefined in server + DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder() + .threadPoolId(threadPoolId) + .corePoolSize(executor.getCorePoolSize()) + .maximumPoolSize(executor.getMaximumPoolSize()) + .blockingQueueType(BlockingQueueTypeEnum.getBlockingQueueTypeEnumByName(executor.getQueue().getClass().getSimpleName())) + .capacity(executor.getQueue().remainingCapacity()) + .allowCoreThreadTimeOut(executor.allowsCoreThreadTimeOut()) + .keepAliveTime(executor.getKeepAliveTime(TimeUnit.MILLISECONDS)) + .isAlarm(false) + .activeAlarm(80) + .capacityAlarm(80) + .rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(((DynamicThreadPoolExecutor) executor).getRedundancyHandler().getClass().getSimpleName())) + .build(); + DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder() + .dynamicThreadPoolRegisterParameter(parameterInfo) + .build(); + GlobalThreadPoolManage.dynamicRegister(registerWrapper); } } catch (Exception ex) { newDynamicThreadPoolExecutor = executor != null ? executor : CommonDynamicThreadPool.getInstance(threadPoolId);