Development configuration change message push.

pull/161/head
chen.ma 3 years ago
parent 463407c19f
commit ea6d0c9a95

@ -1,9 +1,11 @@
package com.github.dynamic.threadpool.starter.alarm; package com.github.dynamic.threadpool.starter.alarm;
import com.github.dynamic.threadpool.common.config.ApplicationContextHolder; import com.github.dynamic.threadpool.common.config.ApplicationContextHolder;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import java.util.ArrayList; import java.util.ArrayList;
@ -16,6 +18,7 @@ import java.util.Map;
* @author chen.ma * @author chen.ma
* @date 2021/8/15 15:34 * @date 2021/8/15 15:34
*/ */
@Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class BaseSendMessageService implements InitializingBean, SendMessageService { public class BaseSendMessageService implements InitializingBean, SendMessageService {
@ -25,12 +28,23 @@ public class BaseSendMessageService implements InitializingBean, SendMessageServ
private final List<SendMessageHandler> sendMessageHandlers = new ArrayList(4); private final List<SendMessageHandler> sendMessageHandlers = new ArrayList(4);
@Override @Override
public void sendMessage(CustomThreadPoolExecutor threadPoolExecutor) { public void sendAlarmMessage(CustomThreadPoolExecutor threadPoolExecutor) {
for (SendMessageHandler messageHandler : sendMessageHandlers) { for (SendMessageHandler messageHandler : sendMessageHandlers) {
try { try {
messageHandler.sendMessage(alarmConfigs, threadPoolExecutor); messageHandler.sendAlarmMessage(alarmConfigs, threadPoolExecutor);
} catch (Exception ex) { } catch (Exception ex) {
// ignore log.warn("Failed to send thread pool alarm notification.", ex);
}
}
}
@Override
public void sendChangeMessage(PoolParameterInfo parameter) {
for (SendMessageHandler messageHandler : sendMessageHandlers) {
try {
messageHandler.sendChangeMessage(alarmConfigs, parameter);
} catch (Exception ex) {
log.warn("Failed to send thread pool change notification.", ex);
} }
} }
} }

@ -6,7 +6,12 @@ import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient; import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiRobotSendRequest; import com.dingtalk.api.request.OapiRobotSendRequest;
import com.github.dynamic.threadpool.common.model.InstanceInfo; import com.github.dynamic.threadpool.common.model.InstanceInfo;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.starter.core.GlobalThreadPoolManage;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum;
import com.github.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import com.taobao.api.ApiException; import com.taobao.api.ApiException;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@ -16,9 +21,10 @@ import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/** /**
* Ding Send. * Send ding notification message.
* *
* @author chen.ma * @author chen.ma
* @date 2021/8/15 15:49 * @date 2021/8/15 15:49
@ -39,22 +45,28 @@ public class DingSendMessageHandler implements SendMessageHandler {
} }
@Override @Override
public void sendMessage(List<AlarmConfig> alarmConfigs, CustomThreadPoolExecutor pool) { public void sendAlarmMessage(List<AlarmConfig> alarmConfigs, CustomThreadPoolExecutor pool) {
Optional<AlarmConfig> alarmConfigOptional = alarmConfigs.stream() Optional<AlarmConfig> alarmConfigOptional = alarmConfigs.stream()
.filter(each -> Objects.equals(each.getType(), getType())) .filter(each -> Objects.equals(each.getType(), getType()))
.findFirst(); .findFirst();
alarmConfigOptional.ifPresent(each -> dingSendMessage(each, pool)); alarmConfigOptional.ifPresent(each -> dingAlarmSendMessage(each, pool));
} }
public void dingSendMessage(AlarmConfig alarmConfig, CustomThreadPoolExecutor pool) { @Override
String serverUrl = alarmConfig.getUrl() + alarmConfig.getToken(); public void sendChangeMessage(List<AlarmConfig> alarmConfigs, PoolParameterInfo parameter) {
Optional<AlarmConfig> changeConfigOptional = alarmConfigs.stream()
.filter(each -> Objects.equals(each.getType(), getType()))
.findFirst();
changeConfigOptional.ifPresent(each -> dingChangeSendMessage(each, parameter));
}
private void dingAlarmSendMessage(AlarmConfig alarmConfig, CustomThreadPoolExecutor pool) {
BlockingQueue<Runnable> queue = pool.getQueue(); BlockingQueue<Runnable> queue = pool.getQueue();
String text = String.format( String text = String.format(
"<font color='#FF0000'>[警报] </font>%s - 动态线程池运行告警 \n\n" + "<font color='#FF0000'>[警报] </font>%s - 动态线程池运行告警 \n\n" +
" --- \n\n " + " --- \n\n " +
"<font color='#708090' size=2>线程池ID%s</font> \n\n " +
"<font color='#778899' size=2>应用实例:%s</font> \n\n " + "<font color='#778899' size=2>应用实例:%s</font> \n\n " +
"<font color='#708090' size=2>线程池名称:%s</font> \n\n " +
" --- \n\n " + " --- \n\n " +
"<font color='#708090' size=2>核心线程数:%d</font> \n\n " + "<font color='#708090' size=2>核心线程数:%d</font> \n\n " +
"<font color='#708090' size=2>最大线程数:%d</font> \n\n " + "<font color='#708090' size=2>最大线程数:%d</font> \n\n " +
@ -64,11 +76,12 @@ public class DingSendMessageHandler implements SendMessageHandler {
"<font color='#708090' size=2>线程池任务总量:%d</font> \n\n " + "<font color='#708090' size=2>线程池任务总量:%d</font> \n\n " +
" --- \n\n " + " --- \n\n " +
"<font color='#708090' size=2>队列类型:%s</font> \n\n " + "<font color='#708090' size=2>队列类型:%s</font> \n\n " +
"<font color='#708090' size=2>队列容量:%d</font> \n\n " + "<font color='#708090' size=2>队列容量:%d</font> \n\n " +
"<font color='#708090' size=2>队列元素个数:%d</font> \n\n " + "<font color='#708090' size=2>队列元素个数:%d</font> \n\n " +
"<font color='#708090' size=2>队列剩余个数:%d</font> \n\n " + "<font color='#708090' size=2>队列剩余个数:%d</font> \n\n " +
" --- \n\n " + " --- \n\n " +
"<font color='#708090' size=2>拒绝策略次数:</font><font color='#FF0000' size=2>%d</font> \n\n " + "<font color='#708090' size=2>拒绝策略:%s</font> \n\n" +
"<font color='#708090' size=2>拒绝策略执行次数:</font><font color='#FF0000' size=2>%d</font> \n\n " +
"<font color='#708090' size=2>OWNER@%s</font> \n\n" + "<font color='#708090' size=2>OWNER@%s</font> \n\n" +
"<font color='#708090' size=2>提示5 分钟内此线程池不会重复告警(可配置)</font> \n\n" + "<font color='#708090' size=2>提示5 分钟内此线程池不会重复告警(可配置)</font> \n\n" +
" --- \n\n " + " --- \n\n " +
@ -76,10 +89,10 @@ public class DingSendMessageHandler implements SendMessageHandler {
// 环境 // 环境
active.toUpperCase(), active.toUpperCase(),
// 节点信息
instanceInfo.getIpApplicationName(),
// 线程池ID // 线程池ID
pool.getThreadPoolId(), pool.getThreadPoolId(),
// 节点信息
instanceInfo.getIpApplicationName(),
// 核心线程数 // 核心线程数
pool.getCorePoolSize(), pool.getCorePoolSize(),
// 最大线程数 // 最大线程数
@ -100,6 +113,8 @@ public class DingSendMessageHandler implements SendMessageHandler {
queue.size(), queue.size(),
// 队列剩余个数 // 队列剩余个数
queue.remainingCapacity(), queue.remainingCapacity(),
// 拒绝策略名称
pool.getRejectedExecutionHandler().getClass().getSimpleName(),
// 拒绝策略次数 // 拒绝策略次数
pool.getRejectCount(), pool.getRejectCount(),
// 告警手机号 // 告警手机号
@ -109,16 +124,81 @@ public class DingSendMessageHandler implements SendMessageHandler {
); );
execute(alarmConfig, "动态线程池告警", text, CollUtil.newArrayList("15601166691"));
}
private void dingChangeSendMessage(AlarmConfig alarmConfig, PoolParameterInfo parameter) {
String threadPoolId = parameter.getTpId();
DynamicThreadPoolWrap poolWrap = GlobalThreadPoolManage.getExecutorService(threadPoolId);
if (poolWrap == null) {
log.warn("Thread pool is empty when sending change notification, threadPoolId :: {}", threadPoolId);
return;
}
CustomThreadPoolExecutor customPool = poolWrap.getPool();
/**
* hesitant e.g.
*/
String text = String.format(
"<font color='#2a9d8f'>[通知] </font>%s - 动态线程池参数变更 \n\n" +
" --- \n\n " +
"<font color='#708090' size=2>线程池ID%s</font> \n\n " +
"<font color='#778899' size=2>应用实例:%s</font> \n\n " +
" --- \n\n " +
"<font color='#708090' size=2>核心线程数:%s</font> \n\n " +
"<font color='#708090' size=2>最大线程数:%s</font> \n\n " +
"<font color='#708090' size=2>线程存活时间:%s / SECONDS</font> \n\n" +
" --- \n\n " +
"<font color='#708090' size=2>队列类型:%s</font> \n\n " +
"<font color='#708090' size=2>队列容量:%s</font> \n\n " +
" --- \n\n " +
"<font color='#708090' size=2>AGO 拒绝策略:%s</font> \n\n" +
"<font color='#708090' size=2>NOW 拒绝策略:%s</font> \n\n" +
" --- \n\n " +
"<font color='#708090' size=2>提示:动态线程池配置变更实时通知(无限制)</font> \n\n" +
"<font color='#708090' size=2>OWNER@%s</font> \n\n" +
" --- \n\n " +
"**播报时间:%s**",
// 环境
active.toUpperCase(),
// 线程池名称
threadPoolId,
// 节点信息
instanceInfo.getIpApplicationName(),
// 核心线程数
customPool.getCorePoolSize() + " ➲ " + parameter.getCoreSize(),
// 最大线程数
customPool.getMaximumPoolSize() + " ➲ " + parameter.getMaxSize(),
// 线程存活时间
customPool.getKeepAliveTime(TimeUnit.SECONDS) + " ➲ " + parameter.getKeepAliveTime(),
// 阻塞队列
QueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType()),
// 阻塞队列容量
(customPool.getQueue().size() + customPool.getQueue().remainingCapacity()) + " ➲ " + parameter.getCapacity(),
// 拒绝策略
customPool.getRejectedExecutionHandler().getClass().getSimpleName(),
RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType()),
// 告警手机号
"15601166691",
// 当前时间
DateUtil.now()
);
execute(alarmConfig, "动态线程池通知", text, CollUtil.newArrayList("15601166691"));
}
private void execute(AlarmConfig alarmConfig, String title, String text, List<String> mobiles) {
String serverUrl = alarmConfig.getUrl() + alarmConfig.getToken();
DingTalkClient dingTalkClient = new DefaultDingTalkClient(serverUrl); DingTalkClient dingTalkClient = new DefaultDingTalkClient(serverUrl);
OapiRobotSendRequest request = new OapiRobotSendRequest(); OapiRobotSendRequest request = new OapiRobotSendRequest();
request.setMsgtype("markdown"); request.setMsgtype("markdown");
OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown(); OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
markdown.setTitle("动态线程池告警"); markdown.setTitle(title);
markdown.setText(text); markdown.setText(text);
OapiRobotSendRequest.At at = new OapiRobotSendRequest.At(); OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
at.setAtMobiles(CollUtil.newArrayList("15601166691")); at.setAtMobiles(mobiles);
request.setAt(at); request.setAt(at);
request.setMarkdown(markdown); request.setMarkdown(markdown);

@ -0,0 +1,15 @@
package com.github.dynamic.threadpool.starter.alarm;
/**
* Message Type Enum.
*
* @author chen.ma
* @date 2021/8/16 20:50
*/
public enum MessageTypeEnum {
CHANGE,
ALARM
}

@ -1,5 +1,6 @@
package com.github.dynamic.threadpool.starter.alarm; package com.github.dynamic.threadpool.starter.alarm;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
import java.util.List; import java.util.List;
@ -13,18 +14,26 @@ import java.util.List;
public interface SendMessageHandler { public interface SendMessageHandler {
/** /**
* getType. * Get type.
* *
* @return * @return
*/ */
String getType(); String getType();
/** /**
* sendMessage. * Send alarm message.
* *
* @param alarmConfigs * @param alarmConfigs
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
void sendMessage(List<AlarmConfig> alarmConfigs, CustomThreadPoolExecutor threadPoolExecutor); void sendAlarmMessage(List<AlarmConfig> alarmConfigs, CustomThreadPoolExecutor threadPoolExecutor);
/**
* Send change message.
*
* @param alarmConfigs
* @param parameter
*/
void sendChangeMessage(List<AlarmConfig> alarmConfigs, PoolParameterInfo parameter);
} }

@ -1,5 +1,6 @@
package com.github.dynamic.threadpool.starter.alarm; package com.github.dynamic.threadpool.starter.alarm;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
/** /**
@ -11,10 +12,17 @@ import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExec
public interface SendMessageService { public interface SendMessageService {
/** /**
* sendMessage. * Send alarm message.
* *
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
void sendMessage(CustomThreadPoolExecutor threadPoolExecutor); void sendAlarmMessage(CustomThreadPoolExecutor threadPoolExecutor);
/**
* Send change message.
*
* @param parameter
*/
void sendChangeMessage(PoolParameterInfo parameter);
} }

