任务执行超时添加 Trace 信息.

pull/131/head
chen.ma 3 years ago
parent b36e17920e
commit 5279d6209c

@ -82,4 +82,6 @@ public class Constants {
public static final String UNKNOWN = "unknown";
public static final String EXECUTE_TIMEOUT_TRACE = "executeTimeoutTrace";
}

@ -1,32 +0,0 @@
package cn.hippo4j.common.notify;
/**
* Task trace decorator.
*
* @author chen.ma
* @date 2022/3/2 19:45
*/
public interface TaskTraceBuilder {
/**
* Before.
*/
default void before() {
}
/**
* Trace build.
*
* @return
*/
String traceBuild();
/**
* Clear.
*/
default void clear() {
}
}

@ -26,7 +26,7 @@ public class DingAlarmConstants {
/**
* Trace
*/
public static final String DING_ALARM_TIMOUT_TRACE_REPLACE_TXT = "<font color='#708090' size=2>链路信息:%d</font> \n\n";
public static final String DING_ALARM_TIMOUT_TRACE_REPLACE_TXT = "<font color='#708090' size=2>链路信息:%s</font> \n\n";
/**
*

@ -3,6 +3,7 @@ package cn.hippo4j.common.notify.platform;
import cn.hippo4j.common.notify.*;
import cn.hippo4j.common.notify.request.AlarmNotifyRequest;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.dingtalk.api.DefaultDingTalkClient;
@ -40,18 +41,10 @@ public class DingSendMessageHandler implements SendMessageHandler<AlarmNotifyReq
String dingAlarmTxt;
String dingAlarmTimoutReplaceTxt;
if (Objects.equals(notifyConfig.getTypeEnum(), NotifyTypeEnum.TIMEOUT)) {
TaskTraceBuilder taskTraceBuilder = alarmNotifyRequest.getTaskTraceBuilder();
if (taskTraceBuilder != null) {
String taskTraceStr = "";
try {
taskTraceStr = taskTraceBuilder.traceBuild();
} catch (Exception ex) {
// ignore
} finally {
taskTraceBuilder.clear();
}
String weChatAlarmTimoutTraceReplaceTxt = String.format(DING_ALARM_TIMOUT_TRACE_REPLACE_TXT, taskTraceStr);
dingAlarmTimoutReplaceTxt = StrUtil.replace(DING_ALARM_TIMOUT_REPLACE_TXT, DING_ALARM_TIMOUT_TRACE_REPLACE_TXT, weChatAlarmTimoutTraceReplaceTxt);
String executeTimeoutTrace = alarmNotifyRequest.getExecuteTimeoutTrace();
if (StringUtil.isNotBlank(executeTimeoutTrace)) {
String dingAlarmTimoutTraceReplaceTxt = String.format(DING_ALARM_TIMOUT_TRACE_REPLACE_TXT, executeTimeoutTrace);
dingAlarmTimoutReplaceTxt = StrUtil.replace(DING_ALARM_TIMOUT_REPLACE_TXT, DING_ALARM_TIMOUT_TRACE_REPLACE_TXT, dingAlarmTimoutTraceReplaceTxt);
} else {
dingAlarmTimoutReplaceTxt = StrUtil.replace(DING_ALARM_TIMOUT_REPLACE_TXT, DING_ALARM_TIMOUT_TRACE_REPLACE_TXT, "");
}

@ -4,6 +4,7 @@ import cn.hippo4j.common.notify.*;
import cn.hippo4j.common.notify.request.AlarmNotifyRequest;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
@ -38,17 +39,9 @@ public class WeChatSendMessageHandler implements SendMessageHandler<AlarmNotifyR
String weChatAlarmTxt;
String weChatAlarmTimoutReplaceTxt;
if (Objects.equals(notifyConfig.getTypeEnum(), NotifyTypeEnum.TIMEOUT)) {
TaskTraceBuilder taskTraceBuilder = alarmNotifyRequest.getTaskTraceBuilder();
if (taskTraceBuilder != null) {
String taskTraceStr = "";
try {
taskTraceStr = taskTraceBuilder.traceBuild();
} catch (Exception ex) {
// ignore
} finally {
taskTraceBuilder.clear();
}
String weChatAlarmTimoutTraceReplaceTxt = String.format(WE_CHAT_ALARM_TIMOUT_TRACE_REPLACE_TXT, taskTraceStr);
String executeTimeoutTrace = alarmNotifyRequest.getExecuteTimeoutTrace();
if (StringUtil.isNotBlank(executeTimeoutTrace)) {
String weChatAlarmTimoutTraceReplaceTxt = String.format(WE_CHAT_ALARM_TIMOUT_TRACE_REPLACE_TXT, executeTimeoutTrace);
weChatAlarmTimoutReplaceTxt = StrUtil.replace(WE_CHAT_ALARM_TIMOUT_REPLACE_TXT, WE_CHAT_ALARM_TIMOUT_TRACE_REPLACE_TXT, weChatAlarmTimoutTraceReplaceTxt);
} else {
weChatAlarmTimoutReplaceTxt = StrUtil.replace(WE_CHAT_ALARM_TIMOUT_REPLACE_TXT, WE_CHAT_ALARM_TIMOUT_TRACE_REPLACE_TXT, "");

@ -1,7 +1,6 @@
package cn.hippo4j.common.notify.request;
import cn.hippo4j.common.notify.NotifyTypeEnum;
import cn.hippo4j.common.notify.TaskTraceBuilder;
import cn.hippo4j.common.notify.request.base.BaseNotifyRequest;
import lombok.Data;
import lombok.experimental.Accessors;
@ -112,8 +111,8 @@ public class AlarmNotifyRequest extends BaseNotifyRequest {
private Long executeTimeOut;
/**
* taskTraceBuilder
* executeTimeoutTrace
*/
private TaskTraceBuilder taskTraceBuilder;
private String executeTimeoutTrace;
}

