refactor(executor): 重构执行器实现以提高可维护性

- 将单例模式改为实例方法调用,通过getInstance()获取执行器实例
- 引入Helper类替换原有静态线程实现,包括ExecutorRegistryThreadHelper、
  JobLogFileCleanThreadHelper和TriggerCallbackThreadHelper
- 移除废弃的常量ELEGANT_SHUTDOWN_WAITING_SECONDS,改用Const类中的统一配置
- 更新所有内部方法调用为实例方法,增强代码的可测试性和灵活性
- 使用MessageQueue和CyclicThread优化回调消息处理机制
- 统一资源管理和生命周期控制,改进线程安全和资源清理逻辑
3.4.1-release
xuxueli 3 days ago
parent a7a3d44ee1
commit 9d4ba92537

@ -2840,15 +2840,19 @@ alter table xxl_job_log
```
### 7.45 版本 v3.4.1 Release Notes[ING]
- 1、【重构】项目结构AI Ready重构业务逻辑与框架逻辑分离提升项目可读性与可维护性
- 2、【调整】消息中心移除context-path前缀配置项简化客户端配置
(存量客户端升级需要注意:升级后需要将配置项 "xxl.job.admin.addresses" 中的 context-path 前缀移除)
- 3、【优化】任务参数长度调整最长支持2048字符
- 4、【升级】调度中心UI交互优化任务及日志管理支持下拉框模糊搜索提升交互体验
- 5、【修复】XxlJobFileAppender自定义地址callbackLogPath设置无效问题修复合并ISSUS-3963
- 6、【优化】调度组件守护线程代码重构提升稳定性以及可维护性
- 7、【TODO】调度中心OpenAPI完善提供任务管理能力封装Agent Skill并推送ClawHub
- 8、【TODO】AccessToken升级执行器维度隔离支持线上化配置升级双端OpenApi适配AccessToken升级
- 1、【重构】项目结构AI友好性重构业务逻辑与框架逻辑分离提升项目可读性与可维护性
- 2、【重构】调度中心与执行期组件重构线程管理与内存队列升级统一资源管理和生命周期控制改进线程安全和资源清理逻辑
- 3、【调整】消息中心移除自身context-path前缀配置项简化客户端配置
(存量客户端升级需要注意:升级后需要将配置项 "xxl.job.admin.addresses" 中的 context-path 后缀移除)
- 4、【优化】任务参数长度调整最长支持2048字符
- 5、【升级】调度中心UI交互优化任务及日志列表下拉框支持模糊搜索提升交互体验
- 6、【修复】XxlJobFileAppender自定义地址callbackLogPath设置无效问题修复合并ISSUS-3963
- 7、【优化】调度组件守护线程代码重构提升稳定性以及可维护性
### 7.46 版本 v3.4.2 Release Notes[ING]
- 1、【TODO】调度中心OpenAPI完善提供任务管理能力封装Agent Skill并推送ClawHub
- 2、【TODO】AccessToken升级执行器维度隔离支持线上化配置升级双端OpenApi适配AccessToken升级
### TODO LIST

@ -1,15 +1,17 @@
package com.xxl.job.core.executor;
import com.xxl.job.core.constant.Const;
import com.xxl.job.core.openapi.AdminBiz;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.handler.impl.MethodJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.openapi.AdminBiz;
import com.xxl.job.core.server.EmbedServer;
import com.xxl.job.core.thread.JobLogFileCleanThread;
import com.xxl.job.core.thread.ExecutorRegistryThreadHelper;
import com.xxl.job.core.thread.JobLogFileCleanThreadHelper;
import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.thread.TriggerCallbackThread;
import com.xxl.job.core.thread.TriggerCallbackThreadHelper;
import com.xxl.tool.core.MapTool;
import com.xxl.tool.core.StringTool;
import com.xxl.tool.http.HttpTool;
import com.xxl.tool.http.IPTool;
@ -30,12 +32,17 @@ import java.util.concurrent.TimeUnit;
public class XxlJobExecutor {
private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);
/*
* elegant shutdown wait seconds
*/
private static final long ELEGANT_SHUTDOWN_WAITING_SECONDS = 5;
// ---------------------- instance ----------------------
private static XxlJobExecutor xxlJobExecutor = null;
public static XxlJobExecutor getInstance() {
return xxlJobExecutor;
}
// ---------------------- field ----------------------
private String adminAddresses;
private String accessToken;
private int timeout;
@ -78,10 +85,41 @@ public class XxlJobExecutor {
this.logRetentionDays = logRetentionDays;
}
public String getAccessToken() {
return accessToken;
}
public String getAppname() {
return appname;
}
public String getAddress() {
return address;
}
public int getPort() {
return port;
}
// ---------------------- start + stop ----------------------
private ExecutorRegistryThreadHelper executorRegistryThreadHelper;
private JobLogFileCleanThreadHelper jobLogFileCleanThreadHelper;
private TriggerCallbackThreadHelper triggerCallbackThreadHelper;
public ExecutorRegistryThreadHelper getExecutorRegistryThreadHelper() {
return executorRegistryThreadHelper;
}
public TriggerCallbackThreadHelper getTriggerCallbackThreadHelper() {
return triggerCallbackThreadHelper;
}
/**
* start
*/
public void start() throws Exception {
// init instance
xxlJobExecutor = this;
// valid enabled
if (enabled!=null && !enabled) {
logger.info(">>>>>>>>>>> xxl-job executor start fail, enabled:{}", enabled);
@ -92,29 +130,34 @@ public class XxlJobExecutor {
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken, timeout);
initAdminBizList();
// 1、init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
jobLogFileCleanThreadHelper = new JobLogFileCleanThreadHelper();
jobLogFileCleanThreadHelper.start(logRetentionDays);
// 2、init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
triggerCallbackThreadHelper = new TriggerCallbackThreadHelper();
triggerCallbackThreadHelper.start(this);
// 3、init executor-server
initEmbedServer(address, ip, port, appname, accessToken);
// 3、EmbedServer + ExecutorRegistryThreadHelper
executorRegistryThreadHelper = new ExecutorRegistryThreadHelper();
startEmbedServer();
}
/**
* destroy
*/
public void destroy(){
// 1、destroy executor-server
stopEmbedServer();
// destroy jobThreadRepository
if (!jobThreadRepository.isEmpty()) {
if (MapTool.isNotEmpty(jobThreadRepository)) {
// 1.1、elegant shutdown wait job finish
try {
TimeUnit.SECONDS.sleep(ELEGANT_SHUTDOWN_WAITING_SECONDS);
TimeUnit.SECONDS.sleep(Const.ELEGANT_SHUTDOWN_WAITING_SECONDS);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
@ -135,19 +178,26 @@ public class XxlJobExecutor {
}
jobHandlerRepository.clear();
// 2、destroy TriggerCallbackThread
triggerCallbackThreadHelper.stop();
// 2、destroy JobLogFileCleanThread
JobLogFileCleanThread.getInstance().toStop();
// 3、destroy TriggerCallbackThread
TriggerCallbackThread.getInstance().toStop();
// 3、destroy JobLogFileCleanThread
jobLogFileCleanThreadHelper.stop();
}
// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken, int timeout) throws Exception {
/**
* admin-client list
*/
private final List<AdminBiz> adminBizList = new ArrayList<>();
/**
* init adminBizList
*/
private void initAdminBizList() throws Exception {
// valid
if (StringTool.isBlank(adminAddresses)) {
return;
@ -174,48 +224,56 @@ public class XxlJobExecutor {
.proxy(AdminBiz.class);
// registry
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
public static List<AdminBiz> getAdminBizList(){
/**
* get adminBizList
*/
public List<AdminBiz> getAdminBizList(){
return adminBizList;
}
// ---------------------- executor-server (rpc provider) ----------------------
private EmbedServer embedServer = null;
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
/**
* start embed server
*/
private void startEmbedServer() throws Exception {
// fill ip port
port = port>0?port: IPTool.getAvailablePort(9999);
ip = StringTool.isNotBlank(ip) ? ip : IPTool.getIp();
this.port = this.port>0?this.port: IPTool.getAvailablePort(9999);
this.ip = StringTool.isNotBlank(this.ip) ? this.ip : IPTool.getIp();
// generate address
if (StringTool.isBlank(address)) {
if (StringTool.isBlank(this.address)) {
// registry-addressdefault use address to registry , otherwise use ip:port if address is null
String ip_port_address = IPTool.toAddressString(ip, port);
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
this.address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// accessToken
if (StringTool.isBlank(accessToken)) {
if (StringTool.isBlank(this.accessToken)) {
logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
}
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
embedServer.start(this);
}
/**
* stop embed server
*/
private void stopEmbedServer() {
// stop provider factory
if (embedServer != null) {
try {
embedServer.stop();
embedServer.stop(this);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
@ -224,14 +282,30 @@ public class XxlJobExecutor {
// ---------------------- job handler repository ----------------------
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler loadJobHandler(String name){
/**
* job handler repository
*/
private final ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<>();
/**
* load JobHandler instance by name
*/
public IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}
public static IJobHandler registryJobHandler(String name, IJobHandler jobHandler){
/**
* registry JobHandler
*/
public IJobHandler registryJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
/**
* registry JobHandler for method
*/
protected void registryJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){
if (xxlJob == null) {
return;
@ -241,7 +315,7 @@ public class XxlJobExecutor {
//make and simplify the variables since they'll be called several times later
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) {
if (name.trim().isEmpty()) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
}
if (loadJobHandler(name) != null) {
@ -264,7 +338,7 @@ public class XxlJobExecutor {
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
if (StringTool.isNotBlank(xxlJob.init())) {
try {
initMethod = clazz.getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
@ -272,7 +346,7 @@ public class XxlJobExecutor {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
if (StringTool.isNotBlank(xxlJob.destroy())) {
try {
destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
@ -288,8 +362,9 @@ public class XxlJobExecutor {
// ---------------------- job thread repository ----------------------
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
private final ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<>();
public JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
JobThread newJobThread = new JobThread(jobId, handler);
newJobThread.start();
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
@ -303,7 +378,7 @@ public class XxlJobExecutor {
return newJobThread;
}
public static JobThread removeJobThread(int jobId, String removeOldReason){
public JobThread removeJobThread(int jobId, String removeOldReason){
JobThread oldJobThread = jobThreadRepository.remove(jobId);
if (oldJobThread != null) {
oldJobThread.toStop(removeOldReason);
@ -314,7 +389,8 @@ public class XxlJobExecutor {
return null;
}
public static JobThread loadJobThread(int jobId){
public JobThread loadJobThread(int jobId){
return jobThreadRepository.get(jobId);
}
}

@ -1,9 +1,7 @@
package com.xxl.job.core.openapi.impl;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.openapi.ExecutorBiz;
import com.xxl.job.core.openapi.model.*;
import com.xxl.job.core.constant.ExecutorBlockStrategyEnum;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.glue.GlueFactory;
import com.xxl.job.core.glue.GlueTypeEnum;
@ -11,6 +9,8 @@ import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.impl.GlueJobHandler;
import com.xxl.job.core.handler.impl.ScriptJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.openapi.ExecutorBiz;
import com.xxl.job.core.openapi.model.*;
import com.xxl.job.core.thread.JobThread;
import com.xxl.tool.response.Response;
import org.slf4j.Logger;
@ -22,7 +22,7 @@ import java.util.Date;
* Created by xuxueli on 17/3/1.
*/
public class ExecutorBizImpl implements ExecutorBiz {
private static Logger logger = LoggerFactory.getLogger(ExecutorBizImpl.class);
private static final Logger logger = LoggerFactory.getLogger(ExecutorBizImpl.class);
@Override
public Response<String> beat() {
@ -34,7 +34,7 @@ public class ExecutorBizImpl implements ExecutorBiz {
// isRunningOrHasQueue
boolean isRunningOrHasQueue = false;
JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatRequest.getJobId());
JobThread jobThread = XxlJobExecutor.getInstance().loadJobThread(idleBeatRequest.getJobId());
if (jobThread != null && jobThread.isRunningOrHasQueue()) {
isRunningOrHasQueue = true;
}
@ -48,7 +48,7 @@ public class ExecutorBizImpl implements ExecutorBiz {
@Override
public Response<String> run(TriggerRequest triggerRequest) {
// load oldjobHandler + jobThread
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerRequest.getJobId());
JobThread jobThread = XxlJobExecutor.getInstance().loadJobThread(triggerRequest.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
@ -57,7 +57,7 @@ public class ExecutorBizImpl implements ExecutorBiz {
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// new jobhandler
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerRequest.getExecutorHandler());
IJobHandler newJobHandler = XxlJobExecutor.getInstance().loadJobHandler(triggerRequest.getExecutorHandler());
// valid old jobThread
if (jobThread!=null && jobHandler != newJobHandler) {
@ -142,7 +142,7 @@ public class ExecutorBizImpl implements ExecutorBiz {
// replace thread (new or exists invalid)
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerRequest.getJobId(), jobHandler, removeOldReason);
jobThread = XxlJobExecutor.getInstance().registJobThread(triggerRequest.getJobId(), jobHandler, removeOldReason);
}
// push data to queue
@ -152,9 +152,9 @@ public class ExecutorBizImpl implements ExecutorBiz {
@Override
public Response<String> kill(KillRequest killRequest) {
// kill handlerThread, and create new one
JobThread jobThread = XxlJobExecutor.loadJobThread(killRequest.getJobId());
JobThread jobThread = XxlJobExecutor.getInstance().loadJobThread(killRequest.getJobId());
if (jobThread != null) {
XxlJobExecutor.removeJobThread(killRequest.getJobId(), "scheduling center kill job.");
XxlJobExecutor.getInstance().removeJobThread(killRequest.getJobId(), "scheduling center kill job.");
return Response.ofSuccess();
}

@ -1,10 +1,13 @@
package com.xxl.job.core.server;
import com.xxl.job.core.constant.Const;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.openapi.ExecutorBiz;
import com.xxl.job.core.openapi.impl.ExecutorBizImpl;
import com.xxl.job.core.openapi.model.*;
import com.xxl.job.core.thread.ExecutorRegistryThread;
import com.xxl.job.core.openapi.model.IdleBeatRequest;
import com.xxl.job.core.openapi.model.KillRequest;
import com.xxl.job.core.openapi.model.LogRequest;
import com.xxl.job.core.openapi.model.TriggerRequest;
import com.xxl.tool.error.ThrowableTool;
import com.xxl.tool.json.GsonTool;
import com.xxl.tool.response.Response;
@ -34,8 +37,16 @@ public class EmbedServer {
private ExecutorBiz executorBiz;
private Thread thread;
public void start(final String address, final int port, final String appname, final String accessToken) {
public void start(final XxlJobExecutor xxlJobExecutor) {
/**
* init executor biz service
*/
executorBiz = new ExecutorBizImpl();
/**
* start server
*/
thread = new Thread(new Runnable() {
@Override
public void run() {
@ -72,18 +83,18 @@ public class EmbedServer {
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
.addLast(new EmbedHttpServerHandler(executorBiz, xxlJobExecutor.getAccessToken(), bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(port).sync();
ChannelFuture future = bootstrap.bind(xxlJobExecutor.getPort()).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, xxlJobExecutor.getPort());
// start registry
startRegistry(appname, address);
xxlJobExecutor.getExecutorRegistryThreadHelper().start(xxlJobExecutor);
// wait util stop
future.channel().closeFuture().sync();
@ -108,14 +119,14 @@ public class EmbedServer {
thread.start();
}
public void stop() throws Exception {
public void stop(final XxlJobExecutor xxlJobExecutor) throws Exception {
// destroy server thread
if (thread != null && thread.isAlive()) {
thread.interrupt();
}
// stop registry
stopRegistry();
xxlJobExecutor.getExecutorRegistryThreadHelper().stop(xxlJobExecutor);
logger.info(">>>>>>>>>>> xxl-job remoting server destroy success.");
}
@ -244,15 +255,4 @@ public class EmbedServer {
}
}
// ---------------------- registry ----------------------
public void startRegistry(final String appname, final String address) {
// start registry
ExecutorRegistryThread.getInstance().start(appname, address);
}
public void stopRegistry() {
// stop registry
ExecutorRegistryThread.getInstance().toStop();
}
}

@ -1,131 +0,0 @@
package com.xxl.job.core.thread;
import com.xxl.job.core.constant.RegistType;
import com.xxl.job.core.openapi.AdminBiz;
import com.xxl.job.core.openapi.model.RegistryRequest;
import com.xxl.job.core.constant.Const;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.tool.response.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* Created by xuxueli on 17/3/2.
*/
public class ExecutorRegistryThread {
private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);
private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
public static ExecutorRegistryThread getInstance(){
return instance;
}
private Thread registryThread;
private volatile boolean toStop = false;
public void start(final String appname, final String address){
// valid
if (appname==null || appname.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
return;
}
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
return;
}
toStop = false;
registryThread = new Thread(new Runnable() {
@Override
public void run() {
// registry
while (!toStop) {
try {
RegistryRequest registryParam = new RegistryRequest(RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
Response<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && registryResult.isSuccess()) {
registryResult = Response.ofSuccess();
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Throwable e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Throwable e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
if (!toStop) {
TimeUnit.SECONDS.sleep(Const.BEAT_TIMEOUT);
}
} catch (Throwable e) {
if (!toStop) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
}
// registry remove
try {
RegistryRequest registryParam = new RegistryRequest(RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
Response<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && registryResult.isSuccess()) {
registryResult = Response.ofSuccess();
logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Throwable e) {
if (!toStop) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
}
}
}
} catch (Throwable e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
}
});
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();
}
public void toStop() {
toStop = true;
// interrupt and wait
if (registryThread != null) {
registryThread.interrupt();
try {
registryThread.join();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
}
}

@ -0,0 +1,107 @@
package com.xxl.job.core.thread;
import com.xxl.job.core.constant.RegistType;
import com.xxl.job.core.openapi.AdminBiz;
import com.xxl.job.core.openapi.model.RegistryRequest;
import com.xxl.job.core.constant.Const;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.tool.concurrent.CyclicThread;
import com.xxl.tool.core.CollectionTool;
import com.xxl.tool.core.StringTool;
import com.xxl.tool.response.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by xuxueli on 17/3/2.
*/
public class ExecutorRegistryThreadHelper {
private static final Logger logger = LoggerFactory.getLogger(ExecutorRegistryThreadHelper.class);
/**
* registry thread
*/
private CyclicThread registryThread;
/**
* start
*/
public void start(final XxlJobExecutor xxlJobExecutor){
/**
* valid
*/
if (StringTool.isBlank(xxlJobExecutor.getAppname())) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
return;
}
if (CollectionTool.isEmpty(xxlJobExecutor.getAdminBizList())) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
return;
}
/**
* registry thread
*/
registryThread = new CyclicThread("ExecutorRegistryThread#registryThread", true, new Runnable() {
@Override
public void run() {
RegistryRequest registryParam = new RegistryRequest(RegistType.EXECUTOR.name(), xxlJobExecutor.getAppname(), xxlJobExecutor.getAddress());
for (AdminBiz adminBiz: xxlJobExecutor.getAdminBizList()) {
try {
Response<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && registryResult.isSuccess()) {
registryResult = Response.ofSuccess();
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Throwable e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
}
}, Const.BEAT_TIMEOUT * 1000L, true);
registryThread.start();
}
/**
* stop
*/
public void stop(final XxlJobExecutor xxlJobExecutor) {
/**
* 1stop registryThread
*/
registryThread.stop();
/**
* 2registry remove
*/
registryRemove(xxlJobExecutor);
}
private void registryRemove(final XxlJobExecutor xxlJobExecutor){
RegistryRequest registryParam = new RegistryRequest(RegistType.EXECUTOR.name(), xxlJobExecutor.getAppname(), xxlJobExecutor.getAddress());
for (AdminBiz adminBiz: xxlJobExecutor.getAdminBizList()) {
try {
Response<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && registryResult.isSuccess()) {
registryResult = Response.ofSuccess();
logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Throwable e) {
logger.warn(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}, error:{}", registryParam, e.getMessage());
}
}
}
}

@ -1,128 +0,0 @@
package com.xxl.job.core.thread;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.tool.core.DateTool;
import com.xxl.tool.io.FileTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* job file clean thread
*
* @author xuxueli 2017-12-29 16:23:43
*/
public class JobLogFileCleanThread {
private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class);
private static JobLogFileCleanThread instance = new JobLogFileCleanThread();
public static JobLogFileCleanThread getInstance(){
return instance;
}
private Thread localThread;
private volatile boolean toStop = false;
public void start(final long logRetentionDays){
// limit min value
if (logRetentionDays < 3 ) {
return; // effective only when logRetentionDays >= 3
}
toStop = false;
localThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// clean log dir, over logRetentionDays
File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
if (childDirs!=null && childDirs.length>0) {
// today
Calendar todayCal = Calendar.getInstance();
todayCal.set(Calendar.HOUR_OF_DAY,0);
todayCal.set(Calendar.MINUTE,0);
todayCal.set(Calendar.SECOND,0);
todayCal.set(Calendar.MILLISECOND,0);
Date todayDate = todayCal.getTime();
// clean expired logfile
for (File childFile: childDirs) {
// valid log-path: must be directory
if (!childFile.isDirectory()) {
continue;
}
// valid day log-path: like "---/2017-12-25/639.log"
if (!childFile.getName().contains("-")) {
continue;
}
// parse create-day of file-path
Date logFileCreateDate = null;
try {
logFileCreateDate = DateTool.parseDate(childFile.getName());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
if (logFileCreateDate == null) {
continue;
}
// check expired
Date expiredDate = DateTool.addDays(logFileCreateDate, logRetentionDays);
if (todayDate.getTime() > expiredDate.getTime()) {
// expired, remove all log of this day
FileTool.delete(childFile);
//FileUtil.deleteRecursively(childFile);
}
}
}
} catch (Throwable e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.DAYS.sleep(1);
} catch (Throwable e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.");
}
});
localThread.setDaemon(true);
localThread.setName("xxl-job, executor JobLogFileCleanThread");
localThread.start();
}
public void toStop() {
toStop = true;
if (localThread == null) {
return;
}
// interrupt and wait
localThread.interrupt();
try {
localThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}

@ -0,0 +1,108 @@
package com.xxl.job.core.thread;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.tool.concurrent.CyclicThread;
import com.xxl.tool.core.DateTool;
import com.xxl.tool.io.FileTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Calendar;
import java.util.Date;
/**
* job file clean thread
*
* @author xuxueli 2017-12-29 16:23:43
*/
public class JobLogFileCleanThreadHelper {
private static final Logger logger = LoggerFactory.getLogger(JobLogFileCleanThreadHelper.class);
/**
* monitor thread
*/
private CyclicThread logFileCleanThread;
/**
* start
*/
public void start(final long logRetentionDays){
/**
* limit min value
*/
if (logRetentionDays < 3 ) {
return; // effective only when logRetentionDays >= 3
}
/**
* logFileCleanThread
*/
logFileCleanThread = new CyclicThread("JobLogFileCleanThreadHelper#logFileCleanThread", true, new Runnable() {
@Override
public void run() {
// clean log dir, over logRetentionDays
File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
if (childDirs!=null && childDirs.length>0) {
// today
Calendar todayCal = Calendar.getInstance();
todayCal.set(Calendar.HOUR_OF_DAY,0);
todayCal.set(Calendar.MINUTE,0);
todayCal.set(Calendar.SECOND,0);
todayCal.set(Calendar.MILLISECOND,0);
Date todayDate = todayCal.getTime();
// clean expired logfile
for (File childFile: childDirs) {
// valid log-path: must be directory
if (!childFile.isDirectory()) {
continue;
}
// valid day log-path: like "---/2017-12-25/639.log"
if (!childFile.getName().contains("-")) {
continue;
}
// parse create-day of file-path
Date logFileCreateDate = null;
try {
logFileCreateDate = DateTool.parseDate(childFile.getName());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
if (logFileCreateDate == null) {
continue;
}
// check expired
Date expiredDate = DateTool.addDays(logFileCreateDate, logRetentionDays);
if (todayDate.getTime() > expiredDate.getTime()) {
// expired, remove all log of this day
FileTool.delete(childFile);
//FileUtil.deleteRecursively(childFile);
}
}
}
}
}, DateTool.MILLIS_PER_DAY, true);
logFileCleanThread.start();
}
/**
* stop
*/
public void stop() {
// stop logFileCleanThread
logFileCleanThread.stop();
}
}

@ -181,7 +181,7 @@ public class JobThread extends Thread{
} else {
if (idleTimes > 30) {
if(triggerQueue.isEmpty()) { // avoid concurrent trigger causes jobId-lost
XxlJobExecutor.removeJobThread(jobId, "excutor idle times over limit.");
XxlJobExecutor.getInstance().removeJobThread(jobId, "excutor idle times over limit.");
}
}
}
@ -203,7 +203,7 @@ public class JobThread extends Thread{
// callback handler info
if (!toStop) {
// common
TriggerCallbackThread.pushCallBack(new CallbackRequest(
XxlJobExecutor.getInstance().getTriggerCallbackThreadHelper().pushCallBack(new CallbackRequest(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.getXxlJobContext().getHandleCode(),
@ -211,7 +211,7 @@ public class JobThread extends Thread{
);
} else {
// is killed
TriggerCallbackThread.pushCallBack(new CallbackRequest(
XxlJobExecutor.getInstance().getTriggerCallbackThreadHelper().pushCallBack(new CallbackRequest(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_CODE_FAIL,
@ -227,7 +227,7 @@ public class JobThread extends Thread{
TriggerRequest triggerParam = triggerQueue.poll();
if (triggerParam!=null) {
// is killed
TriggerCallbackThread.pushCallBack(new CallbackRequest(
XxlJobExecutor.getInstance().getTriggerCallbackThreadHelper().pushCallBack(new CallbackRequest(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_CODE_FAIL,

@ -1,305 +0,0 @@
package com.xxl.job.core.thread;
import com.xxl.job.core.openapi.AdminBiz;
import com.xxl.job.core.openapi.model.CallbackRequest;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.constant.Const;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.tool.core.ArrayTool;
import com.xxl.tool.core.CollectionTool;
import com.xxl.tool.core.StringTool;
import com.xxl.tool.crypto.Md5Tool;
import com.xxl.tool.json.GsonTool;
import com.xxl.tool.io.FileTool;
import com.xxl.tool.response.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Trigger Callback Thread
*
* Created by xuxueli on 16/7/22.
*/
public class TriggerCallbackThread {
private static final Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);
private static final TriggerCallbackThread instance = new TriggerCallbackThread();
public static TriggerCallbackThread getInstance(){
return instance;
}
/**
* job results callback queue
*/
private final LinkedBlockingQueue<CallbackRequest> callBackQueue = new LinkedBlockingQueue<>();
public static void pushCallBack(CallbackRequest callback){
getInstance().callBackQueue.add(callback);
logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
}
/**
* callback thread
*/
private Thread triggerCallbackThread;
private Thread triggerRetryCallbackThread;
private volatile boolean toStop = false;
public void start() {
// valid
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
return;
}
toStop = false;
/**
* trigger callback thread
*/
triggerCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
// normal callback
while(!toStop){
try {
CallbackRequest callback = getInstance().callBackQueue.take();
if (callback != null) {
// collect callback data
List<CallbackRequest> callbackParamList = new ArrayList<>();
callbackParamList.add(callback); // add one element
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); // drainTo other all elements
// do callback, will retry if error
if (CollectionTool.isNotEmpty(callbackParamList)) {
doCallback(callbackParamList);
}
}
} catch (Throwable e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
// thead stop, callback lasttime
try {
// collect callback data
List<CallbackRequest> callbackParamList = new ArrayList<>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
// do callback
if (CollectionTool.isNotEmpty(callbackParamList)) {
doCallback(callbackParamList);
}
} catch (Throwable e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");
}
});
triggerCallbackThread.setDaemon(true);
triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
triggerCallbackThread.start();
/**
* callback fail retry thread
*/
triggerRetryCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
while(!toStop){
try {
retryFailCallbackFile();
} catch (Throwable e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.SECONDS.sleep(Const.BEAT_TIMEOUT);
} catch (Throwable e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
}
});
triggerRetryCallbackThread.setDaemon(true);
triggerRetryCallbackThread.setName("xxl-job, executor TriggerRetryCallbackThread");
triggerRetryCallbackThread.start();
}
public void toStop(){
toStop = true;
// stop callback, interrupt and wait
if (triggerCallbackThread != null) { // support empty admin address
triggerCallbackThread.interrupt();
try {
triggerCallbackThread.join();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
// stop retry, interrupt and wait
if (triggerRetryCallbackThread != null) {
triggerRetryCallbackThread.interrupt();
try {
triggerRetryCallbackThread.join();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
}
/**
* do callback, will retry if error
*
* @param callbackParamList callback param list
*/
private void doCallback(List<CallbackRequest> callbackParamList){
boolean callbackRet = false;
// callback, will retry if error
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
Response<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult!=null && callbackResult.isSuccess()) {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
callbackRet = true;
break;
} else {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
}
} catch (Throwable e) {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
}
}
if (!callbackRet) {
appendFailCallbackFile(callbackParamList);
}
}
/**
* callback log
*/
private void callbackLog(List<CallbackRequest> callbackParamList, String logContent){
for (CallbackRequest callbackParam: callbackParamList) {
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId());
XxlJobContext.setXxlJobContext(new XxlJobContext(
-1,
null,
-1,
-1,
logFileName,
-1,
-1));
XxlJobHelper.log(logContent);
}
}
// ---------------------- fail-callback file ----------------------
/**
* fail-callback file name
*/
private static final String failCallbackFileName = XxlJobFileAppender
.getCallbackLogPath()
.concat(File.separator)
.concat("xxl-job-callback-{x}")
.concat(".log");
/**
* append fail-callback file
*
* @param callbackParamList callback param list
*/
private void appendFailCallbackFile(List<CallbackRequest> callbackParamList) {
// valid
if (CollectionTool.isEmpty(callbackParamList)) {
return;
}
// generate callback data
String callbackData = GsonTool.toJson(callbackParamList);
String callbackDataMd5 = Md5Tool.md5(callbackData);
// create file
String finalLogFileName = failCallbackFileName.replace("{x}", callbackDataMd5);
// write callback log
try {
FileTool.writeString(finalLogFileName, callbackData);
} catch (IOException e) {
logger.error(">>>>>>>>>>> TriggerCallbackThread appendFailCallbackFile error, finalLogFileName:{}", finalLogFileName, e);
}
}
/**
* retry fail-callback file
*/
private void retryFailCallbackFile() {
// valid
File callbackLogPath = new File(XxlJobFileAppender.getCallbackLogPath());
if (!callbackLogPath.exists()) {
return;
}
// valid file type: must be directory
if (!FileTool.isDirectory(callbackLogPath)) {
FileTool.delete(callbackLogPath);
return;
}
// valid file in path: pass if empty
if (ArrayTool.isEmpty(callbackLogPath.listFiles())) {
return;
}
// load and clear file, do retry
for (File callbackLogFile: callbackLogPath.listFiles()) {
try {
// load data
String callbackData = FileTool.readString(callbackLogFile.getPath());
if (StringTool.isBlank(callbackData)) {
FileTool.delete(callbackLogFile);
continue;
}
// parse callback param
List<CallbackRequest> callbackParamList = GsonTool.fromJsonList(callbackData, CallbackRequest.class);
FileTool.delete(callbackLogFile);
// retry callback
doCallback(callbackParamList);
} catch (IOException e) {
logger.error(">>>>>>>>>>> TriggerCallbackThread retryFailCallbackFile error, callbackLogFile:{}", callbackLogFile.getPath(), e);
}
}
}
}

@ -0,0 +1,237 @@
package com.xxl.job.core.thread;
import com.xxl.job.core.constant.Const;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.openapi.AdminBiz;
import com.xxl.job.core.openapi.model.CallbackRequest;
import com.xxl.tool.concurrent.CyclicThread;
import com.xxl.tool.concurrent.MessageQueue;
import com.xxl.tool.core.ArrayTool;
import com.xxl.tool.core.CollectionTool;
import com.xxl.tool.core.StringTool;
import com.xxl.tool.crypto.Md5Tool;
import com.xxl.tool.io.FileTool;
import com.xxl.tool.json.GsonTool;
import com.xxl.tool.response.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
/**
* Trigger Callback Thread
*
* Created by xuxueli on 16/7/22.
*/
public class TriggerCallbackThreadHelper {
private static final Logger logger = LoggerFactory.getLogger(TriggerCallbackThreadHelper.class);
/**
* callback message-queue
*/
private volatile MessageQueue<CallbackRequest> callbackMessageQueue;
/**
* retry callback-file thread
*/
private CyclicThread retryCallbackThread;
/**
* start
*/
public void start(final XxlJobExecutor xxlJobExecutor) {
// valid
if (xxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
return;
}
/**
* 1callback message-queue
*/
callbackMessageQueue = new MessageQueue<CallbackRequest>(
"TriggerCallbackThreadHelper#callbackMessageQueue",
messages -> {
// do callback
doCallback(messages, xxlJobExecutor);
},
1,
50);
/**
* 2retry callback-file thread
*/
retryCallbackThread = new CyclicThread("TriggerCallbackThreadHelper#retryCallbackThread", true, new Runnable() {
@Override
public void run() {
// valid empty path
File callbackLogPath = new File(XxlJobFileAppender.getCallbackLogPath());
if (!callbackLogPath.exists()) {
return;
}
// valid file type: must be directory
if (!FileTool.isDirectory(callbackLogPath)) {
FileTool.delete(callbackLogPath);
return;
}
// valid none file
if (ArrayTool.isEmpty(callbackLogPath.listFiles())) {
return;
}
// load and clear file, do retry
for (File callbackLogFile: callbackLogPath.listFiles()) {
try {
// load data
String callbackData = FileTool.readString(callbackLogFile.getPath());
if (StringTool.isBlank(callbackData)) {
FileTool.delete(callbackLogFile);
continue;
}
// parse callback param
List<CallbackRequest> callbackParamList = GsonTool.fromJsonList(callbackData, CallbackRequest.class);
FileTool.delete(callbackLogFile);
// retry callback
doCallback(callbackParamList, xxlJobExecutor);
} catch (IOException e) {
logger.error(">>>>>>>>>>> TriggerCallbackThread retryFailCallbackFile error, callbackLogFile:{}", callbackLogFile.getPath(), e);
}
}
}
}, Const.BEAT_TIMEOUT * 1000L, true);
retryCallbackThread.start();
}
/**
* stop
*/
public void stop(){
// 1、stop callbackMessageQueue
if (callbackMessageQueue != null) {
callbackMessageQueue.stop(); // attempt wait for callback finish
}
// 2、stop retryCallbackThread
retryCallbackThread.stop();
}
/**
* submit callback message
*/
public void pushCallBack(CallbackRequest callback){
if (!callbackMessageQueue.produce(callback)) {
doCallback(new ArrayList<>(Collections.singletonList(callback)), XxlJobExecutor.getInstance());
}
logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
}
// ---------------------- do callback ----------------------
/**
* do callback, will retry if error
*
* @param callbackParamList callback param list
*/
private void doCallback(List<CallbackRequest> callbackParamList, final XxlJobExecutor xxlJobExecutor){
boolean callbackRet = false;
// callback request, will retry + append-log if fail
for (AdminBiz adminBiz: xxlJobExecutor.getAdminBizList()) {
try {
Response<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult!=null && callbackResult.isSuccess()) {
appendCallbackResult(callbackParamList, "<br>----------- xxl-job job callback finish.");
callbackRet = true;
break;
} else {
appendCallbackResult(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
}
} catch (Throwable e) {
appendCallbackResult(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
}
}
// write callback-file, will retry later
if (!callbackRet) {
writeCallbackLog(callbackParamList);
}
}
/**
* append callback result, to each joblog
*/
private void appendCallbackResult(List<CallbackRequest> callbackParamList, String logContent){
for (CallbackRequest callbackParam: callbackParamList) {
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId());
XxlJobContext.setXxlJobContext(new XxlJobContext(
-1,
null,
-1,
-1,
logFileName,
-1,
-1));
XxlJobHelper.log(logContent);
}
}
// ---------------------- fail-callback file ----------------------
/**
* fail-callback file name
*/
private static final String failCallbackFileName = XxlJobFileAppender
.getCallbackLogPath()
.concat(File.separator)
.concat("xxl-job-callback-{x}")
.concat(".log");
/**
* write fail-callback file, will retry later
*
* @param callbackParamList callback param list
*/
private void writeCallbackLog(List<CallbackRequest> callbackParamList) {
// valid
if (CollectionTool.isEmpty(callbackParamList)) {
return;
}
// generate callback data
String callbackData = GsonTool.toJson(callbackParamList);
String callbackDataMd5 = Md5Tool.md5(callbackData);
// create file
String finalLogFileName = failCallbackFileName.replace("{x}", callbackDataMd5);
// write callback log
try {
FileTool.writeString(finalLogFileName, callbackData);
} catch (IOException e) {
logger.error(">>>>>>>>>>> TriggerCallbackThread appendFailCallbackFile error, finalLogFileName:{}", finalLogFileName, e);
}
}
}
Loading…
Cancel
Save