Code naming and access refactoring

1.4.0
chen.ma 3 years ago
parent 44e1aa73b1
commit cf6bba0840

@ -18,6 +18,7 @@
package cn.hippo4j.common.executor.support; package cn.hippo4j.common.executor.support;
import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader; import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader;
import lombok.Getter;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -27,9 +28,9 @@ import java.util.concurrent.*;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
* Queue type enum. * Blocking queue type enum.
*/ */
public enum QueueTypeEnum { public enum BlockingQueueTypeEnum {
/** /**
* {@link java.util.concurrent.ArrayBlockingQueue} * {@link java.util.concurrent.ArrayBlockingQueue}
@ -66,11 +67,13 @@ public enum QueueTypeEnum {
*/ */
RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockingQueue"); RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockingQueue");
public Integer type; @Getter
private Integer type;
public String name; @Getter
private String name;
QueueTypeEnum(int type, String name) { BlockingQueueTypeEnum(int type, String name) {
this.type = type; this.type = type;
this.name = name; this.name = name;
} }
@ -81,7 +84,7 @@ public enum QueueTypeEnum {
public static BlockingQueue createBlockingQueue(String blockingQueueName, Integer capacity) { public static BlockingQueue createBlockingQueue(String blockingQueueName, Integer capacity) {
BlockingQueue blockingQueue = null; BlockingQueue blockingQueue = null;
QueueTypeEnum queueTypeEnum = Stream.of(QueueTypeEnum.values()) BlockingQueueTypeEnum queueTypeEnum = Stream.of(BlockingQueueTypeEnum.values())
.filter(each -> Objects.equals(each.name, blockingQueueName)) .filter(each -> Objects.equals(each.name, blockingQueueName))
.findFirst() .findFirst()
.orElse(null); .orElse(null);
@ -137,7 +140,7 @@ public enum QueueTypeEnum {
} }
public static String getBlockingQueueNameByType(int type) { public static String getBlockingQueueNameByType(int type) {
Optional<QueueTypeEnum> queueTypeEnum = Arrays.stream(QueueTypeEnum.values()) Optional<BlockingQueueTypeEnum> queueTypeEnum = Arrays.stream(BlockingQueueTypeEnum.values())
.filter(each -> each.type == type) .filter(each -> each.type == type)
.findFirst(); .findFirst();
return queueTypeEnum.map(each -> each.name).orElse(""); return queueTypeEnum.map(each -> each.name).orElse("");

@ -18,6 +18,7 @@
package cn.hippo4j.common.executor.support; package cn.hippo4j.common.executor.support;
import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader; import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader;
import lombok.Getter;
import java.util.Collection; import java.util.Collection;
import java.util.Objects; import java.util.Objects;
@ -27,9 +28,9 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
* Reject policy type Enum. * Rejected policy type Enum.
*/ */
public enum RejectedTypeEnum { public enum RejectedPolicyTypeEnum {
CALLER_RUNS_POLICY(1, "CallerRunsPolicy", new ThreadPoolExecutor.CallerRunsPolicy()), CALLER_RUNS_POLICY(1, "CallerRunsPolicy", new ThreadPoolExecutor.CallerRunsPolicy()),
@ -43,13 +44,15 @@ public enum RejectedTypeEnum {
SYNC_PUT_QUEUE_POLICY(6, "SyncPutQueuePolicy", new SyncPutQueuePolicy()); SYNC_PUT_QUEUE_POLICY(6, "SyncPutQueuePolicy", new SyncPutQueuePolicy());
public Integer type; @Getter
private Integer type;
public String name; @Getter
private String name;
public RejectedExecutionHandler rejectedHandler; public RejectedExecutionHandler rejectedHandler;
RejectedTypeEnum(Integer type, String name, RejectedExecutionHandler rejectedHandler) { RejectedPolicyTypeEnum(Integer type, String name, RejectedExecutionHandler rejectedHandler) {
this.type = type; this.type = type;
this.name = name; this.name = name;
this.rejectedHandler = rejectedHandler; this.rejectedHandler = rejectedHandler;
@ -60,7 +63,7 @@ public enum RejectedTypeEnum {
} }
public static RejectedExecutionHandler createPolicy(String name) { public static RejectedExecutionHandler createPolicy(String name) {
RejectedTypeEnum rejectedTypeEnum = Stream.of(RejectedTypeEnum.values()) RejectedPolicyTypeEnum rejectedTypeEnum = Stream.of(RejectedPolicyTypeEnum.values())
.filter(each -> Objects.equals(each.name, name)) .filter(each -> Objects.equals(each.name, name))
.findFirst() .findFirst()
.orElse(null); .orElse(null);
@ -77,7 +80,7 @@ public enum RejectedTypeEnum {
} }
public static RejectedExecutionHandler createPolicy(int type) { public static RejectedExecutionHandler createPolicy(int type) {
Optional<RejectedExecutionHandler> rejectedTypeEnum = Stream.of(RejectedTypeEnum.values()) Optional<RejectedExecutionHandler> rejectedTypeEnum = Stream.of(RejectedPolicyTypeEnum.values())
.filter(each -> Objects.equals(type, each.type)) .filter(each -> Objects.equals(type, each.type))
.map(each -> each.rejectedHandler) .map(each -> each.rejectedHandler)
.findFirst(); .findFirst();

@ -17,8 +17,8 @@
package cn.hippo4j.common.model.register; package cn.hippo4j.common.model.register;
import cn.hippo4j.common.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import com.fasterxml.jackson.annotation.JsonAlias; import com.fasterxml.jackson.annotation.JsonAlias;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
@ -56,9 +56,9 @@ public class DynamicThreadPoolRegisterParameter {
private Integer maximumPoolSize; private Integer maximumPoolSize;
/** /**
* Queue type * Blocking queue type
*/ */
private QueueTypeEnum queueType; private BlockingQueueTypeEnum blockingQueueType;
/** /**
* Capacity * Capacity
@ -71,9 +71,9 @@ public class DynamicThreadPoolRegisterParameter {
private Long keepAliveTime; private Long keepAliveTime;
/** /**
* Rejected type * Rejected policy type
*/ */
private RejectedTypeEnum rejectedType; private RejectedPolicyTypeEnum rejectedPolicyType;
/** /**
* Is alarm * Is alarm

@ -181,8 +181,8 @@ public class ConfigServiceImpl implements ConfigService {
configAllInfo.setItemId(registerWrapper.getItemId()); configAllInfo.setItemId(registerWrapper.getItemId());
configAllInfo.setTpId(registerParameter.getThreadPoolId()); configAllInfo.setTpId(registerParameter.getThreadPoolId());
configAllInfo.setLivenessAlarm(registerParameter.getActiveAlarm()); configAllInfo.setLivenessAlarm(registerParameter.getActiveAlarm());
configAllInfo.setQueueType(registerParameter.getQueueType().type); configAllInfo.setQueueType(registerParameter.getBlockingQueueType().getType());
configAllInfo.setRejectedType(registerParameter.getRejectedType().type); configAllInfo.setRejectedType(registerParameter.getRejectedPolicyType().getType());
configAllInfo.setAllowCoreThreadTimeOut(registerParameter.getAllowCoreThreadTimeOut()); configAllInfo.setAllowCoreThreadTimeOut(registerParameter.getAllowCoreThreadTimeOut());
return configAllInfo; return configAllInfo;
} }

@ -17,7 +17,7 @@
package cn.hippo4j.core.executor.support; package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -33,7 +33,7 @@ public class CommonDynamicThreadPool {
.threadFactory(threadPoolId) .threadFactory(threadPoolId)
.poolThreadSize(2, 4) .poolThreadSize(2, 4)
.keepAliveTime(60L, TimeUnit.SECONDS) .keepAliveTime(60L, TimeUnit.SECONDS)
.workQueue(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE, 1024) .workQueue(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE, 1024)
.build(); .build();
return dynamicThreadPoolExecutor; return dynamicThreadPoolExecutor;
} }

@ -18,7 +18,7 @@
package cn.hippo4j.core.executor.support; package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.design.builder.Builder; import cn.hippo4j.common.design.builder.Builder;
import cn.hippo4j.common.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.Assert;
import org.springframework.core.task.TaskDecorator; import org.springframework.core.task.TaskDecorator;
@ -47,7 +47,7 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
private int capacity = 512; private int capacity = 512;
private QueueTypeEnum queueType; private BlockingQueueTypeEnum blockingQueueType;
private BlockingQueue workQueue = new LinkedBlockingQueue(capacity); private BlockingQueue workQueue = new LinkedBlockingQueue(capacity);
@ -150,8 +150,8 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this; return this;
} }
public ThreadPoolBuilder workQueue(QueueTypeEnum queueType, int capacity) { public ThreadPoolBuilder workQueue(BlockingQueueTypeEnum queueType, int capacity) {
this.queueType = queueType; this.blockingQueueType = queueType;
this.capacity = capacity; this.capacity = capacity;
return this; return this;
} }
@ -161,8 +161,8 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this; return this;
} }
public ThreadPoolBuilder workQueue(QueueTypeEnum queueType) { public ThreadPoolBuilder workQueue(BlockingQueueTypeEnum blockingQueueType) {
this.queueType = queueType; this.blockingQueueType = blockingQueueType;
return this; return this;
} }
@ -246,8 +246,8 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
initParam.setAwaitTerminationMillis(builder.awaitTerminationMillis); initParam.setAwaitTerminationMillis(builder.awaitTerminationMillis);
} }
if (!builder.isFastPool) { if (!builder.isFastPool) {
if (builder.queueType != null) { if (builder.blockingQueueType != null) {
builder.workQueue = QueueTypeEnum.createBlockingQueue(builder.queueType.type, builder.capacity); builder.workQueue = BlockingQueueTypeEnum.createBlockingQueue(builder.blockingQueueType.getType(), builder.capacity);
} }
initParam.setWorkQueue(builder.workQueue); initParam.setWorkQueue(builder.workQueue);
} }

@ -18,8 +18,8 @@
package cn.hippo4j.core.executor.support.service; package cn.hippo4j.core.executor.support.service;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -41,11 +41,11 @@ public abstract class AbstractDynamicThreadPoolService implements DynamicThreadP
.threadPoolId(registerParameter.getThreadPoolId()) .threadPoolId(registerParameter.getThreadPoolId())
.corePoolSize(registerParameter.getCorePoolSize()) .corePoolSize(registerParameter.getCorePoolSize())
.maxPoolNum(registerParameter.getMaximumPoolSize()) .maxPoolNum(registerParameter.getMaximumPoolSize())
.workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType().type, registerParameter.getCapacity())) .workQueue(BlockingQueueTypeEnum.createBlockingQueue(registerParameter.getBlockingQueueType().getType(), registerParameter.getCapacity()))
.threadFactory(registerParameter.getThreadNamePrefix()) .threadFactory(registerParameter.getThreadNamePrefix())
.keepAliveTime(registerParameter.getKeepAliveTime(), TimeUnit.SECONDS) .keepAliveTime(registerParameter.getKeepAliveTime(), TimeUnit.SECONDS)
.executeTimeOut(registerParameter.getExecuteTimeOut()) .executeTimeOut(registerParameter.getExecuteTimeOut())
.rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType().type)) .rejected(RejectedPolicyTypeEnum.createPolicy(registerParameter.getRejectedPolicyType().getType()))
.dynamicPool() .dynamicPool()
.build(); .build();
return dynamicThreadPoolExecutor; return dynamicThreadPoolExecutor;

@ -23,8 +23,8 @@ import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNoti
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter; import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.common.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.message.enums.NotifyPlatformEnum; import cn.hippo4j.message.enums.NotifyPlatformEnum;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -40,13 +40,13 @@ public class RegisterDynamicThreadPoolTest {
DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder() DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder()
.corePoolSize(1) .corePoolSize(1)
.maximumPoolSize(2) .maximumPoolSize(2)
.queueType(QueueTypeEnum.LINKED_BLOCKING_QUEUE) .blockingQueueType(BlockingQueueTypeEnum.LINKED_BLOCKING_QUEUE)
.capacity(1024) .capacity(1024)
// TimeUnit.SECONDS // TimeUnit.SECONDS
.keepAliveTime(1024L) .keepAliveTime(1024L)
// TimeUnit.MILLISECONDS // TimeUnit.MILLISECONDS
.executeTimeOut(1024L) .executeTimeOut(1024L)
.rejectedType(RejectedTypeEnum.DISCARD_POLICY) .rejectedPolicyType(RejectedPolicyTypeEnum.DISCARD_POLICY)
.isAlarm(true) .isAlarm(true)
.allowCoreThreadTimeOut(false) .allowCoreThreadTimeOut(false)
.capacityAlarm(90) .capacityAlarm(90)

@ -23,8 +23,8 @@ import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.common.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue; import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.core.proxy.RejectedProxyUtil; import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
@ -200,7 +200,7 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener<Hip
|| !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler()) || !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())
|| ||
(!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) (!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName())); && Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName()));
return result; return result;
} }
@ -238,7 +238,7 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener<Hip
} }
} }
if (!Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) { if (!Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedTypeEnum.createPolicy(properties.getRejectedHandler()); RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(properties.getRejectedHandler());
if (executor instanceof AbstractDynamicExecutorSupport) { if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor; DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler); dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
@ -251,7 +251,7 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener<Hip
executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS); executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS);
} }
if (!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) if (!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName())) { && Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())) {
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) { if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) {
ResizableCapacityLinkedBlockingQueue<?> queue = (ResizableCapacityLinkedBlockingQueue<?>) executor.getQueue(); ResizableCapacityLinkedBlockingQueue<?> queue = (ResizableCapacityLinkedBlockingQueue<?>) executor.getQueue();
queue.setCapacity(properties.getQueueCapacity()); queue.setCapacity(properties.getQueueCapacity());

@ -24,8 +24,8 @@ import cn.hippo4j.common.toolkit.BooleanUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.common.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService; import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
@ -67,10 +67,10 @@ public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolSer
.maximumPoolSize(registerParameter.getMaximumPoolSize()) .maximumPoolSize(registerParameter.getMaximumPoolSize())
.allowCoreThreadTimeOut(BooleanUtil.toBoolean(String.valueOf(registerParameter.getAllowCoreThreadTimeOut()))) .allowCoreThreadTimeOut(BooleanUtil.toBoolean(String.valueOf(registerParameter.getAllowCoreThreadTimeOut())))
.keepAliveTime(registerParameter.getKeepAliveTime()) .keepAliveTime(registerParameter.getKeepAliveTime())
.blockingQueue(QueueTypeEnum.getBlockingQueueNameByType(registerParameter.getQueueType().type)) .blockingQueue(BlockingQueueTypeEnum.getBlockingQueueNameByType(registerParameter.getBlockingQueueType().getType()))
.capacityAlarm(registerParameter.getCapacity()) .capacityAlarm(registerParameter.getCapacity())
.threadNamePrefix(registerParameter.getThreadNamePrefix()) .threadNamePrefix(registerParameter.getThreadNamePrefix())
.rejectedHandler(RejectedTypeEnum.getRejectedNameByType(registerParameter.getRejectedType().type)) .rejectedHandler(RejectedPolicyTypeEnum.getRejectedNameByType(registerParameter.getRejectedPolicyType().getType()))
.executeTimeOut(registerParameter.getExecuteTimeOut()) .executeTimeOut(registerParameter.getExecuteTimeOut())
.threadPoolId(registerParameter.getThreadPoolId()) .threadPoolId(registerParameter.getThreadPoolId())
.build(); .build();

@ -18,8 +18,8 @@
package cn.hippo4j.core.springboot.starter.support; package cn.hippo4j.core.springboot.starter.support;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.DynamicThreadPool; import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
@ -118,7 +118,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.orElse(null); .orElse(null);
if (executorProperties != null) { if (executorProperties != null) {
try { try {
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity()); BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity());
String threadNamePrefix = executorProperties.getThreadNamePrefix(); String threadNamePrefix = executorProperties.getThreadNamePrefix();
newDynamicPoolExecutor = ThreadPoolBuilder.builder() newDynamicPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool() .dynamicPool()
@ -127,7 +127,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut()).orElse(0L)) .executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut()).orElse(0L))
.poolThreadSize(executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize()) .poolThreadSize(executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize())
.keepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS) .keepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedTypeEnum.createPolicy(executorProperties.getRejectedHandler())) .rejected(RejectedPolicyTypeEnum.createPolicy(executorProperties.getRejectedHandler()))
.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut()) .allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut())
.build(); .build();
} catch (Exception ex) { } catch (Exception ex) {

@ -18,7 +18,7 @@
package cn.hippo4j.springboot.starter.core; package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh; import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@ -46,7 +46,7 @@ public class DynamicThreadPoolSubscribeConfig {
.maxPoolNum(2) .maxPoolNum(2)
.keepAliveTime(2000) .keepAliveTime(2000)
.timeUnit(TimeUnit.MILLISECONDS) .timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE) .workQueue(BlockingQueueTypeEnum.SYNCHRONOUS_QUEUE)
.allowCoreThreadTimeOut(true) .allowCoreThreadTimeOut(true)
.threadFactory("client.dynamic.threadPool.change.config") .threadFactory("client.dynamic.threadPool.change.config")
.rejected(new ThreadPoolExecutor.AbortPolicy()) .rejected(new ThreadPoolExecutor.AbortPolicy())

@ -26,8 +26,8 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.common.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue; import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.core.proxy.RejectedProxyUtil; import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh; import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
@ -94,7 +94,7 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
.nowAllowsCoreThreadTimeOut(EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut())) .nowAllowsCoreThreadTimeOut(EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut()))
.nowKeepAliveTime(afterExecutor.getKeepAliveTime(TimeUnit.SECONDS)) .nowKeepAliveTime(afterExecutor.getKeepAliveTime(TimeUnit.SECONDS))
.nowQueueCapacity((afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size())) .nowQueueCapacity((afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size()))
.nowRejectedName(RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType())) .nowRejectedName(RejectedPolicyTypeEnum.getRejectedNameByType(parameter.getRejectedType()))
.nowExecuteTimeOut(executeTimeOut) .nowExecuteTimeOut(executeTimeOut)
.build(); .build();
changeNotifyRequest.setThreadPoolId(threadPoolId); changeNotifyRequest.setThreadPoolId(threadPoolId);
@ -106,7 +106,7 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
String.format(CHANGE_DELIMITER, originalCapacity, (afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size())), String.format(CHANGE_DELIMITER, originalCapacity, (afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size())),
String.format(CHANGE_DELIMITER, originalKeepAliveTime, afterExecutor.getKeepAliveTime(TimeUnit.SECONDS)), String.format(CHANGE_DELIMITER, originalKeepAliveTime, afterExecutor.getKeepAliveTime(TimeUnit.SECONDS)),
String.format(CHANGE_DELIMITER, originalExecuteTimeOut, executeTimeOut), String.format(CHANGE_DELIMITER, originalExecuteTimeOut, executeTimeOut),
String.format(CHANGE_DELIMITER, originalRejected, RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType())), String.format(CHANGE_DELIMITER, originalRejected, RejectedPolicyTypeEnum.getRejectedNameByType(parameter.getRejectedType())),
String.format(CHANGE_DELIMITER, originalAllowCoreThreadTimeOut, EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut()))); String.format(CHANGE_DELIMITER, originalAllowCoreThreadTimeOut, EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut())));
} }
@ -128,7 +128,7 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
} }
} }
if (parameter.getCapacity() != null if (parameter.getCapacity() != null
&& Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.type, parameter.getQueueType())) { && Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getType(), parameter.getQueueType())) {
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) { if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) {
ResizableCapacityLinkedBlockingQueue queue = (ResizableCapacityLinkedBlockingQueue) executor.getQueue(); ResizableCapacityLinkedBlockingQueue queue = (ResizableCapacityLinkedBlockingQueue) executor.getQueue();
queue.setCapacity(parameter.getCapacity()); queue.setCapacity(parameter.getCapacity());
@ -144,7 +144,7 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(executeTimeOut); ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(executeTimeOut);
} }
if (parameter.getRejectedType() != null) { if (parameter.getRejectedType() != null) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedTypeEnum.createPolicy(parameter.getRejectedType()); RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(parameter.getRejectedType());
if (executor instanceof AbstractDynamicExecutorSupport) { if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor; DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler); dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);

@ -20,8 +20,8 @@ package cn.hippo4j.springboot.starter.support;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.enums.EnableEnum; import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.model.ThreadPoolParameterInfo; import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.toolkit.BooleanUtil; import cn.hippo4j.common.toolkit.BooleanUtil;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
@ -135,14 +135,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
String resultJsonStr = JSONUtil.toJSONString(result.getData()); String resultJsonStr = JSONUtil.toJSONString(result.getData());
if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) { if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) {
// Create a thread pool with relevant parameters. // Create a thread pool with relevant parameters.
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity()); BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity());
newDynamicThreadPoolExecutor = ThreadPoolBuilder.builder() newDynamicThreadPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool() .dynamicPool()
.workQueue(workQueue) .workQueue(workQueue)
.threadFactory(threadPoolId) .threadFactory(threadPoolId)
.poolThreadSize(threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt()) .poolThreadSize(threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt())
.keepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS) .keepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType())) .rejected(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType()))
.allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut())) .allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut()))
.build(); .build();
// Set dynamic thread pool enhancement parameters. // Set dynamic thread pool enhancement parameters.

Loading…
Cancel
Save