@ -1,6 +1,7 @@
package com.github.dynamic.threadpool.starter.alarm; package com.github.dynamic.threadpool.starter.alarm;
import com.github.dynamic.threadpool.common.config.ApplicationContextHolder; import com.github.dynamic.threadpool.common.config.ApplicationContextHolder;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.starter.toolkit.CalculateUtil; import com.github.dynamic.threadpool.starter.toolkit.CalculateUtil;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor; import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
import com.github.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue; import com.github.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue;
@ -22,7 +23,7 @@ public class ThreadPoolAlarmManage {
} }
/** /**
* checkPoolCapacityAlarm. * Check thread pool capacity alarm.
* *
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
@ -35,12 +36,12 @@ public class ThreadPoolAlarmManage {
int capacity = queueSize + blockIngQueue.remainingCapacity(); int capacity = queueSize + blockIngQueue.remainingCapacity();
int divide = CalculateUtil.divide(queueSize, capacity); int divide = CalculateUtil.divide(queueSize, capacity);
if (divide > threadPoolAlarm.getCapacityAlarm()) { if (divide > threadPoolAlarm.getCapacityAlarm()) {
SEND_MESSAGE_SERVICE.sendMessage(threadPoolExecutor); SEND_MESSAGE_SERVICE.sendAlarmMessage(threadPoolExecutor);
} }
} }
/** /**
* checkPoolLivenessAlarm. * Check thread pool activity alarm.
* *
* @param isCore * @param isCore
* @param threadPoolExecutor * @param threadPoolExecutor
@ -53,17 +54,26 @@ public class ThreadPoolAlarmManage {
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
int divide = CalculateUtil.divide(activeCount, maximumPoolSize); int divide = CalculateUtil.divide(activeCount, maximumPoolSize);
if (divide > threadPoolExecutor.getThreadPoolAlarm().getLivenessAlarm()) { if (divide > threadPoolExecutor.getThreadPoolAlarm().getLivenessAlarm()) {
SEND_MESSAGE_SERVICE.sendMessage(threadPoolExecutor); SEND_MESSAGE_SERVICE.sendAlarmMessage(threadPoolExecutor);
} }
} }
/** /**
* checkPoolRejectAlarm. * Check thread pool reject policy alarm.
* *
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
public static void checkPoolRejectAlarm(CustomThreadPoolExecutor threadPoolExecutor) { public static void checkPoolRejectAlarm(CustomThreadPoolExecutor threadPoolExecutor) {
SEND_MESSAGE_SERVICE.sendMessage(threadPoolExecutor); SEND_MESSAGE_SERVICE.sendAlarmMessage(threadPoolExecutor);
}
/**
* Send thread pool configuration change message.
*
* @param parameter
*/
public static void sendPoolConfigChange(PoolParameterInfo parameter) {
SEND_MESSAGE_SERVICE.sendChangeMessage(parameter);
} }
} }

