feature: 修改动态线程池命名.

pull/10/head
chen.ma 3 years ago
parent 3663c2c780
commit 7bd785f76c

@ -2,7 +2,7 @@ package com.github.dynamic.threadpool.starter.alarm;
import com.github.dynamic.threadpool.common.config.ApplicationContextHolder; import com.github.dynamic.threadpool.common.config.ApplicationContextHolder;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo; import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.core.DynamicThreadPoolExecutor;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -28,7 +28,7 @@ public class BaseSendMessageService implements InitializingBean, SendMessageServ
private final List<SendMessageHandler> sendMessageHandlers = new ArrayList(4); private final List<SendMessageHandler> sendMessageHandlers = new ArrayList(4);
@Override @Override
public void sendAlarmMessage(CustomThreadPoolExecutor threadPoolExecutor) { public void sendAlarmMessage(DynamicThreadPoolExecutor threadPoolExecutor) {
for (SendMessageHandler messageHandler : sendMessageHandlers) { for (SendMessageHandler messageHandler : sendMessageHandlers) {
try { try {
messageHandler.sendAlarmMessage(notifyConfigs, threadPoolExecutor); messageHandler.sendAlarmMessage(notifyConfigs, threadPoolExecutor);

@ -8,10 +8,10 @@ import com.dingtalk.api.request.OapiRobotSendRequest;
import com.github.dynamic.threadpool.common.model.InstanceInfo; import com.github.dynamic.threadpool.common.model.InstanceInfo;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo; import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.starter.core.GlobalThreadPoolManage; import com.github.dynamic.threadpool.starter.core.GlobalThreadPoolManage;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.core.DynamicThreadPoolExecutor;
import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum;
import com.github.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap; import com.github.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrapper;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.taobao.api.ApiException; import com.taobao.api.ApiException;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
@ -45,7 +45,7 @@ public class DingSendMessageHandler implements SendMessageHandler {
} }
@Override @Override
public void sendAlarmMessage(List<NotifyConfig> notifyConfigs, CustomThreadPoolExecutor pool) { public void sendAlarmMessage(List<NotifyConfig> notifyConfigs, DynamicThreadPoolExecutor pool) {
Optional<NotifyConfig> notifyConfigOptional = notifyConfigs.stream() Optional<NotifyConfig> notifyConfigOptional = notifyConfigs.stream()
.filter(each -> Objects.equals(each.getType(), getType())) .filter(each -> Objects.equals(each.getType(), getType()))
.findFirst(); .findFirst();
@ -60,7 +60,7 @@ public class DingSendMessageHandler implements SendMessageHandler {
changeConfigOptional.ifPresent(each -> dingChangeSendMessage(each, parameter)); changeConfigOptional.ifPresent(each -> dingChangeSendMessage(each, parameter));
} }
private void dingAlarmSendMessage(NotifyConfig notifyConfig, CustomThreadPoolExecutor pool) { private void dingAlarmSendMessage(NotifyConfig notifyConfig, DynamicThreadPoolExecutor pool) {
List<String> receives = StrUtil.split(notifyConfig.getReceives(), ','); List<String> receives = StrUtil.split(notifyConfig.getReceives(), ',');
String afterReceives = Joiner.on(", @").join(receives); String afterReceives = Joiner.on(", @").join(receives);
@ -133,7 +133,7 @@ public class DingSendMessageHandler implements SendMessageHandler {
private void dingChangeSendMessage(NotifyConfig notifyConfig, PoolParameterInfo parameter) { private void dingChangeSendMessage(NotifyConfig notifyConfig, PoolParameterInfo parameter) {
String threadPoolId = parameter.getTpId(); String threadPoolId = parameter.getTpId();
DynamicThreadPoolWrap poolWrap = GlobalThreadPoolManage.getExecutorService(threadPoolId); DynamicThreadPoolWrapper poolWrap = GlobalThreadPoolManage.getExecutorService(threadPoolId);
if (poolWrap == null) { if (poolWrap == null) {
log.warn("Thread pool is empty when sending change notification, threadPoolId :: {}", threadPoolId); log.warn("Thread pool is empty when sending change notification, threadPoolId :: {}", threadPoolId);
return; return;
@ -142,7 +142,7 @@ public class DingSendMessageHandler implements SendMessageHandler {
List<String> receives = StrUtil.split(notifyConfig.getReceives(), ','); List<String> receives = StrUtil.split(notifyConfig.getReceives(), ',');
String afterReceives = Joiner.on(", @").join(receives); String afterReceives = Joiner.on(", @").join(receives);
CustomThreadPoolExecutor customPool = poolWrap.getPool(); DynamicThreadPoolExecutor customPool = poolWrap.getPool();
/** /**
* hesitant e.g. * hesitant e.g.
*/ */

@ -1,7 +1,7 @@
package com.github.dynamic.threadpool.starter.alarm; package com.github.dynamic.threadpool.starter.alarm;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo; import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.core.DynamicThreadPoolExecutor;
import java.util.List; import java.util.List;
@ -26,7 +26,7 @@ public interface SendMessageHandler {
* @param notifyConfigs * @param notifyConfigs
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
void sendAlarmMessage(List<NotifyConfig> notifyConfigs, CustomThreadPoolExecutor threadPoolExecutor); void sendAlarmMessage(List<NotifyConfig> notifyConfigs, DynamicThreadPoolExecutor threadPoolExecutor);
/** /**
* Send change message. * Send change message.

@ -1,7 +1,7 @@
package com.github.dynamic.threadpool.starter.alarm; package com.github.dynamic.threadpool.starter.alarm;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo; import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.core.DynamicThreadPoolExecutor;
/** /**
* Send msg. * Send msg.
@ -16,7 +16,7 @@ public interface SendMessageService {
* *
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
void sendAlarmMessage(CustomThreadPoolExecutor threadPoolExecutor); void sendAlarmMessage(DynamicThreadPoolExecutor threadPoolExecutor);
/** /**
* Send change message. * Send change message.

@ -4,7 +4,7 @@ import com.github.dynamic.threadpool.common.config.ApplicationContextHolder;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo; import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.starter.config.MessageAlarmConfig; import com.github.dynamic.threadpool.starter.config.MessageAlarmConfig;
import com.github.dynamic.threadpool.starter.toolkit.CalculateUtil; import com.github.dynamic.threadpool.starter.toolkit.CalculateUtil;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.core.DynamicThreadPoolExecutor;
import com.github.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue; import com.github.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -44,7 +44,7 @@ public class ThreadPoolAlarmManage {
* *
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
public static void checkPoolCapacityAlarm(CustomThreadPoolExecutor threadPoolExecutor) { public static void checkPoolCapacityAlarm(DynamicThreadPoolExecutor threadPoolExecutor) {
if (SEND_MESSAGE_SERVICE == null) { if (SEND_MESSAGE_SERVICE == null) {
return; return;
} }
@ -68,7 +68,7 @@ public class ThreadPoolAlarmManage {
* @param isCore * @param isCore
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
public static void checkPoolLivenessAlarm(boolean isCore, CustomThreadPoolExecutor threadPoolExecutor) { public static void checkPoolLivenessAlarm(boolean isCore, DynamicThreadPoolExecutor threadPoolExecutor) {
if (isCore || SEND_MESSAGE_SERVICE == null || !isSendMessage(threadPoolExecutor, MessageTypeEnum.LIVENESS)) { if (isCore || SEND_MESSAGE_SERVICE == null || !isSendMessage(threadPoolExecutor, MessageTypeEnum.LIVENESS)) {
return; return;
} }
@ -88,7 +88,7 @@ public class ThreadPoolAlarmManage {
* *
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
public static void checkPoolRejectAlarm(CustomThreadPoolExecutor threadPoolExecutor) { public static void checkPoolRejectAlarm(DynamicThreadPoolExecutor threadPoolExecutor) {
if (SEND_MESSAGE_SERVICE == null) { if (SEND_MESSAGE_SERVICE == null) {
return; return;
} }
@ -118,7 +118,7 @@ public class ThreadPoolAlarmManage {
* @param typeEnum * @param typeEnum
* @return * @return
*/ */
private static boolean isSendMessage(CustomThreadPoolExecutor threadPoolExecutor, MessageTypeEnum typeEnum) { private static boolean isSendMessage(DynamicThreadPoolExecutor threadPoolExecutor, MessageTypeEnum typeEnum) {
AlarmControlDTO alarmControl = AlarmControlDTO.builder() AlarmControlDTO alarmControl = AlarmControlDTO.builder()
.threadPool(threadPoolExecutor.getThreadPoolId()) .threadPool(threadPoolExecutor.getThreadPoolId())
.typeEnum(typeEnum) .typeEnum(typeEnum)

@ -1,6 +1,6 @@
package com.github.dynamic.threadpool.starter.common; package com.github.dynamic.threadpool.starter.common;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.core.DynamicThreadPoolExecutor;
import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedPolicies; import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedPolicies;
import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
@ -8,17 +8,16 @@ import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* Common threadPool. * Common dynamic threadPool.
* *
* @author chen.ma * @author chen.ma
* @date 2021/6/16 22:35 * @date 2021/6/16 22:35
*/ */
public class CommonThreadPool { public class CommonDynamicThreadPool {
public static CustomThreadPoolExecutor getInstance(String threadPoolId) { public static DynamicThreadPoolExecutor getInstance(String threadPoolId) {
CustomThreadPoolExecutor poolExecutor = (CustomThreadPoolExecutor) ThreadPoolBuilder.builder() DynamicThreadPoolExecutor poolExecutor = (DynamicThreadPoolExecutor) ThreadPoolBuilder.builder()
.isCustomPool(true) .dynamicPool()
.threadPoolId(threadPoolId)
.threadFactory(threadPoolId) .threadFactory(threadPoolId)
.poolThreadSize(3, 5) .poolThreadSize(3, 5)
.keepAliveTime(1000L, TimeUnit.SECONDS) .keepAliveTime(1000L, TimeUnit.SECONDS)

@ -1,4 +1,4 @@
package com.github.dynamic.threadpool.starter.toolkit.thread; package com.github.dynamic.threadpool.starter.core;
import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm; import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm;
import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarmManage; import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarmManage;
@ -18,12 +18,12 @@ import java.util.concurrent.locks.ReentrantLock;
import static com.github.dynamic.threadpool.common.constant.Constants.MAP_INITIAL_CAPACITY; import static com.github.dynamic.threadpool.common.constant.Constants.MAP_INITIAL_CAPACITY;
/** /**
* Custom threadPool wrap. * Dynamic threadPool wrap.
* *
* @author chen.ma * @author chen.ma
* @date 2021/7/8 21:47 * @date 2021/7/8 21:47
*/ */
public final class CustomThreadPoolExecutor extends ThreadPoolExecutor { public final class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicInteger rejectCount = new AtomicInteger(); private final AtomicInteger rejectCount = new AtomicInteger();
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
@ -58,7 +58,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor {
private static final RejectedExecutionHandler DEFAULT_HANDLER = new ThreadPoolExecutor.AbortPolicy(); private static final RejectedExecutionHandler DEFAULT_HANDLER = new ThreadPoolExecutor.AbortPolicy();
private static final RuntimePermission SHUTDOWN_PERM = new RuntimePermission("modifyThread"); private static final RuntimePermission SHUTDOWN_PERM = new RuntimePermission("modifyThread");
public CustomThreadPoolExecutor(int corePoolSize, public DynamicThreadPoolExecutor(int corePoolSize,
int maximumPoolSize, int maximumPoolSize,
long keepAliveTime, long keepAliveTime,
TimeUnit unit, TimeUnit unit,

@ -5,14 +5,13 @@ import com.github.dynamic.threadpool.common.config.ApplicationContextHolder;
import com.github.dynamic.threadpool.common.constant.Constants; import com.github.dynamic.threadpool.common.constant.Constants;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo; import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.common.web.base.Result; import com.github.dynamic.threadpool.common.web.base.Result;
import com.github.dynamic.threadpool.starter.common.CommonThreadPool; import com.github.dynamic.threadpool.starter.common.CommonDynamicThreadPool;
import com.github.dynamic.threadpool.starter.config.BootstrapProperties; import com.github.dynamic.threadpool.starter.config.BootstrapProperties;
import com.github.dynamic.threadpool.starter.remote.HttpAgent; import com.github.dynamic.threadpool.starter.remote.HttpAgent;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import com.github.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap; import com.github.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrapper;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import lombok.var; import lombok.var;
@ -55,18 +54,18 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
@Override @Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof CustomThreadPoolExecutor) { if (bean instanceof DynamicThreadPoolExecutor) {
var dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class); var dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class);
if (Objects.isNull(dynamicThreadPool)) { if (Objects.isNull(dynamicThreadPool)) {
return bean; return bean;
} }
var customExecutor = (CustomThreadPoolExecutor) bean; var dynamicExecutor = (DynamicThreadPoolExecutor) bean;
var wrap = new DynamicThreadPoolWrap(customExecutor.getThreadPoolId(), customExecutor); var wrap = new DynamicThreadPoolWrapper(dynamicExecutor.getThreadPoolId(), dynamicExecutor);
CustomThreadPoolExecutor remoteExecutor = fillPoolAndRegister(wrap); var remoteExecutor = fillPoolAndRegister(wrap);
subscribeConfig(wrap); subscribeConfig(wrap);
return remoteExecutor; return remoteExecutor;
} else if (bean instanceof DynamicThreadPoolWrap) { } else if (bean instanceof DynamicThreadPoolWrapper) {
var wrap = (DynamicThreadPoolWrap) bean; var wrap = (DynamicThreadPoolWrapper) bean;
registerAndSubscribe(wrap); registerAndSubscribe(wrap);
} }
@ -78,9 +77,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
* *
* @param dynamicThreadPoolWrap * @param dynamicThreadPoolWrap
*/ */
protected void registerAndSubscribe(DynamicThreadPoolWrap dynamicThreadPoolWrap) { protected void registerAndSubscribe(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
executorService.execute(() -> {
fillPoolAndRegister(dynamicThreadPoolWrap); fillPoolAndRegister(dynamicThreadPoolWrap);
subscribeConfig(dynamicThreadPoolWrap); subscribeConfig(dynamicThreadPoolWrap);
});
} }
/** /**
@ -88,7 +89,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
* *
* @param dynamicThreadPoolWrap * @param dynamicThreadPoolWrap
*/ */
protected CustomThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrap dynamicThreadPoolWrap) { protected DynamicThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
String tpId = dynamicThreadPoolWrap.getTpId(); String tpId = dynamicThreadPoolWrap.getTpId();
Map<String, String> queryStrMap = new HashMap(3); Map<String, String> queryStrMap = new HashMap(3);
queryStrMap.put(TP_ID, tpId); queryStrMap.put(TP_ID, tpId);
@ -97,7 +98,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
Result result; Result result;
boolean isSubscribe = false; boolean isSubscribe = false;
CustomThreadPoolExecutor poolExecutor = null; DynamicThreadPoolExecutor poolExecutor = null;
PoolParameterInfo ppi = new PoolParameterInfo(); PoolParameterInfo ppi = new PoolParameterInfo();
try { try {
@ -105,10 +106,9 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) { if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) {
// 使用相关参数创建线程池 // 使用相关参数创建线程池
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
poolExecutor = (CustomThreadPoolExecutor) ThreadPoolBuilder.builder() poolExecutor = (DynamicThreadPoolExecutor) ThreadPoolBuilder.builder()
.isCustomPool(true) .dynamicPool()
.workQueue(workQueue) .workQueue(workQueue)
.threadPoolId(tpId)
.threadFactory(tpId) .threadFactory(tpId)
.poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize()) .poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize())
.keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS) .keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS)
@ -120,13 +120,13 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
isSubscribe = true; isSubscribe = true;
} }
} catch (Exception ex) { } catch (Exception ex) {
poolExecutor = dynamicThreadPoolWrap.getPool() != null ? dynamicThreadPoolWrap.getPool() : CommonThreadPool.getInstance(tpId); poolExecutor = dynamicThreadPoolWrap.getPool() != null ? dynamicThreadPoolWrap.getPool() : CommonDynamicThreadPool.getInstance(tpId);
dynamicThreadPoolWrap.setPool(poolExecutor); dynamicThreadPoolWrap.setPool(poolExecutor);
log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage());
} finally { } finally {
if (Objects.isNull(dynamicThreadPoolWrap.getPool())) { if (Objects.isNull(dynamicThreadPoolWrap.getPool())) {
dynamicThreadPoolWrap.setPool(CommonThreadPool.getInstance(tpId)); dynamicThreadPoolWrap.setPool(CommonDynamicThreadPool.getInstance(tpId));
} }
// 设置是否订阅远端线程池配置 // 设置是否订阅远端线程池配置
@ -137,7 +137,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
return poolExecutor; return poolExecutor;
} }
protected void subscribeConfig(DynamicThreadPoolWrap dynamicThreadPoolWrap) { protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
if (dynamicThreadPoolWrap.isSubscribeFlag()) { if (dynamicThreadPoolWrap.isSubscribeFlag()) {
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config)); threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config));
} }

@ -1,6 +1,6 @@
package com.github.dynamic.threadpool.starter.core; package com.github.dynamic.threadpool.starter.core;
import com.github.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap; import com.github.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrapper;
import com.github.dynamic.threadpool.common.model.PoolParameter; import com.github.dynamic.threadpool.common.model.PoolParameter;
import java.util.Map; import java.util.Map;
@ -16,9 +16,9 @@ public class GlobalThreadPoolManage {
private static final Map<String, PoolParameter> POOL_PARAMETER = new ConcurrentHashMap(); private static final Map<String, PoolParameter> POOL_PARAMETER = new ConcurrentHashMap();
private static final Map<String, DynamicThreadPoolWrap> EXECUTOR_MAP = new ConcurrentHashMap(); private static final Map<String, DynamicThreadPoolWrapper> EXECUTOR_MAP = new ConcurrentHashMap();
public static DynamicThreadPoolWrap getExecutorService(String tpId) { public static DynamicThreadPoolWrapper getExecutorService(String tpId) {
return EXECUTOR_MAP.get(tpId); return EXECUTOR_MAP.get(tpId);
} }
@ -26,12 +26,12 @@ public class GlobalThreadPoolManage {
return POOL_PARAMETER.get(tpId); return POOL_PARAMETER.get(tpId);
} }
public static void register(String tpId, PoolParameter poolParameter, DynamicThreadPoolWrap executor) { public static void register(String tpId, PoolParameter poolParameter, DynamicThreadPoolWrapper executor) {
registerPool(tpId, executor); registerPool(tpId, executor);
registerPoolParameter(tpId, poolParameter); registerPoolParameter(tpId, poolParameter);
} }
public static void registerPool(String tpId, DynamicThreadPoolWrap executor) { public static void registerPool(String tpId, DynamicThreadPoolWrapper executor) {
EXECUTOR_MAP.put(tpId, executor); EXECUTOR_MAP.put(tpId, executor);
} }

@ -3,8 +3,8 @@ package com.github.dynamic.threadpool.starter.handler;
import com.github.dynamic.threadpool.common.model.PoolRunStateInfo; import com.github.dynamic.threadpool.common.model.PoolRunStateInfo;
import com.github.dynamic.threadpool.starter.core.GlobalThreadPoolManage; import com.github.dynamic.threadpool.starter.core.GlobalThreadPoolManage;
import com.github.dynamic.threadpool.starter.toolkit.CalculateUtil; import com.github.dynamic.threadpool.starter.toolkit.CalculateUtil;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.core.DynamicThreadPoolExecutor;
import com.github.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap; import com.github.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.net.InetAddress; import java.net.InetAddress;
@ -32,7 +32,7 @@ public class ThreadPoolRunStateHandler {
} }
public static PoolRunStateInfo getPoolRunState(String tpId) { public static PoolRunStateInfo getPoolRunState(String tpId) {
DynamicThreadPoolWrap executorService = GlobalThreadPoolManage.getExecutorService(tpId); DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(tpId);
ThreadPoolExecutor pool = executorService.getPool(); ThreadPoolExecutor pool = executorService.getPool();
// 核心线程数 // 核心线程数
@ -78,8 +78,8 @@ public class ThreadPoolRunStateHandler {
stateInfo.setHost(INET_ADDRESS.getHostAddress()); stateInfo.setHost(INET_ADDRESS.getHostAddress());
stateInfo.setTpId(tpId); stateInfo.setTpId(tpId);
int rejectCount = pool instanceof CustomThreadPoolExecutor int rejectCount = pool instanceof DynamicThreadPoolExecutor
? ((CustomThreadPoolExecutor) pool).getRejectCount() ? ((DynamicThreadPoolExecutor) pool).getRejectCount()
: -1; : -1;
stateInfo.setRejectCount(rejectCount); stateInfo.setRejectCount(rejectCount);

@ -2,6 +2,7 @@ package com.github.dynamic.threadpool.starter.toolkit.thread;
import com.github.dynamic.threadpool.common.toolkit.Assert; import com.github.dynamic.threadpool.common.toolkit.Assert;
import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm; import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm;
import com.github.dynamic.threadpool.starter.core.DynamicThreadPoolExecutor;
import lombok.Data; import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -88,15 +89,15 @@ public class AbstractBuildThreadPoolTemplate {
} }
/** /**
* 线 * 线
* *
* @param initParam * @param initParam
* @return * @return
*/ */
public static CustomThreadPoolExecutor buildCustomPool(ThreadPoolInitParam initParam) { public static DynamicThreadPoolExecutor buildDynamicPool(ThreadPoolInitParam initParam) {
Assert.notNull(initParam); Assert.notNull(initParam);
CustomThreadPoolExecutor executorService = DynamicThreadPoolExecutor executorService =
new CustomThreadPoolExecutor(initParam.getCorePoolNum(), new DynamicThreadPoolExecutor(initParam.getCorePoolNum(),
initParam.getMaxPoolNum(), initParam.getMaxPoolNum(),
initParam.getKeepAliveTime(), initParam.getKeepAliveTime(),
initParam.getTimeUnit(), initParam.getTimeUnit(),

@ -22,9 +22,9 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
private boolean isFastPool; private boolean isFastPool;
/** /**
* 线 * 线
*/ */
private boolean isCustomPool; private boolean isDynamicPool;
/** /**
* 线 * 线
@ -111,8 +111,8 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this; return this;
} }
public ThreadPoolBuilder isCustomPool(Boolean isCustomPool) { public ThreadPoolBuilder dynamicPool() {
this.isCustomPool = isCustomPool; this.isDynamicPool = true;
return this; return this;
} }
@ -204,8 +204,8 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
*/ */
@Override @Override
public ThreadPoolExecutor build() { public ThreadPoolExecutor build() {
if (isCustomPool) { if (isDynamicPool) {
return buildCustomPool(this); return buildDynamicPool(this);
} }
return isFastPool ? buildFastPool(this) : buildPool(this); return isFastPool ? buildFastPool(this) : buildPool(this);
} }
@ -240,13 +240,13 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
} }
/** /**
* 线 * 线
* *
* @param builder * @param builder
* @return * @return
*/ */
private static ThreadPoolExecutor buildCustomPool(ThreadPoolBuilder builder) { private static ThreadPoolExecutor buildDynamicPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildCustomPool(buildInitParam(builder)); return AbstractBuildThreadPoolTemplate.buildDynamicPool(buildInitParam(builder));
} }
/** /**
@ -267,7 +267,7 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
.setRejectedExecutionHandler(builder.rejectedExecutionHandler) .setRejectedExecutionHandler(builder.rejectedExecutionHandler)
.setTimeUnit(builder.timeUnit); .setTimeUnit(builder.timeUnit);
if (builder.isCustomPool) { if (builder.isDynamicPool) {
String threadPoolId = Optional.ofNullable(builder.threadPoolId).orElse(builder.threadNamePrefix); String threadPoolId = Optional.ofNullable(builder.threadPoolId).orElse(builder.threadNamePrefix);
initParam.setThreadPoolId(threadPoolId); initParam.setThreadPoolId(threadPoolId);
ThreadPoolAlarm threadPoolAlarm = new ThreadPoolAlarm(builder.isAlarm, builder.capacityAlarm, builder.livenessAlarm); ThreadPoolAlarm threadPoolAlarm = new ThreadPoolAlarm(builder.isAlarm, builder.capacityAlarm, builder.livenessAlarm);

@ -1,7 +1,7 @@
package com.github.dynamic.threadpool.starter.wrap; package com.github.dynamic.threadpool.starter.wrap;
import com.github.dynamic.threadpool.starter.common.CommonThreadPool; import com.github.dynamic.threadpool.starter.common.CommonDynamicThreadPool;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.core.DynamicThreadPoolExecutor;
import lombok.Data; import lombok.Data;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -14,7 +14,7 @@ import java.util.concurrent.Future;
* @date 2021/6/20 16:55 * @date 2021/6/20 16:55
*/ */
@Data @Data
public class DynamicThreadPoolWrap { public class DynamicThreadPoolWrapper {
private String tenantId; private String tenantId;
@ -24,15 +24,15 @@ public class DynamicThreadPoolWrap {
private boolean subscribeFlag; private boolean subscribeFlag;
private CustomThreadPoolExecutor pool; private DynamicThreadPoolExecutor pool;
/** /**
* 线, 使线 {@link CommonThreadPool#getInstance(String)} * 线, 使线 {@link CommonDynamicThreadPool#getInstance(String)}
* *
* @param threadPoolId * @param threadPoolId
*/ */
public DynamicThreadPoolWrap(String threadPoolId) { public DynamicThreadPoolWrapper(String threadPoolId) {
this(threadPoolId, CommonThreadPool.getInstance(threadPoolId)); this(threadPoolId, CommonDynamicThreadPool.getInstance(threadPoolId));
} }
/** /**
@ -41,7 +41,7 @@ public class DynamicThreadPoolWrap {
* @param threadPoolId * @param threadPoolId
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
public DynamicThreadPoolWrap(String threadPoolId, CustomThreadPoolExecutor threadPoolExecutor) { public DynamicThreadPoolWrapper(String threadPoolId, DynamicThreadPoolExecutor threadPoolExecutor) {
this.tpId = threadPoolId; this.tpId = threadPoolId;
this.pool = threadPoolExecutor; this.pool = threadPoolExecutor;
} }
Loading…
Cancel
Save