Develop dynamic thread pool monitoring and alarm notification.

pull/161/head
chen.ma 4 years ago
parent 642461aa8d
commit 984a7d44f2

@ -25,6 +25,8 @@ public class InstanceInfo {
private String instanceId; private String instanceId;
private String ipApplicationName;
private volatile String vipAddress; private volatile String vipAddress;
private volatile String secureVipAddress; private volatile String secureVipAddress;

@ -60,6 +60,11 @@
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>alibaba-dingtalk-service-sdk</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

@ -0,0 +1,29 @@
package com.github.dynamic.threadpool.starter.alarm;
import lombok.Data;
/**
* Alarm Config.
*
* @author chen.ma
* @date 2021/8/15 16:09
*/
@Data
public class AlarmConfig {
/**
* type
*/
private String type;
/**
* url
*/
private String url;
/**
* token
*/
private String token;
}

@ -0,0 +1,45 @@
package com.github.dynamic.threadpool.starter.alarm;
import com.github.dynamic.threadpool.common.config.ApplicationContextHolder;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.InitializingBean;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Base Send Message Service.
*
* @author chen.ma
* @date 2021/8/15 15:34
*/
@RequiredArgsConstructor
public class BaseSendMessageService implements InitializingBean, SendMessageService {
@NonNull
private final List<AlarmConfig> alarmConfigs;
private final List<SendMessageHandler> sendMessageHandlers = new ArrayList(4);
@Override
public void sendMessage(CustomThreadPoolExecutor threadPoolExecutor) {
for (SendMessageHandler messageHandler : sendMessageHandlers) {
try {
messageHandler.sendMessage(alarmConfigs, threadPoolExecutor);
} catch (Exception ex) {
// ignore
}
}
}
@Override
public void afterPropertiesSet() {
Map<String, SendMessageHandler> sendMessageHandlerMap =
ApplicationContextHolder.getBeansOfType(SendMessageHandler.class);
sendMessageHandlerMap.values().forEach(each -> sendMessageHandlers.add(each));
}
}

@ -0,0 +1,133 @@
package com.github.dynamic.threadpool.starter.alarm;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiRobotSendRequest;
import com.github.dynamic.threadpool.common.model.InstanceInfo;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
import com.taobao.api.ApiException;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
/**
* Ding Send.
*
* @author chen.ma
* @date 2021/8/15 15:49
*/
@Slf4j
@RequiredArgsConstructor
public class DingSendMessageHandler implements SendMessageHandler {
@NonNull
private String active;
@NonNull
private InstanceInfo instanceInfo;
@Override
public String getType() {
return SendMessageEnum.DING.name();
}
@Override
public void sendMessage(List<AlarmConfig> alarmConfigs, CustomThreadPoolExecutor pool) {
Optional<AlarmConfig> alarmConfigOptional = alarmConfigs.stream()
.filter(each -> Objects.equals(each.getType(), getType()))
.findFirst();
alarmConfigOptional.ifPresent(each -> dingSendMessage(each, pool));
}
public void dingSendMessage(AlarmConfig alarmConfig, CustomThreadPoolExecutor pool) {
String serverUrl = alarmConfig.getUrl() + alarmConfig.getToken();
BlockingQueue<Runnable> queue = pool.getQueue();
String text = String.format(
"<font color='#FF0000'>[警报] </font>%s - 动态线程池运行告警 \n\n" +
" --- \n\n " +
"<font color='#778899' size=2>应用实例:%s</font> \n\n " +
"<font color='#708090' size=2>线程池名称:%s</font> \n\n " +
" --- \n\n " +
"<font color='#708090' size=2>核心线程数:%d</font> \n\n " +
"<font color='#708090' size=2>最大线程数:%d</font> \n\n " +
"<font color='#708090' size=2>当前线程数:%d</font> \n\n " +
"<font color='#708090' size=2>活跃线程数:%d</font> \n\n " +
"<font color='#708090' size=2>最大线程数:%d</font> \n\n " +
"<font color='#708090' size=2>线程池任务总量:%d</font> \n\n " +
" --- \n\n " +
"<font color='#708090' size=2>队列类型:%s</font> \n\n " +
"<font color='#708090' size=2>队列总容量:%d</font> \n\n " +
"<font color='#708090' size=2>队列元素个数:%d</font> \n\n " +
"<font color='#708090' size=2>队列剩余个数:%d</font> \n\n " +
" --- \n\n " +
"<font color='#708090' size=2>拒绝策略次数:</font><font color='#FF0000' size=2>%d</font> \n\n " +
"<font color='#708090' size=2>OWNER@%s</font> \n\n" +
"<font color='#708090' size=2>提示5 分钟内此线程池不会重复告警(可配置)</font> \n\n" +
" --- \n\n " +
"**播报时间:%s**",
// 环境
active.toUpperCase(),
// 节点信息
instanceInfo.getIpApplicationName(),
// 线程池ID
pool.getThreadPoolId(),
// 核心线程数
pool.getCorePoolSize(),
// 最大线程数
pool.getMaximumPoolSize(),
// 当前线程数
pool.getPoolSize(),
// 活跃线程数
pool.getActiveCount(),
// 最大任务数
pool.getLargestPoolSize(),
// 线程池任务总量
pool.getCompletedTaskCount(),
// 队列类型名称
queue.getClass().getSimpleName(),
// 队列容量
queue.size() + queue.remainingCapacity(),
// 队列元素个数
queue.size(),
// 队列剩余个数
queue.remainingCapacity(),
// 拒绝策略次数
pool.getRejectCount(),
// 告警手机号
"15601166691",
// 当前时间
DateUtil.now()
);
DingTalkClient dingTalkClient = new DefaultDingTalkClient(serverUrl);
OapiRobotSendRequest request = new OapiRobotSendRequest();
request.setMsgtype("markdown");
OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
markdown.setTitle("动态线程池告警");
markdown.setText(text);
OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
at.setAtMobiles(CollUtil.newArrayList("15601166691"));
request.setAt(at);
request.setMarkdown(markdown);
try {
dingTalkClient.execute(request);
} catch (ApiException ex) {
log.error("Ding failed to send message", ex.getMessage());
}
}
}

@ -0,0 +1,13 @@
package com.github.dynamic.threadpool.starter.alarm;
/**
* Send Message Enum.
*
* @author chen.ma
* @date 2021/8/15 15:50
*/
public enum SendMessageEnum {
DING
}

@ -0,0 +1,30 @@
package com.github.dynamic.threadpool.starter.alarm;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
import java.util.List;
/**
* Send Message Handler.
*
* @author chen.ma
* @date 2021/8/15 15:44
*/
public interface SendMessageHandler {
/**
* getType.
*
* @return
*/
String getType();
/**
* sendMessage.
*
* @param alarmConfigs
* @param threadPoolExecutor
*/
void sendMessage(List<AlarmConfig> alarmConfigs, CustomThreadPoolExecutor threadPoolExecutor);
}

@ -0,0 +1,20 @@
package com.github.dynamic.threadpool.starter.alarm;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
/**
* Send Msg.
*
* @author chen.ma
* @date 2021/8/15 15:31
*/
public interface SendMessageService {
/**
* sendMessage.
*
* @param threadPoolExecutor
*/
void sendMessage(CustomThreadPoolExecutor threadPoolExecutor);
}

@ -0,0 +1,31 @@
package com.github.dynamic.threadpool.starter.alarm;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* ThreadPool Alarm.
*
* @author chen.ma
* @date 2021/8/15 13:13
*/
@Data
@AllArgsConstructor
public class ThreadPoolAlarm {
/**
*
*/
private Boolean isAlarm;
/**
*
*/
private Integer livenessAlarm;
/**
*
*/
private Integer capacityAlarm;
}

@ -0,0 +1,69 @@
package com.github.dynamic.threadpool.starter.alarm;
import com.github.dynamic.threadpool.common.config.ApplicationContextHolder;
import com.github.dynamic.threadpool.starter.toolkit.CalculateUtil;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
import com.github.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue;
import lombok.extern.slf4j.Slf4j;
/**
* Alarm Manage.
*
* @author chen.ma
* @date 2021/8/15 14:13
*/
@Slf4j
public class ThreadPoolAlarmManage {
private static final SendMessageService SEND_MESSAGE_SERVICE;
static {
SEND_MESSAGE_SERVICE = ApplicationContextHolder.getBean("sendMessageService", SendMessageService.class);
}
/**
* checkPoolCapacityAlarm.
*
* @param threadPoolExecutor
*/
public static void checkPoolCapacityAlarm(CustomThreadPoolExecutor threadPoolExecutor) {
ThreadPoolAlarm threadPoolAlarm = threadPoolExecutor.getThreadPoolAlarm();
ResizableCapacityLinkedBlockIngQueue blockIngQueue =
(ResizableCapacityLinkedBlockIngQueue) threadPoolExecutor.getQueue();
int queueSize = blockIngQueue.size();
int capacity = queueSize + blockIngQueue.remainingCapacity();
int divide = CalculateUtil.divide(queueSize, capacity);
if (divide > threadPoolAlarm.getCapacityAlarm()) {
SEND_MESSAGE_SERVICE.sendMessage(threadPoolExecutor);
}
}
/**
* checkPoolLivenessAlarm.
*
* @param isCore
* @param threadPoolExecutor
*/
public static void checkPoolLivenessAlarm(boolean isCore, CustomThreadPoolExecutor threadPoolExecutor) {
if (isCore) {
return;
}
int activeCount = threadPoolExecutor.getActiveCount();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
int divide = CalculateUtil.divide(activeCount, maximumPoolSize);
if (divide > threadPoolExecutor.getThreadPoolAlarm().getLivenessAlarm()) {
SEND_MESSAGE_SERVICE.sendMessage(threadPoolExecutor);
}
}
/**
* checkPoolRejectAlarm.
*
* @param threadPoolExecutor
*/
public static void checkPoolRejectAlarm(CustomThreadPoolExecutor threadPoolExecutor) {
SEND_MESSAGE_SERVICE.sendMessage(threadPoolExecutor);
}
}

@ -1,10 +1,10 @@
package com.github.dynamic.threadpool.starter.common; package com.github.dynamic.threadpool.starter.common;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedPolicies; import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedPolicies;
import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -15,13 +15,15 @@ import java.util.concurrent.TimeUnit;
*/ */
public class CommonThreadPool { public class CommonThreadPool {
public static ThreadPoolExecutor getInstance(String threadPoolId) { public static CustomThreadPoolExecutor getInstance(String threadPoolId) {
ThreadPoolExecutor poolExecutor = ThreadPoolBuilder.builder() CustomThreadPoolExecutor poolExecutor = (CustomThreadPoolExecutor) ThreadPoolBuilder.builder()
.isCustomPool(true) .isCustomPool(true)
.threadPoolId(threadPoolId)
.threadFactory(threadPoolId) .threadFactory(threadPoolId)
.poolThreadSize(3, 5) .poolThreadSize(3, 5)
.keepAliveTime(1000L, TimeUnit.SECONDS) .keepAliveTime(1000L, TimeUnit.SECONDS)
.rejected(RejectedPolicies.runsOldestTaskPolicy()) .rejected(RejectedPolicies.runsOldestTaskPolicy())
.alarmConfig(1, 80, 80)
.workQueue(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE, 512) .workQueue(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE, 512)
.build(); .build();
return poolExecutor; return poolExecutor;

@ -1,10 +1,13 @@
package com.github.dynamic.threadpool.starter.config; package com.github.dynamic.threadpool.starter.config;
import com.github.dynamic.threadpool.starter.alarm.AlarmConfig;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.List;
/** /**
* Bootstrap Properties. * Bootstrap Properties.
* *
@ -39,4 +42,9 @@ public class BootstrapProperties {
*/ */
private boolean banner = true; private boolean banner = true;
/**
* alarms
*/
private List<AlarmConfig> alarms;
} }

@ -11,6 +11,7 @@ import org.springframework.core.env.ConfigurableEnvironment;
import java.net.InetAddress; import java.net.InetAddress;
import static com.github.dynamic.threadpool.starter.toolkit.CloudCommonIdUtil.getDefaultInstanceId; import static com.github.dynamic.threadpool.starter.toolkit.CloudCommonIdUtil.getDefaultInstanceId;
import static com.github.dynamic.threadpool.starter.toolkit.CloudCommonIdUtil.getIpApplicationName;
/** /**
* Dynamic Tp Discovery Config. * Dynamic Tp Discovery Config.
@ -28,6 +29,7 @@ public class DiscoveryConfig {
public InstanceInfo instanceConfig() { public InstanceInfo instanceConfig() {
InstanceInfo instanceInfo = new InstanceInfo(); InstanceInfo instanceInfo = new InstanceInfo();
instanceInfo.setInstanceId(getDefaultInstanceId(environment)); instanceInfo.setInstanceId(getDefaultInstanceId(environment));
instanceInfo.setIpApplicationName(getIpApplicationName(environment));
instanceInfo.setAppName(environment.getProperty("spring.application.name")); instanceInfo.setAppName(environment.getProperty("spring.application.name"));
instanceInfo.setHostName(InetAddress.getLocalHost().getHostAddress()); instanceInfo.setHostName(InetAddress.getLocalHost().getHostAddress());

@ -15,6 +15,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
/** /**
* DynamicTp Auto Configuration. * DynamicTp Auto Configuration.
@ -24,9 +26,9 @@ import org.springframework.context.annotation.Configuration;
*/ */
@Configuration @Configuration
@AllArgsConstructor @AllArgsConstructor
@EnableConfigurationProperties(BootstrapProperties.class)
@ConditionalOnBean(MarkerConfiguration.Marker.class) @ConditionalOnBean(MarkerConfiguration.Marker.class)
@ImportAutoConfiguration({HttpClientConfig.class, DiscoveryConfig.class}) @EnableConfigurationProperties(BootstrapProperties.class)
@ImportAutoConfiguration({HttpClientConfig.class, DiscoveryConfig.class, MessageAlarmConfig.class})
public class DynamicThreadPoolAutoConfiguration { public class DynamicThreadPoolAutoConfiguration {
private final BootstrapProperties properties; private final BootstrapProperties properties;
@ -37,6 +39,7 @@ public class DynamicThreadPoolAutoConfiguration {
} }
@Bean @Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public ApplicationContextHolder applicationContextHolder() { public ApplicationContextHolder applicationContextHolder() {
return new ApplicationContextHolder(); return new ApplicationContextHolder();
} }

@ -0,0 +1,41 @@
package com.github.dynamic.threadpool.starter.config;
import com.github.dynamic.threadpool.common.model.InstanceInfo;
import com.github.dynamic.threadpool.starter.alarm.BaseSendMessageService;
import com.github.dynamic.threadpool.starter.alarm.DingSendMessageHandler;
import com.github.dynamic.threadpool.starter.alarm.SendMessageHandler;
import com.github.dynamic.threadpool.starter.alarm.SendMessageService;
import lombok.AllArgsConstructor;
import org.apache.logging.log4j.util.Strings;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.env.ConfigurableEnvironment;
/**
* Message Alarm Config.
*
* @author chen.ma
* @date 2021/8/15 15:39
*/
@AllArgsConstructor
public class MessageAlarmConfig {
private final BootstrapProperties properties;
private final InstanceInfo instanceInfo;
private ConfigurableEnvironment environment;
@Bean
@DependsOn("applicationContextHolder")
public SendMessageService sendMessageService() {
return new BaseSendMessageService(properties.getAlarms());
}
@Bean
public SendMessageHandler dingSendMessageHandler() {
String active = environment.getProperty("spring.profiles.active", Strings.EMPTY);
return new DingSendMessageHandler(active, instanceInfo);
}
}

@ -7,6 +7,7 @@ import com.github.dynamic.threadpool.common.web.base.Result;
import com.github.dynamic.threadpool.starter.common.CommonThreadPool; import com.github.dynamic.threadpool.starter.common.CommonThreadPool;
import com.github.dynamic.threadpool.starter.config.BootstrapProperties; import com.github.dynamic.threadpool.starter.config.BootstrapProperties;
import com.github.dynamic.threadpool.starter.remote.HttpAgent; import com.github.dynamic.threadpool.starter.remote.HttpAgent;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
@ -81,7 +82,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
queryStrMap.put("namespace", properties.getNamespace()); queryStrMap.put("namespace", properties.getNamespace());
PoolParameterInfo ppi = new PoolParameterInfo(); PoolParameterInfo ppi = new PoolParameterInfo();
ThreadPoolExecutor poolExecutor = null; CustomThreadPoolExecutor poolExecutor = null;
Result result = null; Result result = null;
try { try {
@ -89,13 +90,15 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) { if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) {
// 使用相关参数创建线程池 // 使用相关参数创建线程池
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
poolExecutor = ThreadPoolBuilder.builder() poolExecutor = (CustomThreadPoolExecutor) ThreadPoolBuilder.builder()
.isCustomPool(true) .isCustomPool(true)
.poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize())
.keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS)
.workQueue(workQueue) .workQueue(workQueue)
.threadPoolId(tpId)
.threadFactory(tpId) .threadFactory(tpId)
.poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize())
.keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedTypeEnum.createPolicy(ppi.getRejectedType())) .rejected(RejectedTypeEnum.createPolicy(ppi.getRejectedType()))
.alarmConfig(ppi.getIsAlarm(), ppi.getCapacityAlarm(), ppi.getLivenessAlarm())
.build(); .build();
dynamicThreadPoolWrap.setPool(poolExecutor); dynamicThreadPoolWrap.setPool(poolExecutor);

@ -17,12 +17,17 @@ public class CloudCommonIdUtil {
@SneakyThrows @SneakyThrows
public static String getDefaultInstanceId(PropertyResolver resolver) { public static String getDefaultInstanceId(PropertyResolver resolver) {
String namePart = getIpApplicationName(resolver);
String indexPart = resolver.getProperty("spring.application.instance_id", resolver.getProperty("server.port"));
return combineParts(namePart, SEPARATOR, indexPart);
}
@SneakyThrows
public static String getIpApplicationName(PropertyResolver resolver) {
InetAddress host = InetAddress.getLocalHost(); InetAddress host = InetAddress.getLocalHost();
String hostname = host.getHostAddress(); String hostname = host.getHostAddress();
String appName = resolver.getProperty("spring.application.name"); String appName = resolver.getProperty("spring.application.name");
String namePart = combineParts(hostname, SEPARATOR, appName); return combineParts(hostname, SEPARATOR, appName);
String indexPart = resolver.getProperty("spring.application.instance_id", resolver.getProperty("server.port"));
return combineParts(namePart, SEPARATOR, indexPart);
} }
public static String combineParts(String firstPart, String separator, public static String combineParts(String firstPart, String separator,

@ -1,6 +1,7 @@
package com.github.dynamic.threadpool.starter.toolkit.thread; package com.github.dynamic.threadpool.starter.toolkit.thread;
import com.github.dynamic.threadpool.common.toolkit.Assert; import com.github.dynamic.threadpool.common.toolkit.Assert;
import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm;
import lombok.Data; import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -100,9 +101,10 @@ public class AbstractBuildThreadPoolTemplate {
initParam.getKeepAliveTime(), initParam.getKeepAliveTime(),
initParam.getTimeUnit(), initParam.getTimeUnit(),
initParam.getWorkQueue(), initParam.getWorkQueue(),
initParam.getThreadPoolId(),
initParam.getThreadFactory(), initParam.getThreadFactory(),
initParam.getThreadPoolAlarm(),
initParam.getRejectedExecutionHandler()); initParam.getRejectedExecutionHandler());
return executorService; return executorService;
} }
@ -150,7 +152,18 @@ public class AbstractBuildThreadPoolTemplate {
*/ */
private ThreadFactory threadFactory; private ThreadFactory threadFactory;
/**
* 线 ID
*/
private String threadPoolId;
/**
*
*/
private ThreadPoolAlarm threadPoolAlarm;
public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) { public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) {
this.threadPoolId = threadNamePrefix;
this.threadFactory = ThreadFactoryBuilder.builder() this.threadFactory = ThreadFactoryBuilder.builder()
.prefix(threadNamePrefix) .prefix(threadNamePrefix)
.daemon(isDaemon) .daemon(isDaemon)

@ -1,5 +1,7 @@
package com.github.dynamic.threadpool.starter.toolkit.thread; package com.github.dynamic.threadpool.starter.toolkit.thread;
import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm;
import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarmManage;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.NonNull; import lombok.NonNull;
@ -46,47 +48,24 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor {
private volatile boolean allowCoreThreadTimeOut; private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize; private volatile int corePoolSize;
private volatile int maximumPoolSize; private volatile int maximumPoolSize;
private String threadPoolId;
private final AccessControlContext acc; private final AccessControlContext acc;
private volatile ThreadPoolAlarm threadPoolAlarm;
private volatile ThreadFactory threadFactory; private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler; private volatile RejectedExecutionHandler handler;
private static final RejectedExecutionHandler DEFAULT_HANDLER = new ThreadPoolExecutor.AbortPolicy(); private static final RejectedExecutionHandler DEFAULT_HANDLER = new ThreadPoolExecutor.AbortPolicy();
private static final RuntimePermission SHUTDOWN_PERM = new RuntimePermission("modifyThread"); private static final RuntimePermission SHUTDOWN_PERM = new RuntimePermission("modifyThread");
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), DEFAULT_HANDLER);
}
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, DEFAULT_HANDLER);
}
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}
public CustomThreadPoolExecutor(int corePoolSize, public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize, int maximumPoolSize,
long keepAliveTime, long keepAliveTime,
TimeUnit unit, TimeUnit unit,
@NonNull BlockingQueue<Runnable> workQueue, @NonNull BlockingQueue<Runnable> workQueue,
@NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory, @NonNull ThreadFactory threadFactory,
@NonNull ThreadPoolAlarm threadPoolAlarm,
@NonNull RejectedExecutionHandler handler) { @NonNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
@ -100,9 +79,11 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor {
this.corePoolSize = corePoolSize; this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize; this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue; this.workQueue = workQueue;
this.threadPoolId = threadPoolId;
this.keepAliveTime = unit.toNanos(keepAliveTime); this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory; this.threadFactory = threadFactory;
this.handler = handler; this.handler = handler;
this.threadPoolAlarm = threadPoolAlarm;
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
} }
@ -147,6 +128,18 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor {
return rejectCount.get(); return rejectCount.get();
} }
public ThreadPoolAlarm getThreadPoolAlarm() {
return this.threadPoolAlarm;
}
public void setThreadPoolAlarm(ThreadPoolAlarm threadPoolAlarm) {
this.threadPoolAlarm = threadPoolAlarm;
}
public String getThreadPoolId() {
return this.threadPoolId;
}
private final class Worker private final class Worker
extends AbstractQueuedSynchronizer extends AbstractQueuedSynchronizer
implements Runnable { implements Runnable {
@ -316,6 +309,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor {
final void reject(Runnable command) { final void reject(Runnable command) {
rejectCount.incrementAndGet(); rejectCount.incrementAndGet();
ThreadPoolAlarmManage.checkPoolRejectAlarm(this);
handler.rejectedExecution(command, this); handler.rejectedExecution(command, this);
} }
@ -368,6 +362,8 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor {
} }
} }
ThreadPoolAlarmManage.checkPoolLivenessAlarm(core, this);
boolean workerStarted = false; boolean workerStarted = false;
boolean workerAdded = false; boolean workerAdded = false;
Worker w = null; Worker w = null;
@ -542,6 +538,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutor {
c = ctl.get(); c = ctl.get();
} }
if (isRunning(c) && workQueue.offer(command)) { if (isRunning(c) && workQueue.offer(command)) {
ThreadPoolAlarmManage.checkPoolCapacityAlarm(this);
int recheck = ctl.get(); int recheck = ctl.get();
if (!isRunning(recheck) && remove(command)) { if (!isRunning(recheck) && remove(command)) {
reject(command); reject(command);

@ -2,6 +2,7 @@ package com.github.dynamic.threadpool.starter.toolkit.thread;
import com.github.dynamic.threadpool.common.toolkit.Assert; import com.github.dynamic.threadpool.common.toolkit.Assert;
import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.concurrent.*; import java.util.concurrent.*;
@ -74,6 +75,26 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
*/ */
private String threadNamePrefix; private String threadNamePrefix;
/**
* 线 ID
*/
private String threadPoolId;
/**
*
*/
private boolean isAlarm = false;
/**
*
*/
private Integer capacityAlarm;
/**
*
*/
private Integer livenessAlarm;
/** /**
* CPU / (1 - 0.8) * CPU / (1 - 0.8)
* *
@ -163,6 +184,18 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this; return this;
} }
public ThreadPoolBuilder threadPoolId(String threadPoolId) {
this.threadPoolId = threadPoolId;
return this;
}
public ThreadPoolBuilder alarmConfig(int isAlarm, int capacityAlarm, int livenessAlarm) {
this.isAlarm = isAlarm == 1 ? true : false;
this.capacityAlarm = capacityAlarm;
this.livenessAlarm = livenessAlarm;
return this;
}
/** /**
* *
* *
@ -222,7 +255,7 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
* @return * @return
*/ */
private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) { private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) {
Assert.notEmpty(builder.threadNamePrefix, "线程名称前缀不可为空或空的字符串."); Assert.notEmpty(builder.threadNamePrefix, "The thread name prefix cannot be empty or an empty string.");
AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam = AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam =
new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon); new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon);
@ -233,7 +266,12 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
.setRejectedExecutionHandler(builder.rejectedExecutionHandler) .setRejectedExecutionHandler(builder.rejectedExecutionHandler)
.setTimeUnit(builder.timeUnit); .setTimeUnit(builder.timeUnit);
// 快速消费线程池内置指定线程池 if (builder.isCustomPool) {
initParam.setThreadPoolId(builder.threadPoolId);
ThreadPoolAlarm threadPoolAlarm = new ThreadPoolAlarm(builder.isAlarm, builder.capacityAlarm, builder.livenessAlarm);
initParam.setThreadPoolAlarm(threadPoolAlarm);
}
if (!builder.isFastPool) { if (!builder.isFastPool) {
if (builder.queueType != null) { if (builder.queueType != null) {
builder.workQueue = QueueTypeEnum.createBlockingQueue(builder.queueType.type, builder.capacity); builder.workQueue = QueueTypeEnum.createBlockingQueue(builder.queueType.type, builder.capacity);

@ -1,11 +1,11 @@
package com.github.dynamic.threadpool.starter.wrap; package com.github.dynamic.threadpool.starter.wrap;
import com.github.dynamic.threadpool.starter.common.CommonThreadPool; import com.github.dynamic.threadpool.starter.common.CommonThreadPool;
import com.github.dynamic.threadpool.starter.toolkit.thread.CustomThreadPoolExecutor;
import lombok.Data; import lombok.Data;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* Dynamic ThreadPool Wrap. * Dynamic ThreadPool Wrap.
@ -22,7 +22,7 @@ public class DynamicThreadPoolWrap {
private String tpId; private String tpId;
private ThreadPoolExecutor pool; private CustomThreadPoolExecutor pool;
/** /**
* 线, 使线 {@link CommonThreadPool#getInstance(String)} * 线, 使线 {@link CommonThreadPool#getInstance(String)}
@ -39,7 +39,7 @@ public class DynamicThreadPoolWrap {
* @param threadPoolId * @param threadPoolId
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
public DynamicThreadPoolWrap(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { public DynamicThreadPoolWrap(String threadPoolId, CustomThreadPoolExecutor threadPoolExecutor) {
this.tpId = threadPoolId; this.tpId = threadPoolId;
this.pool = threadPoolExecutor; this.pool = threadPoolExecutor;
} }

@ -33,6 +33,8 @@
<fastjson.version>1.2.75</fastjson.version> <fastjson.version>1.2.75</fastjson.version>
<commons-lang3.version>3.12.0</commons-lang3.version> <commons-lang3.version>3.12.0</commons-lang3.version>
<dingtalk-sdk.version>1.0.1</dingtalk-sdk.version>
<mybatis-plus.version>3.4.2</mybatis-plus.version> <mybatis-plus.version>3.4.2</mybatis-plus.version>
<spring-boot.version>2.3.2.RELEASE</spring-boot.version> <spring-boot.version>2.3.2.RELEASE</spring-boot.version>
</properties> </properties>
@ -119,6 +121,12 @@
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
<version>${guava.version}</version> <version>${guava.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>alibaba-dingtalk-service-sdk</artifactId>
<version>${dingtalk-sdk.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>

Loading…
Cancel
Save