create DynamicThreadPool in sever which undefined in server but defined in client when client start (#637)

* replace fastjson to JSONUtil

* fix issue #558

* fix issue #558

* remove threadPoolNamePrefix and set alarm params

Co-authored-by: airoger <czzx201101136>
pull/639/head
shanjianq 2 years ago committed by GitHub
parent 9570640f8c
commit 9c1cef1c3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -145,4 +145,11 @@ public enum BlockingQueueTypeEnum {
.findFirst();
return queueTypeEnum.map(each -> each.name).orElse("");
}
public static BlockingQueueTypeEnum getBlockingQueueTypeEnumByName(String name) {
Optional<BlockingQueueTypeEnum> queueTypeEnum = Arrays.stream(BlockingQueueTypeEnum.values())
.filter(each -> each.name.equals(name))
.findFirst();
return queueTypeEnum.orElse(LINKED_BLOCKING_QUEUE);
}
}

@ -99,4 +99,11 @@ public enum RejectedPolicyTypeEnum {
public static String getRejectedNameByType(int type) {
return createPolicy(type).getClass().getSimpleName();
}
public static RejectedPolicyTypeEnum getRejectedPolicyTypeEnumByName(String name) {
Optional<RejectedPolicyTypeEnum> rejectedTypeEnum = Stream.of(RejectedPolicyTypeEnum.values())
.filter(each -> each.name.equals(name))
.findFirst();
return rejectedTypeEnum.orElse(ABORT_POLICY);
}
}

@ -23,6 +23,8 @@ import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.toolkit.BooleanUtil;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.web.base.Result;
@ -43,12 +45,14 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.task.TaskDecorator;
import org.springframework.util.ClassUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -168,6 +172,25 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor);
isSubscribe = true;
}
} else {
// DynamicThreadPool configuration undefined in server
DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder()
.threadPoolId(threadPoolId)
.corePoolSize(executor.getCorePoolSize())
.maximumPoolSize(executor.getMaximumPoolSize())
.blockingQueueType(BlockingQueueTypeEnum.getBlockingQueueTypeEnumByName(executor.getQueue().getClass().getSimpleName()))
.capacity(executor.getQueue().remainingCapacity())
.allowCoreThreadTimeOut(executor.allowsCoreThreadTimeOut())
.keepAliveTime(executor.getKeepAliveTime(TimeUnit.MILLISECONDS))
.isAlarm(false)
.activeAlarm(80)
.capacityAlarm(80)
.rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(((DynamicThreadPoolExecutor) executor).getRedundancyHandler().getClass().getSimpleName()))
.build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()
.dynamicThreadPoolRegisterParameter(parameterInfo)
.build();
GlobalThreadPoolManage.dynamicRegister(registerWrapper);
}
} catch (Exception ex) {
newDynamicThreadPoolExecutor = executor != null ? executor : CommonDynamicThreadPool.getInstance(threadPoolId);

Loading…
Cancel
Save