Optimized thread pool notification alerts

pull/639/head
chen.ma 3 years ago
parent 9c1cef1c3b
commit 2a88d910ab

@ -48,7 +48,7 @@ import java.util.concurrent.*;
public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner { public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner {
@NonNull @NonNull
private final Hippo4jSendMessageService hippoSendMessageService; private final Hippo4jSendMessageService hippo4jSendMessageService;
@Value("${spring.profiles.active:UNKNOWN}") @Value("${spring.profiles.active:UNKNOWN}")
private String active; private String active;
@ -101,19 +101,19 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
if (hippoSendMessageService == null) { ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
if (Objects.isNull(alarmConfig) || !alarmConfig.getAlarm() || alarmConfig.getCapacityAlarm() <= 0) {
return; return;
} }
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId);
BlockingQueue blockingQueue = threadPoolExecutor.getQueue(); BlockingQueue blockingQueue = threadPoolExecutor.getQueue();
int queueSize = blockingQueue.size(); int queueSize = blockingQueue.size();
int capacity = queueSize + blockingQueue.remainingCapacity(); int capacity = queueSize + blockingQueue.remainingCapacity();
int divide = CalculateUtil.divide(queueSize, capacity); int divide = CalculateUtil.divide(queueSize, capacity);
boolean isSend = threadPoolNotifyAlarm.getAlarm() && divide > threadPoolNotifyAlarm.getCapacityAlarm(); boolean isSend = alarmConfig.getAlarm() && divide > alarmConfig.getCapacityAlarm();
if (isSend) { if (isSend) {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor); AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest(threadPoolExecutor);
alarmNotifyRequest.setThreadPoolId(threadPoolId); alarmNotifyRequest.setThreadPoolId(threadPoolId);
hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.CAPACITY, alarmNotifyRequest); hippo4jSendMessageService.sendAlarmMessage(NotifyTypeEnum.CAPACITY, alarmNotifyRequest);
} }
} }
@ -124,15 +124,18 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
if (Objects.isNull(alarmConfig) || !alarmConfig.getAlarm() || alarmConfig.getCapacityAlarm() <= 0) {
return;
}
int activeCount = threadPoolExecutor.getActiveCount(); int activeCount = threadPoolExecutor.getActiveCount();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
int divide = CalculateUtil.divide(activeCount, maximumPoolSize); int divide = CalculateUtil.divide(activeCount, maximumPoolSize);
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId); boolean isSend = alarmConfig.getAlarm() && divide > alarmConfig.getActiveAlarm();
boolean isSend = threadPoolNotifyAlarm.getAlarm() && divide > threadPoolNotifyAlarm.getActiveAlarm();
if (isSend) { if (isSend) {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor); AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest(threadPoolExecutor);
alarmNotifyRequest.setThreadPoolId(threadPoolId); alarmNotifyRequest.setThreadPoolId(threadPoolId);
hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.ACTIVITY, alarmNotifyRequest); hippo4jSendMessageService.sendAlarmMessage(NotifyTypeEnum.ACTIVITY, alarmNotifyRequest);
} }
} }
@ -143,15 +146,15 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
*/ */
public void asyncSendRejectedAlarm(String threadPoolId) { public void asyncSendRejectedAlarm(String threadPoolId) {
Runnable checkPoolRejectedAlarmTask = () -> { Runnable checkPoolRejectedAlarmTask = () -> {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId); ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
if (Objects.isNull(threadPoolNotifyAlarm) || !threadPoolNotifyAlarm.getAlarm()) { if (Objects.isNull(alarmConfig) || !alarmConfig.getAlarm()) {
return; return;
} }
ThreadPoolExecutor threadPoolExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor(); ThreadPoolExecutor threadPoolExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
if (threadPoolExecutor instanceof DynamicThreadPoolExecutor) { if (threadPoolExecutor instanceof DynamicThreadPoolExecutor) {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor); AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest(threadPoolExecutor);
alarmNotifyRequest.setThreadPoolId(threadPoolId); alarmNotifyRequest.setThreadPoolId(threadPoolId);
hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.REJECT, alarmNotifyRequest); hippo4jSendMessageService.sendAlarmMessage(NotifyTypeEnum.REJECT, alarmNotifyRequest);
} }
}; };
ASYNC_ALARM_NOTIFY_EXECUTOR.execute(checkPoolRejectedAlarmTask); ASYNC_ALARM_NOTIFY_EXECUTOR.execute(checkPoolRejectedAlarmTask);
@ -166,13 +169,13 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) { public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId); ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
if (Objects.isNull(threadPoolNotifyAlarm) || !threadPoolNotifyAlarm.getAlarm()) { if (Objects.isNull(alarmConfig) || !alarmConfig.getAlarm()) {
return; return;
} }
if (threadPoolExecutor instanceof DynamicThreadPoolExecutor) { if (threadPoolExecutor instanceof DynamicThreadPoolExecutor) {
try { try {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor); AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest(threadPoolExecutor);
alarmNotifyRequest.setThreadPoolId(threadPoolId); alarmNotifyRequest.setThreadPoolId(threadPoolId);
alarmNotifyRequest.setExecuteTime(executeTime); alarmNotifyRequest.setExecuteTime(executeTime);
alarmNotifyRequest.setExecuteTimeOut(executeTimeOut); alarmNotifyRequest.setExecuteTimeOut(executeTimeOut);
@ -180,7 +183,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
if (StringUtil.isNotBlank(executeTimeoutTrace)) { if (StringUtil.isNotBlank(executeTimeoutTrace)) {
alarmNotifyRequest.setExecuteTimeoutTrace(executeTimeoutTrace); alarmNotifyRequest.setExecuteTimeoutTrace(executeTimeoutTrace);
} }
Runnable task = () -> hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.TIMEOUT, alarmNotifyRequest); Runnable task = () -> hippo4jSendMessageService.sendAlarmMessage(NotifyTypeEnum.TIMEOUT, alarmNotifyRequest);
ASYNC_ALARM_NOTIFY_EXECUTOR.execute(task); ASYNC_ALARM_NOTIFY_EXECUTOR.execute(task);
} catch (Throwable ex) { } catch (Throwable ex) {
log.error("Send thread pool execution timeout alarm error.", ex); log.error("Send thread pool execution timeout alarm error.", ex);
@ -198,50 +201,40 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
String appName = StrUtil.isBlank(itemId) ? applicationName : itemId; String appName = StrUtil.isBlank(itemId) ? applicationName : itemId;
request.setAppName(appName); request.setAppName(appName);
request.setIdentify(IdentifyUtil.getIdentify()); request.setIdentify(IdentifyUtil.getIdentify());
hippoSendMessageService.sendChangeMessage(request); hippo4jSendMessageService.sendChangeMessage(request);
} }
/** /**
* Build alarm notify req. * Build alarm notify request.
* *
* @param threadPoolExecutor * @param threadPoolExecutor
* @return * @return
*/ */
public AlarmNotifyRequest buildAlarmNotifyReq(ThreadPoolExecutor threadPoolExecutor) { public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) {
AlarmNotifyRequest request = new AlarmNotifyRequest(); BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue();
String appName = StrUtil.isBlank(itemId) ? applicationName : itemId;
request.setAppName(appName);
int corePoolSize = threadPoolExecutor.getCorePoolSize();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
int poolSize = threadPoolExecutor.getPoolSize();
int activeCount = threadPoolExecutor.getActiveCount();
int largestPoolSize = threadPoolExecutor.getLargestPoolSize();
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
request.setActive(active.toUpperCase());
request.setIdentify(IdentifyUtil.getIdentify());
request.setCorePoolSize(corePoolSize);
request.setMaximumPoolSize(maximumPoolSize);
request.setPoolSize(poolSize);
request.setActiveCount(activeCount);
request.setLargestPoolSize(largestPoolSize);
request.setCompletedTaskCount(completedTaskCount);
BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
int queueSize = queue.size();
String queueType = queue.getClass().getSimpleName();
int remainingCapacity = queue.remainingCapacity();
int queueCapacity = queueSize + remainingCapacity;
request.setQueueName(queueType);
request.setCapacity(queueCapacity);
request.setQueueSize(queueSize);
request.setRemainingCapacity(remainingCapacity);
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler() ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler()
: threadPoolExecutor.getRejectedExecutionHandler(); : threadPoolExecutor.getRejectedExecutionHandler();
request.setRejectedExecutionHandlerName(rejectedExecutionHandler.getClass().getSimpleName());
long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum() ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum()
: -1L; : -1L;
request.setRejectCountNum(rejectCount); AlarmNotifyRequest alarmNotifyRequest = AlarmNotifyRequest.builder()
return request; .appName(StrUtil.isBlank(itemId) ? applicationName : itemId)
.active(active.toUpperCase())
.identify(IdentifyUtil.getIdentify())
.corePoolSize(threadPoolExecutor.getCorePoolSize())
.maximumPoolSize(threadPoolExecutor.getMaximumPoolSize())
.poolSize(threadPoolExecutor.getPoolSize())
.activeCount(threadPoolExecutor.getActiveCount())
.largestPoolSize(threadPoolExecutor.getLargestPoolSize())
.completedTaskCount(threadPoolExecutor.getCompletedTaskCount())
.queueName(blockingQueue.getClass().getSimpleName())
.capacity(blockingQueue.size() + blockingQueue.remainingCapacity())
.queueSize(blockingQueue.size())
.remainingCapacity(blockingQueue.remainingCapacity())
.rejectedExecutionHandlerName(rejectedExecutionHandler.getClass().getSimpleName())
.rejectCountNum(rejectCount)
.build();
return alarmNotifyRequest;
} }
} }