@ -1,10 +1,11 @@
package com.github.dynamic.threadpool.starter.core; package com.github.dynamic.threadpool.starter.core;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo; import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarmManage;
import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.Objects; import java.util.Objects;
@ -22,60 +23,60 @@ public class ThreadPoolDynamicRefresh {
public static void refreshDynamicPool(String content) { public static void refreshDynamicPool(String content) {
PoolParameterInfo parameter = JSON.parseObject(content, PoolParameterInfo.class); PoolParameterInfo parameter = JSON.parseObject(content, PoolParameterInfo.class);
String tpId = parameter.getTpId(); ThreadPoolAlarmManage.sendPoolConfigChange(parameter);
Integer coreSize = parameter.getCoreSize(), maxSize = parameter.getMaxSize(), ThreadPoolDynamicRefresh.refreshDynamicPool(parameter);
queueType = parameter.getQueueType(), capacity = parameter.getCapacity(),
keepAliveTime = parameter.getKeepAliveTime(), rejectedType = parameter.getRejectedType();
refreshDynamicPool(tpId, coreSize, maxSize, queueType, capacity, keepAliveTime, rejectedType);
} }
public static void refreshDynamicPool(String threadPoolId, Integer coreSize, Integer maxSize, Integer queueType, Integer capacity, Integer keepAliveTime, Integer rejectedType) { public static void refreshDynamicPool(PoolParameterInfo parameter) {
String threadPoolId = parameter.getTpId();
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getPool(); ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getPool();
int originalCoreSize = executor.getCorePoolSize(); int originalCoreSize = executor.getCorePoolSize();
int originalMaximumPoolSize = executor.getMaximumPoolSize(); int originalMaximumPoolSize = executor.getMaximumPoolSize();
int originalQueryType = queueType; int originalQueryType = parameter.getQueueType();
int originalCapacity = executor.getQueue().remainingCapacity() + executor.getQueue().size(); int originalCapacity = executor.getQueue().remainingCapacity() + executor.getQueue().size();
long originalKeepAliveTime = executor.getKeepAliveTime(TimeUnit.MILLISECONDS); long originalKeepAliveTime = executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
int originalRejectedType = rejectedType; int originalRejectedType = parameter.getRejectedType();
changePoolInfo(executor, coreSize, maxSize, queueType, capacity, keepAliveTime, rejectedType); changePoolInfo(executor, parameter);
ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getPool(); ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getPool();
log.info("[🔥 {}] Changed thread pool. coreSize :: [{}], maxSize :: [{}], queueType :: [{}], capacity :: [{}], keepAliveTime :: [{}], rejectedType :: [{}]", log.info("[🔥 {}] Changed thread pool. coreSize :: [{}], maxSize :: [{}], queueType :: [{}], capacity :: [{}], keepAliveTime :: [{}], rejectedType :: [{}]",
threadPoolId.toUpperCase(), threadPoolId.toUpperCase(),
String.format("%s=>%s", originalCoreSize, afterExecutor.getCorePoolSize()), String.format("%s=>%s", originalCoreSize, afterExecutor.getCorePoolSize()),
String.format("%s=>%s", originalMaximumPoolSize, afterExecutor.getMaximumPoolSize()), String.format("%s=>%s", originalMaximumPoolSize, afterExecutor.getMaximumPoolSize()),
String.format("%s=>%s", originalQueryType, queueType), String.format("%s=>%s", originalQueryType, parameter.getQueueType()),
String.format("%s=>%s", originalCapacity, (afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size())), String.format("%s=>%s", originalCapacity,
(afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size())),
String.format("%s=>%s", originalKeepAliveTime, afterExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS)), String.format("%s=>%s", originalKeepAliveTime, afterExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS)),
String.format("%s=>%s", originalRejectedType, rejectedType)); String.format("%s=>%s", originalRejectedType, parameter.getRejectedType()));
} }
public static void changePoolInfo(ThreadPoolExecutor executor, Integer coreSize, Integer maxSize, Integer queueType, Integer capacity, Integer keepAliveTime, Integer rejectedType) { public static void changePoolInfo(ThreadPoolExecutor executor, PoolParameterInfo parameter) {
if (coreSize != null) { if (parameter.getCoreSize() != null) {
executor.setCorePoolSize(coreSize); executor.setCorePoolSize(parameter.getCoreSize());
} }
if (maxSize != null) { if (parameter.getMaxSize() != null) {
executor.setMaximumPoolSize(maxSize); executor.setMaximumPoolSize(parameter.getMaxSize());
} }
if (capacity != null && Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.type, queueType)) { if (parameter.getCapacity() != null
&& Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.type, parameter.getQueueType())) {
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockIngQueue) { if (executor.getQueue() instanceof ResizableCapacityLinkedBlockIngQueue) {
ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue(); ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
queue.setCapacity(capacity); queue.setCapacity(parameter.getCapacity());
} else { } else {
log.warn("[Pool change] The queue length cannot be modified. Queue type mismatch. Current queue type :: {}", executor.getQueue().getClass().getSimpleName()); log.warn("[Pool change] The queue length cannot be modified. Queue type mismatch. Current queue type :: {}", executor.getQueue().getClass().getSimpleName());
} }
} }
if (keepAliveTime != null) { if (parameter.getKeepAliveTime() != null) {
executor.setKeepAliveTime(keepAliveTime, TimeUnit.SECONDS); executor.setKeepAliveTime(parameter.getKeepAliveTime(), TimeUnit.SECONDS);
} }
if (rejectedType != null) { if (parameter.getRejectedType() != null) {
executor.setRejectedExecutionHandler(RejectedTypeEnum.createPolicy(queueType)); executor.setRejectedExecutionHandler(RejectedTypeEnum.createPolicy(parameter.getRejectedType()));
} }
} }

