Send dynamic thread pool rejection policy alarm asynchronously (#573)

pull/592/head
chen.ma 2 years ago
parent 0bcb90e102
commit 54fb0762c7

@ -66,7 +66,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
1, 1,
r -> new Thread(r, "client.alarm.notify")); r -> new Thread(r, "client.alarm.notify"));
private final ExecutorService EXECUTE_TIMEOUT_EXECUTOR = ThreadPoolBuilder.builder() private final ExecutorService ASYNC_ALARM_NOTIFY_EXECUTOR = ThreadPoolBuilder.builder()
.poolThreadSize(2, 4) .poolThreadSize(2, 4)
.threadFactory("client.execute.timeout.alarm") .threadFactory("client.execute.timeout.alarm")
.allowCoreThreadTimeOut(true) .allowCoreThreadTimeOut(true)
@ -139,31 +139,24 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
} }
/** /**
* Check pool rejected alarm. * Async send rejected alarm.
* *
* @param threadPoolId * @param threadPoolId
*/ */
public void checkPoolRejectedAlarm(String threadPoolId) { public void asyncSendRejectedAlarm(String threadPoolId) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId); Runnable checkPoolRejectedAlarmTask = () -> {
if (Objects.isNull(threadPoolNotifyAlarm) || !threadPoolNotifyAlarm.getAlarm()) { ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId);
return; if (Objects.isNull(threadPoolNotifyAlarm) || !threadPoolNotifyAlarm.getAlarm()) {
} return;
ThreadPoolExecutor threadPoolExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor(); }
checkPoolRejectedAlarm(threadPoolId, threadPoolExecutor); ThreadPoolExecutor threadPoolExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
} if (threadPoolExecutor instanceof DynamicThreadPoolExecutor) {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor);
/** alarmNotifyRequest.setThreadPoolId(threadPoolId);
* Check pool rejected alarm. hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.REJECT, alarmNotifyRequest);
* }
* @param threadPoolId };
* @param threadPoolExecutor ASYNC_ALARM_NOTIFY_EXECUTOR.execute(checkPoolRejectedAlarmTask);
*/
public void checkPoolRejectedAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
if (threadPoolExecutor instanceof DynamicThreadPoolExecutor) {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor);
alarmNotifyRequest.setThreadPoolId(threadPoolId);
hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.REJECT, alarmNotifyRequest);
}
} }
/** /**
@ -190,7 +183,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
alarmNotifyRequest.setExecuteTimeoutTrace(executeTimeoutTrace); alarmNotifyRequest.setExecuteTimeoutTrace(executeTimeoutTrace);
} }
Runnable task = () -> hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.TIMEOUT, alarmNotifyRequest); Runnable task = () -> hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.TIMEOUT, alarmNotifyRequest);
EXECUTE_TIMEOUT_EXECUTOR.execute(task); ASYNC_ALARM_NOTIFY_EXECUTOR.execute(task);
} catch (Throwable ex) { } catch (Throwable ex) {
log.error("Send thread pool execution timeout alarm error.", ex); log.error("Send thread pool execution timeout alarm error.", ex);
} }

@ -20,6 +20,7 @@ package cn.hippo4j.core.proxy;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
/** /**
* Rejected proxy invocation handler. * Rejected proxy invocation handler.
*/ */
@Slf4j
@AllArgsConstructor @AllArgsConstructor
public class RejectedProxyInvocationHandler implements InvocationHandler { public class RejectedProxyInvocationHandler implements InvocationHandler {
@ -42,8 +44,12 @@ public class RejectedProxyInvocationHandler implements InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
rejectCount.incrementAndGet(); rejectCount.incrementAndGet();
if (ApplicationContextHolder.getInstance() != null) { if (ApplicationContextHolder.getInstance() != null) {
ThreadPoolNotifyAlarmHandler alarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class); try {
alarmHandler.checkPoolRejectedAlarm(threadPoolId); ThreadPoolNotifyAlarmHandler alarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class);
alarmHandler.asyncSendRejectedAlarm(threadPoolId);
} catch (Throwable ex) {
log.error("Failed to send rejection policy alert.", ex);
}
} }
try { try {
return method.invoke(target, args); return method.invoke(target, args);

Loading…
Cancel
Save