@ -19,13 +19,19 @@ package cn.hippo4j.message.request;
import cn.hippo4j.message.enums.NotifyTypeEnum; import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.request.base.BaseNotifyRequest; import cn.hippo4j.message.request.base.BaseNotifyRequest;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
/** /**
* Alarm notify request. * Alarm notify request.
*/ */
@Data @Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true) @Accessors(chain = true)
public class AlarmNotifyRequest extends BaseNotifyRequest { public class AlarmNotifyRequest extends BaseNotifyRequest {

@ -80,8 +80,8 @@ public class DynamicThreadPoolCoreAutoConfiguration {
} }
@Bean @Bean
public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippoSendMessageService) { public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippo4jSendMessageService) {
return new ThreadPoolNotifyAlarmHandler(hippoSendMessageService); return new ThreadPoolNotifyAlarmHandler(hippo4jSendMessageService);
} }
@Bean @Bean

@ -201,7 +201,7 @@ public class DynamicThreadPoolAutoConfiguration {
} }
@Bean @Bean
public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippoSendMessageService) { public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippo4jSendMessageService) {
return new ThreadPoolNotifyAlarmHandler(hippoSendMessageService); return new ThreadPoolNotifyAlarmHandler(hippo4jSendMessageService);
} }
} }

Loading…
Cancel
Save