@ -3,6 +3,7 @@ package com.github.dynamic.threadpool.starter.toolkit.thread;
import com.github.dynamic.threadpool.starter.spi.DynamicTpServiceLoader; import com.github.dynamic.threadpool.starter.spi.DynamicTpServiceLoader;
import com.github.dynamic.threadpool.starter.spi.CustomBlockingQueue; import com.github.dynamic.threadpool.starter.spi.CustomBlockingQueue;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
@ -19,49 +20,52 @@ public enum QueueTypeEnum {
/** /**
* {@link java.util.concurrent.ArrayBlockingQueue} * {@link java.util.concurrent.ArrayBlockingQueue}
*/ */
ARRAY_BLOCKING_QUEUE(1), ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue"),
/** /**
* {@link java.util.concurrent.LinkedBlockingQueue} * {@link java.util.concurrent.LinkedBlockingQueue}
*/ */
LINKED_BLOCKING_QUEUE(2), LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue"),
/** /**
* {@link java.util.concurrent.LinkedBlockingDeque} * {@link java.util.concurrent.LinkedBlockingDeque}
*/ */
LINKED_BLOCKING_DEQUE(3), LINKED_BLOCKING_DEQUE(3, "LinkedBlockingDeque"),
/** /**
* {@link java.util.concurrent.SynchronousQueue} * {@link java.util.concurrent.SynchronousQueue}
*/ */
SYNCHRONOUS_QUEUE(4), SYNCHRONOUS_QUEUE(4, "SynchronousQueue"),
/** /**
* {@link java.util.concurrent.LinkedTransferQueue} * {@link java.util.concurrent.LinkedTransferQueue}
*/ */
LINKED_TRANSFER_QUEUE(5), LINKED_TRANSFER_QUEUE(5, "LinkedTransferQueue"),
/** /**
* {@link java.util.concurrent.PriorityBlockingQueue} * {@link java.util.concurrent.PriorityBlockingQueue}
*/ */
PRIORITY_BLOCKING_QUEUE(6), PRIORITY_BLOCKING_QUEUE(6, "PriorityBlockingQueue"),
/** /**
* {@link "io.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue"} * {@link "io.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue"}
*/ */
RESIZABLE_LINKED_BLOCKING_QUEUE(9); RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockIngQueue");
public Integer type; public Integer type;
QueueTypeEnum(int type) { public String name;
QueueTypeEnum(int type, String name) {
this.type = type; this.type = type;
this.name = name;
} }
static { static {
DynamicTpServiceLoader.register(CustomBlockingQueue.class); DynamicTpServiceLoader.register(CustomBlockingQueue.class);
} }
public static BlockingQueue createBlockingQueue(Integer type, Integer capacity) { public static BlockingQueue createBlockingQueue(int type, Integer capacity) {
BlockingQueue blockingQueue = null; BlockingQueue blockingQueue = null;
if (Objects.equals(type, ARRAY_BLOCKING_QUEUE.type)) { if (Objects.equals(type, ARRAY_BLOCKING_QUEUE.type)) {
blockingQueue = new ArrayBlockingQueue(capacity); blockingQueue = new ArrayBlockingQueue(capacity);
@ -90,4 +94,12 @@ public enum QueueTypeEnum {
return blockingQueue; return blockingQueue;
} }
public static String getBlockingQueueNameByType(int type) {
Optional<QueueTypeEnum> queueTypeEnum = Arrays.stream(QueueTypeEnum.values())
.filter(each -> each.type == type)
.findFirst();
return queueTypeEnum.map(each -> each.name).orElse("");
}
} }

@ -3,6 +3,7 @@ package com.github.dynamic.threadpool.starter.toolkit.thread;
import com.github.dynamic.threadpool.starter.spi.CustomRejectedExecutionHandler; import com.github.dynamic.threadpool.starter.spi.CustomRejectedExecutionHandler;
import com.github.dynamic.threadpool.starter.spi.DynamicTpServiceLoader; import com.github.dynamic.threadpool.starter.spi.DynamicTpServiceLoader;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
@ -67,7 +68,7 @@ public enum RejectedTypeEnum {
DynamicTpServiceLoader.register(CustomRejectedExecutionHandler.class); DynamicTpServiceLoader.register(CustomRejectedExecutionHandler.class);
} }
public static RejectedExecutionHandler createPolicy(Integer type) { public static RejectedExecutionHandler createPolicy(int type) {
Optional<RejectedExecutionHandler> rejectedTypeEnum = Stream.of(RejectedTypeEnum.values()) Optional<RejectedExecutionHandler> rejectedTypeEnum = Stream.of(RejectedTypeEnum.values())
.filter(each -> Objects.equals(type, each.type)) .filter(each -> Objects.equals(type, each.type))
.map(each -> each.rejectedHandler) .map(each -> each.rejectedHandler)
@ -88,4 +89,11 @@ public enum RejectedTypeEnum {
return resultRejected; return resultRejected;
} }
public static String getRejectedNameByType(int type) {
Optional<RejectedTypeEnum> rejectedTypeEnum = Arrays.stream(RejectedTypeEnum.values())
.filter(each -> each.type == type).findFirst();
return rejectedTypeEnum.map(each -> each.rejectedHandler.getClass().getSimpleName()).orElse("");
}
} }

Loading…
Cancel
Save