@ -0,0 +1,25 @@
package cn.hippo4j.common.toolkit;
import org.slf4j.MDC;
/**
* MD util.
*
* @author chen.ma
* @date 2022/3/3 08:30
*/
public class MDCUtil {
/**
* Get and remove.
*
* @param key
* @return
*/
public static String getAndRemove(String key) {
String val = MDC.get(key);
MDC.remove(key);
return val;
}
}

@ -2,7 +2,6 @@ package cn.hippo4j.core.executor;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.common.notify.TaskTraceBuilder;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import lombok.Getter;
import lombok.NonNull;
@ -28,10 +27,6 @@ public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport {
@Setter
private TaskDecorator taskDecorator;
@Getter
@Setter
private TaskTraceBuilder taskTraceBuilder;
@Getter
@Setter
private RejectedExecutionHandler redundancyHandler;
@ -83,6 +78,10 @@ public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport {
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (executeTimeOut == null || executeTimeOut <= 0) {
return;
}
try {
long startTime = this.startTime.get();
long endTime = System.currentTimeMillis();

@ -5,6 +5,8 @@ import cn.hippo4j.common.notify.NotifyTypeEnum;
import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import cn.hippo4j.common.notify.request.AlarmNotifyRequest;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.toolkit.MDCUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
@ -20,6 +22,8 @@ import org.springframework.boot.CommandLineRunner;
import java.util.List;
import java.util.concurrent.*;
import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE;
/**
* Thread pool alarm notify.
*
@ -164,8 +168,15 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
alarmNotifyRequest.setThreadPoolId(threadPoolId);
alarmNotifyRequest.setExecuteTime(executeTime);
alarmNotifyRequest.setExecuteTimeOut(executeTimeOut);
String executeTimeoutTrace = MDCUtil.getAndRemove(EXECUTE_TIMEOUT_TRACE);
Runnable task = () -> {
if (StringUtil.isNotBlank(executeTimeoutTrace)) {
alarmNotifyRequest.setExecuteTimeoutTrace(executeTimeoutTrace);
}
hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.TIMEOUT, alarmNotifyRequest);
};
Runnable task = () -> hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.TIMEOUT, alarmNotifyRequest);
EXECUTE_TIMEOUT_EXECUTOR.execute(task);
} catch (Throwable ex) {
log.error("Send thread pool execution timeout alarm error.", ex);

@ -1,6 +1,5 @@
package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.notify.TaskTraceBuilder;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import lombok.Data;
@ -129,7 +128,6 @@ public class AbstractBuildThreadPoolTemplate {
}
dynamicThreadPoolExecutor.setTaskDecorator(initParam.getTaskDecorator());
dynamicThreadPoolExecutor.setTaskTraceBuilder(initParam.getTaskTraceBuilder());
dynamicThreadPoolExecutor.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut);
return dynamicThreadPoolExecutor;
}
@ -193,11 +191,6 @@ public class AbstractBuildThreadPoolTemplate {
*/
private TaskDecorator taskDecorator;
/**
* Trace
*/
private TaskTraceBuilder taskTraceBuilder;
/**
*
*/

@ -1,7 +1,6 @@
package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.design.builder.Builder;
import cn.hippo4j.common.notify.TaskTraceBuilder;
import cn.hippo4j.common.toolkit.Assert;
import org.springframework.core.task.TaskDecorator;
@ -92,11 +91,6 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
*/
private TaskDecorator taskDecorator;
/**
* Trace
*/
private TaskTraceBuilder taskTraceBuilder;
/**
*
*/
@ -223,11 +217,6 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this;
}
public ThreadPoolBuilder taskTraceBuilder(TaskTraceBuilder taskTraceBuilder) {
this.taskTraceBuilder = taskTraceBuilder;
return this;
}
public ThreadPoolBuilder awaitTerminationMillis(long awaitTerminationMillis) {
this.awaitTerminationMillis = awaitTerminationMillis;
return this;
@ -316,7 +305,6 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
.setMaxPoolNum(builder.maxPoolSize)
.setKeepAliveTime(builder.keepAliveTime)
.setCapacity(builder.capacity)
.setTaskTraceBuilder(builder.taskTraceBuilder)
.setExecuteTimeOut(builder.executeTimeOut)
.setRejectedExecutionHandler(builder.rejectedExecutionHandler)
.setTimeUnit(builder.timeUnit)

@ -34,8 +34,8 @@ public class ThreadPoolConfig {
public DynamicThreadPoolWrapper messageCenterDynamicThreadPool() {
ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.taskDecorator(new TaskTraceBuilderHandler())
.threadFactory(MESSAGE_CONSUME)
.taskTraceBuilder(new TaskTraceBuilderHandler())
.build();
return new DynamicThreadPoolWrapper(MESSAGE_CONSUME, customExecutor);

@ -1,7 +1,10 @@
package cn.hippo4j.example.core.handler;
import cn.hippo4j.common.notify.TaskTraceBuilder;
import cn.hippo4j.common.toolkit.StringUtil;
import org.slf4j.MDC;
import org.springframework.core.task.TaskDecorator;
import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE;
/**
* Task trace builder handler.
@ -9,30 +12,21 @@ import org.slf4j.MDC;
* @author chen.ma
* @date 2022/3/2 20:46
*/
public class TaskTraceBuilderHandler implements TaskTraceBuilder {
private final String TRACE_KEY = "traceId";
@Override
public void before() {
MDC.put(TRACE_KEY, "https://github.com/acmenlt/dynamic-threadpool 行行好, 点个 Star.");
}
@Override
public String traceBuild() {
String traceStr;
try {
traceStr = MDC.get(TRACE_KEY);
} finally {
clear();
}
return traceStr;
}
public final class TaskTraceBuilderHandler implements TaskDecorator {
@Override
public void clear() {
MDC.remove(TRACE_KEY);
public Runnable decorate(Runnable runnable) {
String executeTimeoutTrace = MDC.get(EXECUTE_TIMEOUT_TRACE);
Runnable taskRun = () -> {
if (StringUtil.isNotBlank(executeTimeoutTrace)) {
MDC.put(EXECUTE_TIMEOUT_TRACE, executeTimeoutTrace);
}
runnable.run();
// 此处不用进行清理操作, 统一在线程任务执行后清理
};
return taskRun;
}
}

@ -3,6 +3,7 @@ package cn.hippo4j.example.core.inittest;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@ -11,6 +12,8 @@ import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE;
/**
* Test run time metrics.
*
@ -40,7 +43,12 @@ public class RunStateHandlerTest {
}
private void runTask(ExecutorService executorService) {
// 模拟任务运行
new Thread(() -> {
/**
* 线, MDC Trace , .
*/
MDC.put(EXECUTE_TIMEOUT_TRACE, "https://github.com/acmenlt/dynamic-threadpool 感觉不错来个 Star.");
ThreadUtil.sleep(5000);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
try {

@ -1,7 +1,6 @@
package cn.hippo4j.core.starter.support;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.notify.TaskTraceBuilder;
import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
@ -129,9 +128,6 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown;
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
TaskTraceBuilder taskTraceBuilder = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskTraceBuilder();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskTraceBuilder(taskTraceBuilder);
}
dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor);

@ -4,7 +4,6 @@ import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.notify.TaskTraceBuilder;
import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.web.base.Result;
@ -14,9 +13,9 @@ import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*;
import cn.hippo4j.core.toolkit.inet.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.starter.config.BootstrapProperties;
import cn.hippo4j.starter.remote.HttpAgent;
import cn.hippo4j.core.toolkit.inet.DynamicThreadPoolAnnotationUtil;
import cn.hutool.core.util.BooleanUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -165,9 +164,6 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
long executeTimeOut = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getExecuteTimeOut();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setExecuteTimeOut(executeTimeOut);
TaskTraceBuilder taskTraceBuilder = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskTraceBuilder();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskTraceBuilder(taskTraceBuilder);
}
dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor);

Loading…
Cancel
Save