diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index 6648a7f8..2b317d3e 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -2840,15 +2840,19 @@ alter table xxl_job_log ``` ### 7.45 版本 v3.4.1 Release Notes[ING] -- 1、【重构】项目结构AI Ready重构,业务逻辑与框架逻辑分离,提升项目可读性与可维护性; -- 2、【调整】消息中心移除context-path前缀配置项,简化客户端配置; - (存量客户端升级需要注意:升级后需要将配置项 "xxl.job.admin.addresses" 中的 context-path 前缀移除) -- 3、【优化】任务参数长度调整,最长支持2048字符; -- 4、【升级】调度中心UI交互优化,任务及日志管理支持下拉框模糊搜索,提升交互体验; -- 5、【修复】XxlJobFileAppender自定义地址callbackLogPath设置无效问题修复;合并ISSUS-3963; -- 6、【优化】调度组件守护线程代码重构,提升稳定性以及可维护性; -- 7、【TODO】调度中心OpenAPI完善,提供任务管理能力;封装Agent Skill并推送ClawHub; -- 8、【TODO】AccessToken升级:执行器维度隔离,支持线上化配置;升级双端OpenApi,适配AccessToken升级; +- 1、【重构】项目结构AI友好性重构,业务逻辑与框架逻辑分离,提升项目可读性与可维护性; +- 2、【重构】调度中心与执行期组件重构,线程管理与内存队列升级,统一资源管理和生命周期控制,改进线程安全和资源清理逻辑; +- 3、【调整】消息中心移除自身context-path前缀配置项,简化客户端配置; + (存量客户端升级需要注意:升级后需要将配置项 "xxl.job.admin.addresses" 中的 context-path 后缀移除) +- 4、【优化】任务参数长度调整,最长支持2048字符; +- 5、【升级】调度中心UI交互优化,任务及日志列表下拉框支持模糊搜索,提升交互体验; +- 6、【修复】XxlJobFileAppender自定义地址callbackLogPath设置无效问题修复;合并ISSUS-3963; +- 7、【优化】调度组件守护线程代码重构,提升稳定性以及可维护性; + + +### 7.46 版本 v3.4.2 Release Notes[ING] +- 1、【TODO】调度中心OpenAPI完善,提供任务管理能力;封装Agent Skill并推送ClawHub; +- 2、【TODO】AccessToken升级:执行器维度隔离,支持线上化配置;升级双端OpenApi,适配AccessToken升级; ### TODO LIST diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index 9d27cdd0..71ea87f8 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -1,15 +1,17 @@ package com.xxl.job.core.executor; import com.xxl.job.core.constant.Const; -import com.xxl.job.core.openapi.AdminBiz; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.handler.impl.MethodJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.openapi.AdminBiz; import com.xxl.job.core.server.EmbedServer; -import com.xxl.job.core.thread.JobLogFileCleanThread; +import com.xxl.job.core.thread.ExecutorRegistryThreadHelper; +import com.xxl.job.core.thread.JobLogFileCleanThreadHelper; import com.xxl.job.core.thread.JobThread; -import com.xxl.job.core.thread.TriggerCallbackThread; +import com.xxl.job.core.thread.TriggerCallbackThreadHelper; +import com.xxl.tool.core.MapTool; import com.xxl.tool.core.StringTool; import com.xxl.tool.http.HttpTool; import com.xxl.tool.http.IPTool; @@ -30,12 +32,17 @@ import java.util.concurrent.TimeUnit; public class XxlJobExecutor { private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); - /* - * elegant shutdown wait seconds - */ - private static final long ELEGANT_SHUTDOWN_WAITING_SECONDS = 5; + + // ---------------------- instance ---------------------- + + private static XxlJobExecutor xxlJobExecutor = null; + public static XxlJobExecutor getInstance() { + return xxlJobExecutor; + } + // ---------------------- field ---------------------- + private String adminAddresses; private String accessToken; private int timeout; @@ -78,10 +85,41 @@ public class XxlJobExecutor { this.logRetentionDays = logRetentionDays; } + public String getAccessToken() { + return accessToken; + } + public String getAppname() { + return appname; + } + public String getAddress() { + return address; + } + public int getPort() { + return port; + } + // ---------------------- start + stop ---------------------- + + private ExecutorRegistryThreadHelper executorRegistryThreadHelper; + private JobLogFileCleanThreadHelper jobLogFileCleanThreadHelper; + private TriggerCallbackThreadHelper triggerCallbackThreadHelper; + + public ExecutorRegistryThreadHelper getExecutorRegistryThreadHelper() { + return executorRegistryThreadHelper; + } + public TriggerCallbackThreadHelper getTriggerCallbackThreadHelper() { + return triggerCallbackThreadHelper; + } + + /** + * start + */ public void start() throws Exception { + // init instance + xxlJobExecutor = this; + // valid enabled if (enabled!=null && !enabled) { logger.info(">>>>>>>>>>> xxl-job executor start fail, enabled:{}", enabled); @@ -92,29 +130,34 @@ public class XxlJobExecutor { XxlJobFileAppender.initLogPath(logPath); // init invoker, admin-client - initAdminBizList(adminAddresses, accessToken, timeout); - + initAdminBizList(); // 1、init JobLogFileCleanThread - JobLogFileCleanThread.getInstance().start(logRetentionDays); + jobLogFileCleanThreadHelper = new JobLogFileCleanThreadHelper(); + jobLogFileCleanThreadHelper.start(logRetentionDays); // 2、init TriggerCallbackThread - TriggerCallbackThread.getInstance().start(); + triggerCallbackThreadHelper = new TriggerCallbackThreadHelper(); + triggerCallbackThreadHelper.start(this); - // 3、init executor-server - initEmbedServer(address, ip, port, appname, accessToken); + // 3、EmbedServer + ExecutorRegistryThreadHelper + executorRegistryThreadHelper = new ExecutorRegistryThreadHelper(); + startEmbedServer(); } + /** + * destroy + */ public void destroy(){ // 1、destroy executor-server stopEmbedServer(); // destroy jobThreadRepository - if (!jobThreadRepository.isEmpty()) { + if (MapTool.isNotEmpty(jobThreadRepository)) { // 1.1、elegant shutdown wait job finish try { - TimeUnit.SECONDS.sleep(ELEGANT_SHUTDOWN_WAITING_SECONDS); + TimeUnit.SECONDS.sleep(Const.ELEGANT_SHUTDOWN_WAITING_SECONDS); } catch (Throwable e) { logger.error(e.getMessage(), e); } @@ -135,19 +178,26 @@ public class XxlJobExecutor { } jobHandlerRepository.clear(); + // 2、destroy TriggerCallbackThread + triggerCallbackThreadHelper.stop(); - // 2、destroy JobLogFileCleanThread - JobLogFileCleanThread.getInstance().toStop(); - - // 3、destroy TriggerCallbackThread - TriggerCallbackThread.getInstance().toStop(); - + // 3、destroy JobLogFileCleanThread + jobLogFileCleanThreadHelper.stop(); } // ---------------------- admin-client (rpc invoker) ---------------------- - private static List adminBizList; - private void initAdminBizList(String adminAddresses, String accessToken, int timeout) throws Exception { + + /** + * admin-client list + */ + private final List adminBizList = new ArrayList<>(); + + /** + * init adminBizList + */ + private void initAdminBizList() throws Exception { + // valid if (StringTool.isBlank(adminAddresses)) { return; @@ -174,48 +224,56 @@ public class XxlJobExecutor { .proxy(AdminBiz.class); // registry - if (adminBizList == null) { - adminBizList = new ArrayList(); - } adminBizList.add(adminBiz); } } - public static List getAdminBizList(){ + /** + * get adminBizList + */ + public List getAdminBizList(){ return adminBizList; } + // ---------------------- executor-server (rpc provider) ---------------------- + private EmbedServer embedServer = null; - private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception { + /** + * start embed server + */ + private void startEmbedServer() throws Exception { // fill ip port - port = port>0?port: IPTool.getAvailablePort(9999); - ip = StringTool.isNotBlank(ip) ? ip : IPTool.getIp(); + this.port = this.port>0?this.port: IPTool.getAvailablePort(9999); + this.ip = StringTool.isNotBlank(this.ip) ? this.ip : IPTool.getIp(); // generate address - if (StringTool.isBlank(address)) { + if (StringTool.isBlank(this.address)) { // registry-address:default use address to registry , otherwise use ip:port if address is null String ip_port_address = IPTool.toAddressString(ip, port); - address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); + this.address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); } // accessToken - if (StringTool.isBlank(accessToken)) { + if (StringTool.isBlank(this.accessToken)) { logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken."); } // start embedServer = new EmbedServer(); - embedServer.start(address, port, appname, accessToken); + embedServer.start(this); } + /** + * stop embed server + */ private void stopEmbedServer() { // stop provider factory if (embedServer != null) { try { - embedServer.stop(); + embedServer.stop(this); } catch (Exception e) { logger.error(e.getMessage(), e); } @@ -224,14 +282,30 @@ public class XxlJobExecutor { // ---------------------- job handler repository ---------------------- - private static ConcurrentMap jobHandlerRepository = new ConcurrentHashMap(); - public static IJobHandler loadJobHandler(String name){ + + /** + * job handler repository + */ + private final ConcurrentMap jobHandlerRepository = new ConcurrentHashMap<>(); + + /** + * load JobHandler instance by name + */ + public IJobHandler loadJobHandler(String name){ return jobHandlerRepository.get(name); } - public static IJobHandler registryJobHandler(String name, IJobHandler jobHandler){ + + /** + * registry JobHandler + */ + public IJobHandler registryJobHandler(String name, IJobHandler jobHandler){ logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); return jobHandlerRepository.put(name, jobHandler); } + + /** + * registry JobHandler for method + */ protected void registryJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){ if (xxlJob == null) { return; @@ -241,7 +315,7 @@ public class XxlJobExecutor { //make and simplify the variables since they'll be called several times later Class clazz = bean.getClass(); String methodName = executeMethod.getName(); - if (name.trim().length() == 0) { + if (name.trim().isEmpty()) { throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] ."); } if (loadJobHandler(name) != null) { @@ -264,7 +338,7 @@ public class XxlJobExecutor { Method initMethod = null; Method destroyMethod = null; - if (xxlJob.init().trim().length() > 0) { + if (StringTool.isNotBlank(xxlJob.init())) { try { initMethod = clazz.getDeclaredMethod(xxlJob.init()); initMethod.setAccessible(true); @@ -272,7 +346,7 @@ public class XxlJobExecutor { throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] ."); } } - if (xxlJob.destroy().trim().length() > 0) { + if (StringTool.isNotBlank(xxlJob.destroy())) { try { destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy()); destroyMethod.setAccessible(true); @@ -288,8 +362,9 @@ public class XxlJobExecutor { // ---------------------- job thread repository ---------------------- - private static ConcurrentMap jobThreadRepository = new ConcurrentHashMap(); - public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ + + private final ConcurrentMap jobThreadRepository = new ConcurrentHashMap<>(); + public JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ JobThread newJobThread = new JobThread(jobId, handler); newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); @@ -303,7 +378,7 @@ public class XxlJobExecutor { return newJobThread; } - public static JobThread removeJobThread(int jobId, String removeOldReason){ + public JobThread removeJobThread(int jobId, String removeOldReason){ JobThread oldJobThread = jobThreadRepository.remove(jobId); if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); @@ -314,7 +389,8 @@ public class XxlJobExecutor { return null; } - public static JobThread loadJobThread(int jobId){ + public JobThread loadJobThread(int jobId){ return jobThreadRepository.get(jobId); } + } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/openapi/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/openapi/impl/ExecutorBizImpl.java index c5afe7db..4bf461aa 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/openapi/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/openapi/impl/ExecutorBizImpl.java @@ -1,9 +1,7 @@ package com.xxl.job.core.openapi.impl; -import com.xxl.job.core.context.XxlJobContext; -import com.xxl.job.core.openapi.ExecutorBiz; -import com.xxl.job.core.openapi.model.*; import com.xxl.job.core.constant.ExecutorBlockStrategyEnum; +import com.xxl.job.core.context.XxlJobContext; import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.glue.GlueFactory; import com.xxl.job.core.glue.GlueTypeEnum; @@ -11,6 +9,8 @@ import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.impl.GlueJobHandler; import com.xxl.job.core.handler.impl.ScriptJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.openapi.ExecutorBiz; +import com.xxl.job.core.openapi.model.*; import com.xxl.job.core.thread.JobThread; import com.xxl.tool.response.Response; import org.slf4j.Logger; @@ -22,7 +22,7 @@ import java.util.Date; * Created by xuxueli on 17/3/1. */ public class ExecutorBizImpl implements ExecutorBiz { - private static Logger logger = LoggerFactory.getLogger(ExecutorBizImpl.class); + private static final Logger logger = LoggerFactory.getLogger(ExecutorBizImpl.class); @Override public Response beat() { @@ -34,7 +34,7 @@ public class ExecutorBizImpl implements ExecutorBiz { // isRunningOrHasQueue boolean isRunningOrHasQueue = false; - JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatRequest.getJobId()); + JobThread jobThread = XxlJobExecutor.getInstance().loadJobThread(idleBeatRequest.getJobId()); if (jobThread != null && jobThread.isRunningOrHasQueue()) { isRunningOrHasQueue = true; } @@ -48,7 +48,7 @@ public class ExecutorBizImpl implements ExecutorBiz { @Override public Response run(TriggerRequest triggerRequest) { // load old:jobHandler + jobThread - JobThread jobThread = XxlJobExecutor.loadJobThread(triggerRequest.getJobId()); + JobThread jobThread = XxlJobExecutor.getInstance().loadJobThread(triggerRequest.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; String removeOldReason = null; @@ -57,7 +57,7 @@ public class ExecutorBizImpl implements ExecutorBiz { if (GlueTypeEnum.BEAN == glueTypeEnum) { // new jobhandler - IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerRequest.getExecutorHandler()); + IJobHandler newJobHandler = XxlJobExecutor.getInstance().loadJobHandler(triggerRequest.getExecutorHandler()); // valid old jobThread if (jobThread!=null && jobHandler != newJobHandler) { @@ -142,7 +142,7 @@ public class ExecutorBizImpl implements ExecutorBiz { // replace thread (new or exists invalid) if (jobThread == null) { - jobThread = XxlJobExecutor.registJobThread(triggerRequest.getJobId(), jobHandler, removeOldReason); + jobThread = XxlJobExecutor.getInstance().registJobThread(triggerRequest.getJobId(), jobHandler, removeOldReason); } // push data to queue @@ -152,9 +152,9 @@ public class ExecutorBizImpl implements ExecutorBiz { @Override public Response kill(KillRequest killRequest) { // kill handlerThread, and create new one - JobThread jobThread = XxlJobExecutor.loadJobThread(killRequest.getJobId()); + JobThread jobThread = XxlJobExecutor.getInstance().loadJobThread(killRequest.getJobId()); if (jobThread != null) { - XxlJobExecutor.removeJobThread(killRequest.getJobId(), "scheduling center kill job."); + XxlJobExecutor.getInstance().removeJobThread(killRequest.getJobId(), "scheduling center kill job."); return Response.ofSuccess(); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java index c37cd513..2b3829e1 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java @@ -1,10 +1,13 @@ package com.xxl.job.core.server; import com.xxl.job.core.constant.Const; +import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.openapi.ExecutorBiz; import com.xxl.job.core.openapi.impl.ExecutorBizImpl; -import com.xxl.job.core.openapi.model.*; -import com.xxl.job.core.thread.ExecutorRegistryThread; +import com.xxl.job.core.openapi.model.IdleBeatRequest; +import com.xxl.job.core.openapi.model.KillRequest; +import com.xxl.job.core.openapi.model.LogRequest; +import com.xxl.job.core.openapi.model.TriggerRequest; import com.xxl.tool.error.ThrowableTool; import com.xxl.tool.json.GsonTool; import com.xxl.tool.response.Response; @@ -34,8 +37,16 @@ public class EmbedServer { private ExecutorBiz executorBiz; private Thread thread; - public void start(final String address, final int port, final String appname, final String accessToken) { + public void start(final XxlJobExecutor xxlJobExecutor) { + + /** + * init executor biz service + */ executorBiz = new ExecutorBizImpl(); + + /** + * start server + */ thread = new Thread(new Runnable() { @Override public void run() { @@ -72,18 +83,18 @@ public class EmbedServer { .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL - .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); + .addLast(new EmbedHttpServerHandler(executorBiz, xxlJobExecutor.getAccessToken(), bizThreadPool)); } }) .childOption(ChannelOption.SO_KEEPALIVE, true); // bind - ChannelFuture future = bootstrap.bind(port).sync(); + ChannelFuture future = bootstrap.bind(xxlJobExecutor.getPort()).sync(); - logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); + logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, xxlJobExecutor.getPort()); // start registry - startRegistry(appname, address); + xxlJobExecutor.getExecutorRegistryThreadHelper().start(xxlJobExecutor); // wait util stop future.channel().closeFuture().sync(); @@ -108,14 +119,14 @@ public class EmbedServer { thread.start(); } - public void stop() throws Exception { + public void stop(final XxlJobExecutor xxlJobExecutor) throws Exception { // destroy server thread if (thread != null && thread.isAlive()) { thread.interrupt(); } // stop registry - stopRegistry(); + xxlJobExecutor.getExecutorRegistryThreadHelper().stop(xxlJobExecutor); logger.info(">>>>>>>>>>> xxl-job remoting server destroy success."); } @@ -244,15 +255,4 @@ public class EmbedServer { } } - // ---------------------- registry ---------------------- - - public void startRegistry(final String appname, final String address) { - // start registry - ExecutorRegistryThread.getInstance().start(appname, address); - } - - public void stopRegistry() { - // stop registry - ExecutorRegistryThread.getInstance().toStop(); - } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java deleted file mode 100644 index 8fe3c964..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java +++ /dev/null @@ -1,131 +0,0 @@ -package com.xxl.job.core.thread; - -import com.xxl.job.core.constant.RegistType; -import com.xxl.job.core.openapi.AdminBiz; -import com.xxl.job.core.openapi.model.RegistryRequest; -import com.xxl.job.core.constant.Const; -import com.xxl.job.core.executor.XxlJobExecutor; -import com.xxl.tool.response.Response; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -/** - * Created by xuxueli on 17/3/2. - */ -public class ExecutorRegistryThread { - private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class); - - private static ExecutorRegistryThread instance = new ExecutorRegistryThread(); - public static ExecutorRegistryThread getInstance(){ - return instance; - } - - private Thread registryThread; - private volatile boolean toStop = false; - public void start(final String appname, final String address){ - - // valid - if (appname==null || appname.trim().length()==0) { - logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null."); - return; - } - if (XxlJobExecutor.getAdminBizList() == null) { - logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null."); - return; - } - - toStop = false; - registryThread = new Thread(new Runnable() { - @Override - public void run() { - - // registry - while (!toStop) { - try { - RegistryRequest registryParam = new RegistryRequest(RegistType.EXECUTOR.name(), appname, address); - for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { - try { - Response registryResult = adminBiz.registry(registryParam); - if (registryResult!=null && registryResult.isSuccess()) { - registryResult = Response.ofSuccess(); - logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); - break; - } else { - logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); - } - } catch (Throwable e) { - logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e); - } - - } - } catch (Throwable e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - - } - - try { - if (!toStop) { - TimeUnit.SECONDS.sleep(Const.BEAT_TIMEOUT); - } - } catch (Throwable e) { - if (!toStop) { - logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage()); - } - } - } - - // registry remove - try { - RegistryRequest registryParam = new RegistryRequest(RegistType.EXECUTOR.name(), appname, address); - for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { - try { - Response registryResult = adminBiz.registryRemove(registryParam); - if (registryResult!=null && registryResult.isSuccess()) { - registryResult = Response.ofSuccess(); - logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); - break; - } else { - logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); - } - } catch (Throwable e) { - if (!toStop) { - logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e); - } - - } - - } - } catch (Throwable e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - } - logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy."); - - } - }); - registryThread.setDaemon(true); - registryThread.setName("xxl-job, executor ExecutorRegistryThread"); - registryThread.start(); - } - - public void toStop() { - toStop = true; - - // interrupt and wait - if (registryThread != null) { - registryThread.interrupt(); - try { - registryThread.join(); - } catch (Throwable e) { - logger.error(e.getMessage(), e); - } - } - - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThreadHelper.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThreadHelper.java new file mode 100644 index 00000000..f009e8e3 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThreadHelper.java @@ -0,0 +1,107 @@ +package com.xxl.job.core.thread; + +import com.xxl.job.core.constant.RegistType; +import com.xxl.job.core.openapi.AdminBiz; +import com.xxl.job.core.openapi.model.RegistryRequest; +import com.xxl.job.core.constant.Const; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.tool.concurrent.CyclicThread; +import com.xxl.tool.core.CollectionTool; +import com.xxl.tool.core.StringTool; +import com.xxl.tool.response.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by xuxueli on 17/3/2. + */ +public class ExecutorRegistryThreadHelper { + private static final Logger logger = LoggerFactory.getLogger(ExecutorRegistryThreadHelper.class); + + + /** + * registry thread + */ + private CyclicThread registryThread; + + /** + * start + */ + public void start(final XxlJobExecutor xxlJobExecutor){ + + /** + * valid + */ + if (StringTool.isBlank(xxlJobExecutor.getAppname())) { + logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null."); + return; + } + if (CollectionTool.isEmpty(xxlJobExecutor.getAdminBizList())) { + logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null."); + return; + } + + + /** + * registry thread + */ + registryThread = new CyclicThread("ExecutorRegistryThread#registryThread", true, new Runnable() { + @Override + public void run() { + RegistryRequest registryParam = new RegistryRequest(RegistType.EXECUTOR.name(), xxlJobExecutor.getAppname(), xxlJobExecutor.getAddress()); + for (AdminBiz adminBiz: xxlJobExecutor.getAdminBizList()) { + try { + Response registryResult = adminBiz.registry(registryParam); + if (registryResult!=null && registryResult.isSuccess()) { + registryResult = Response.ofSuccess(); + logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); + break; + } else { + logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); + } + } catch (Throwable e) { + logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e); + } + + } + } + }, Const.BEAT_TIMEOUT * 1000L, true); + registryThread.start(); + + } + + /** + * stop + */ + public void stop(final XxlJobExecutor xxlJobExecutor) { + + /** + * 1、stop registryThread + */ + registryThread.stop(); + + /** + * 2、registry remove + */ + registryRemove(xxlJobExecutor); + } + + private void registryRemove(final XxlJobExecutor xxlJobExecutor){ + RegistryRequest registryParam = new RegistryRequest(RegistType.EXECUTOR.name(), xxlJobExecutor.getAppname(), xxlJobExecutor.getAddress()); + for (AdminBiz adminBiz: xxlJobExecutor.getAdminBizList()) { + try { + Response registryResult = adminBiz.registryRemove(registryParam); + if (registryResult!=null && registryResult.isSuccess()) { + registryResult = Response.ofSuccess(); + logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); + break; + } else { + logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); + } + } catch (Throwable e) { + logger.warn(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}, error:{}", registryParam, e.getMessage()); + } + } + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java deleted file mode 100644 index a42d5f01..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java +++ /dev/null @@ -1,128 +0,0 @@ -package com.xxl.job.core.thread; - -import com.xxl.job.core.log.XxlJobFileAppender; -import com.xxl.tool.core.DateTool; -import com.xxl.tool.io.FileTool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.Calendar; -import java.util.Date; -import java.util.concurrent.TimeUnit; - -/** - * job file clean thread - * - * @author xuxueli 2017-12-29 16:23:43 - */ -public class JobLogFileCleanThread { - private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class); - - private static JobLogFileCleanThread instance = new JobLogFileCleanThread(); - public static JobLogFileCleanThread getInstance(){ - return instance; - } - - private Thread localThread; - private volatile boolean toStop = false; - public void start(final long logRetentionDays){ - - // limit min value - if (logRetentionDays < 3 ) { - return; // effective only when logRetentionDays >= 3 - } - - toStop = false; - localThread = new Thread(new Runnable() { - @Override - public void run() { - while (!toStop) { - try { - // clean log dir, over logRetentionDays - File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles(); - if (childDirs!=null && childDirs.length>0) { - - // today - Calendar todayCal = Calendar.getInstance(); - todayCal.set(Calendar.HOUR_OF_DAY,0); - todayCal.set(Calendar.MINUTE,0); - todayCal.set(Calendar.SECOND,0); - todayCal.set(Calendar.MILLISECOND,0); - - Date todayDate = todayCal.getTime(); - - // clean expired logfile - for (File childFile: childDirs) { - - // valid log-path: must be directory - if (!childFile.isDirectory()) { - continue; - } - - // valid day log-path: like "---/2017-12-25/639.log" - if (!childFile.getName().contains("-")) { - continue; - } - - // parse create-day of file-path - Date logFileCreateDate = null; - try { - logFileCreateDate = DateTool.parseDate(childFile.getName()); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - if (logFileCreateDate == null) { - continue; - } - - // check expired - Date expiredDate = DateTool.addDays(logFileCreateDate, logRetentionDays); - if (todayDate.getTime() > expiredDate.getTime()) { - // expired, remove all log of this day - FileTool.delete(childFile); - //FileUtil.deleteRecursively(childFile); - } - } - } - - } catch (Throwable e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - } - - try { - TimeUnit.DAYS.sleep(1); - } catch (Throwable e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - } - } - logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy."); - - } - }); - localThread.setDaemon(true); - localThread.setName("xxl-job, executor JobLogFileCleanThread"); - localThread.start(); - } - - public void toStop() { - toStop = true; - - if (localThread == null) { - return; - } - - // interrupt and wait - localThread.interrupt(); - try { - localThread.join(); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThreadHelper.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThreadHelper.java new file mode 100644 index 00000000..a960a0c1 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThreadHelper.java @@ -0,0 +1,108 @@ +package com.xxl.job.core.thread; + +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.tool.concurrent.CyclicThread; +import com.xxl.tool.core.DateTool; +import com.xxl.tool.io.FileTool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Calendar; +import java.util.Date; + +/** + * job file clean thread + * + * @author xuxueli 2017-12-29 16:23:43 + */ +public class JobLogFileCleanThreadHelper { + private static final Logger logger = LoggerFactory.getLogger(JobLogFileCleanThreadHelper.class); + + + /** + * monitor thread + */ + private CyclicThread logFileCleanThread; + + + /** + * start + */ + public void start(final long logRetentionDays){ + + /** + * limit min value + */ + if (logRetentionDays < 3 ) { + return; // effective only when logRetentionDays >= 3 + } + + /** + * logFileCleanThread + */ + logFileCleanThread = new CyclicThread("JobLogFileCleanThreadHelper#logFileCleanThread", true, new Runnable() { + @Override + public void run() { + // clean log dir, over logRetentionDays + File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles(); + if (childDirs!=null && childDirs.length>0) { + + // today + Calendar todayCal = Calendar.getInstance(); + todayCal.set(Calendar.HOUR_OF_DAY,0); + todayCal.set(Calendar.MINUTE,0); + todayCal.set(Calendar.SECOND,0); + todayCal.set(Calendar.MILLISECOND,0); + + Date todayDate = todayCal.getTime(); + + // clean expired logfile + for (File childFile: childDirs) { + + // valid log-path: must be directory + if (!childFile.isDirectory()) { + continue; + } + + // valid day log-path: like "---/2017-12-25/639.log" + if (!childFile.getName().contains("-")) { + continue; + } + + // parse create-day of file-path + Date logFileCreateDate = null; + try { + logFileCreateDate = DateTool.parseDate(childFile.getName()); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + if (logFileCreateDate == null) { + continue; + } + + // check expired + Date expiredDate = DateTool.addDays(logFileCreateDate, logRetentionDays); + if (todayDate.getTime() > expiredDate.getTime()) { + // expired, remove all log of this day + FileTool.delete(childFile); + //FileUtil.deleteRecursively(childFile); + } + } + } + } + }, DateTool.MILLIS_PER_DAY, true); + logFileCleanThread.start(); + + } + + /** + * stop + */ + public void stop() { + + // stop logFileCleanThread + logFileCleanThread.stop(); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index b6028f02..5ea67143 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -181,7 +181,7 @@ public class JobThread extends Thread{ } else { if (idleTimes > 30) { if(triggerQueue.isEmpty()) { // avoid concurrent trigger causes jobId-lost - XxlJobExecutor.removeJobThread(jobId, "excutor idle times over limit."); + XxlJobExecutor.getInstance().removeJobThread(jobId, "excutor idle times over limit."); } } } @@ -203,7 +203,7 @@ public class JobThread extends Thread{ // callback handler info if (!toStop) { // common - TriggerCallbackThread.pushCallBack(new CallbackRequest( + XxlJobExecutor.getInstance().getTriggerCallbackThreadHelper().pushCallBack(new CallbackRequest( triggerParam.getLogId(), triggerParam.getLogDateTime(), XxlJobContext.getXxlJobContext().getHandleCode(), @@ -211,7 +211,7 @@ public class JobThread extends Thread{ ); } else { // is killed - TriggerCallbackThread.pushCallBack(new CallbackRequest( + XxlJobExecutor.getInstance().getTriggerCallbackThreadHelper().pushCallBack(new CallbackRequest( triggerParam.getLogId(), triggerParam.getLogDateTime(), XxlJobContext.HANDLE_CODE_FAIL, @@ -227,7 +227,7 @@ public class JobThread extends Thread{ TriggerRequest triggerParam = triggerQueue.poll(); if (triggerParam!=null) { // is killed - TriggerCallbackThread.pushCallBack(new CallbackRequest( + XxlJobExecutor.getInstance().getTriggerCallbackThreadHelper().pushCallBack(new CallbackRequest( triggerParam.getLogId(), triggerParam.getLogDateTime(), XxlJobContext.HANDLE_CODE_FAIL, diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java deleted file mode 100644 index 6c91399e..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ /dev/null @@ -1,305 +0,0 @@ -package com.xxl.job.core.thread; - -import com.xxl.job.core.openapi.AdminBiz; -import com.xxl.job.core.openapi.model.CallbackRequest; -import com.xxl.job.core.context.XxlJobContext; -import com.xxl.job.core.context.XxlJobHelper; -import com.xxl.job.core.constant.Const; -import com.xxl.job.core.executor.XxlJobExecutor; -import com.xxl.job.core.log.XxlJobFileAppender; -import com.xxl.tool.core.ArrayTool; -import com.xxl.tool.core.CollectionTool; -import com.xxl.tool.core.StringTool; -import com.xxl.tool.crypto.Md5Tool; -import com.xxl.tool.json.GsonTool; -import com.xxl.tool.io.FileTool; -import com.xxl.tool.response.Response; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * Trigger Callback Thread - * - * Created by xuxueli on 16/7/22. - */ -public class TriggerCallbackThread { - private static final Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); - - private static final TriggerCallbackThread instance = new TriggerCallbackThread(); - public static TriggerCallbackThread getInstance(){ - return instance; - } - - /** - * job results callback queue - */ - private final LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue<>(); - public static void pushCallBack(CallbackRequest callback){ - getInstance().callBackQueue.add(callback); - logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); - } - - /** - * callback thread - */ - private Thread triggerCallbackThread; - private Thread triggerRetryCallbackThread; - private volatile boolean toStop = false; - public void start() { - - // valid - if (XxlJobExecutor.getAdminBizList() == null) { - logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null."); - return; - } - - - toStop = false; - - /** - * trigger callback thread - */ - - triggerCallbackThread = new Thread(new Runnable() { - - @Override - public void run() { - - // normal callback - while(!toStop){ - try { - CallbackRequest callback = getInstance().callBackQueue.take(); - if (callback != null) { - - // collect callback data - List callbackParamList = new ArrayList<>(); - callbackParamList.add(callback); // add one element - int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); // drainTo other all elements - - // do callback, will retry if error - if (CollectionTool.isNotEmpty(callbackParamList)) { - doCallback(callbackParamList); - } - } - } catch (Throwable e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - } - } - - // thead stop, callback lasttime - try { - // collect callback data - List callbackParamList = new ArrayList<>(); - int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); - - // do callback - if (CollectionTool.isNotEmpty(callbackParamList)) { - doCallback(callbackParamList); - } - } catch (Throwable e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - } - logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy."); - - } - }); - triggerCallbackThread.setDaemon(true); - triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread"); - triggerCallbackThread.start(); - - - /** - * callback fail retry thread - */ - triggerRetryCallbackThread = new Thread(new Runnable() { - @Override - public void run() { - while(!toStop){ - try { - retryFailCallbackFile(); - } catch (Throwable e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - - } - try { - TimeUnit.SECONDS.sleep(Const.BEAT_TIMEOUT); - } catch (Throwable e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - } - } - logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy."); - } - }); - triggerRetryCallbackThread.setDaemon(true); - triggerRetryCallbackThread.setName("xxl-job, executor TriggerRetryCallbackThread"); - triggerRetryCallbackThread.start(); - - } - public void toStop(){ - toStop = true; - // stop callback, interrupt and wait - if (triggerCallbackThread != null) { // support empty admin address - triggerCallbackThread.interrupt(); - try { - triggerCallbackThread.join(); - } catch (Throwable e) { - logger.error(e.getMessage(), e); - } - } - - // stop retry, interrupt and wait - if (triggerRetryCallbackThread != null) { - triggerRetryCallbackThread.interrupt(); - try { - triggerRetryCallbackThread.join(); - } catch (Throwable e) { - logger.error(e.getMessage(), e); - } - } - - } - - /** - * do callback, will retry if error - * - * @param callbackParamList callback param list - */ - private void doCallback(List callbackParamList){ - boolean callbackRet = false; - // callback, will retry if error - for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { - try { - Response callbackResult = adminBiz.callback(callbackParamList); - if (callbackResult!=null && callbackResult.isSuccess()) { - callbackLog(callbackParamList, "
----------- xxl-job job callback finish."); - callbackRet = true; - break; - } else { - callbackLog(callbackParamList, "
----------- xxl-job job callback fail, callbackResult:" + callbackResult); - } - } catch (Throwable e) { - callbackLog(callbackParamList, "
----------- xxl-job job callback error, errorMsg:" + e.getMessage()); - } - } - if (!callbackRet) { - appendFailCallbackFile(callbackParamList); - } - } - - /** - * callback log - */ - private void callbackLog(List callbackParamList, String logContent){ - for (CallbackRequest callbackParam: callbackParamList) { - String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId()); - XxlJobContext.setXxlJobContext(new XxlJobContext( - -1, - null, - -1, - -1, - logFileName, - -1, - -1)); - XxlJobHelper.log(logContent); - } - } - - - // ---------------------- fail-callback file ---------------------- - - /** - * fail-callback file name - */ - private static final String failCallbackFileName = XxlJobFileAppender - .getCallbackLogPath() - .concat(File.separator) - .concat("xxl-job-callback-{x}") - .concat(".log"); - - /** - * append fail-callback file - * - * @param callbackParamList callback param list - */ - private void appendFailCallbackFile(List callbackParamList) { - // valid - if (CollectionTool.isEmpty(callbackParamList)) { - return; - } - - // generate callback data - String callbackData = GsonTool.toJson(callbackParamList); - String callbackDataMd5 = Md5Tool.md5(callbackData); - - - // create file - String finalLogFileName = failCallbackFileName.replace("{x}", callbackDataMd5); - - // write callback log - try { - FileTool.writeString(finalLogFileName, callbackData); - } catch (IOException e) { - logger.error(">>>>>>>>>>> TriggerCallbackThread appendFailCallbackFile error, finalLogFileName:{}", finalLogFileName, e); - } - } - - /** - * retry fail-callback file - */ - private void retryFailCallbackFile() { - - // valid - File callbackLogPath = new File(XxlJobFileAppender.getCallbackLogPath()); - if (!callbackLogPath.exists()) { - return; - } - // valid file type: must be directory - if (!FileTool.isDirectory(callbackLogPath)) { - FileTool.delete(callbackLogPath); - return; - } - // valid file in path: pass if empty - if (ArrayTool.isEmpty(callbackLogPath.listFiles())) { - return; - } - - // load and clear file, do retry - for (File callbackLogFile: callbackLogPath.listFiles()) { - try { - // load data - String callbackData = FileTool.readString(callbackLogFile.getPath()); - if (StringTool.isBlank(callbackData)) { - FileTool.delete(callbackLogFile); - continue; - } - - // parse callback param - List callbackParamList = GsonTool.fromJsonList(callbackData, CallbackRequest.class); - FileTool.delete(callbackLogFile); - - // retry callback - doCallback(callbackParamList); - } catch (IOException e) { - logger.error(">>>>>>>>>>> TriggerCallbackThread retryFailCallbackFile error, callbackLogFile:{}", callbackLogFile.getPath(), e); - } - } - - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThreadHelper.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThreadHelper.java new file mode 100644 index 00000000..7a039c92 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThreadHelper.java @@ -0,0 +1,237 @@ +package com.xxl.job.core.thread; + +import com.xxl.job.core.constant.Const; +import com.xxl.job.core.context.XxlJobContext; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.openapi.AdminBiz; +import com.xxl.job.core.openapi.model.CallbackRequest; +import com.xxl.tool.concurrent.CyclicThread; +import com.xxl.tool.concurrent.MessageQueue; +import com.xxl.tool.core.ArrayTool; +import com.xxl.tool.core.CollectionTool; +import com.xxl.tool.core.StringTool; +import com.xxl.tool.crypto.Md5Tool; +import com.xxl.tool.io.FileTool; +import com.xxl.tool.json.GsonTool; +import com.xxl.tool.response.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +/** + * Trigger Callback Thread + * + * Created by xuxueli on 16/7/22. + */ +public class TriggerCallbackThreadHelper { + private static final Logger logger = LoggerFactory.getLogger(TriggerCallbackThreadHelper.class); + + + /** + * callback message-queue + */ + private volatile MessageQueue callbackMessageQueue; + + /** + * retry callback-file thread + */ + private CyclicThread retryCallbackThread; + + + /** + * start + */ + public void start(final XxlJobExecutor xxlJobExecutor) { + + // valid + if (xxlJobExecutor.getAdminBizList() == null) { + logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null."); + return; + } + + + /** + * 1、callback message-queue + */ + callbackMessageQueue = new MessageQueue( + "TriggerCallbackThreadHelper#callbackMessageQueue", + messages -> { + + // do callback + doCallback(messages, xxlJobExecutor); + }, + 1, + 50); + + + /** + * 2、retry callback-file thread + */ + retryCallbackThread = new CyclicThread("TriggerCallbackThreadHelper#retryCallbackThread", true, new Runnable() { + @Override + public void run() { + + // valid empty path + File callbackLogPath = new File(XxlJobFileAppender.getCallbackLogPath()); + if (!callbackLogPath.exists()) { + return; + } + // valid file type: must be directory + if (!FileTool.isDirectory(callbackLogPath)) { + FileTool.delete(callbackLogPath); + return; + } + // valid none file + if (ArrayTool.isEmpty(callbackLogPath.listFiles())) { + return; + } + + // load and clear file, do retry + for (File callbackLogFile: callbackLogPath.listFiles()) { + try { + // load data + String callbackData = FileTool.readString(callbackLogFile.getPath()); + if (StringTool.isBlank(callbackData)) { + FileTool.delete(callbackLogFile); + continue; + } + + // parse callback param + List callbackParamList = GsonTool.fromJsonList(callbackData, CallbackRequest.class); + FileTool.delete(callbackLogFile); + + // retry callback + doCallback(callbackParamList, xxlJobExecutor); + } catch (IOException e) { + logger.error(">>>>>>>>>>> TriggerCallbackThread retryFailCallbackFile error, callbackLogFile:{}", callbackLogFile.getPath(), e); + } + } + + } + }, Const.BEAT_TIMEOUT * 1000L, true); + retryCallbackThread.start(); + } + + /** + * stop + */ + public void stop(){ + // 1、stop callbackMessageQueue + if (callbackMessageQueue != null) { + callbackMessageQueue.stop(); // attempt wait for callback finish + } + + // 2、stop retryCallbackThread + retryCallbackThread.stop(); + } + + + /** + * submit callback message + */ + public void pushCallBack(CallbackRequest callback){ + if (!callbackMessageQueue.produce(callback)) { + doCallback(new ArrayList<>(Collections.singletonList(callback)), XxlJobExecutor.getInstance()); + } + logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); + } + + + // ---------------------- do callback ---------------------- + + /** + * do callback, will retry if error + * + * @param callbackParamList callback param list + */ + private void doCallback(List callbackParamList, final XxlJobExecutor xxlJobExecutor){ + boolean callbackRet = false; + + // callback request, will retry + append-log if fail + for (AdminBiz adminBiz: xxlJobExecutor.getAdminBizList()) { + try { + Response callbackResult = adminBiz.callback(callbackParamList); + if (callbackResult!=null && callbackResult.isSuccess()) { + appendCallbackResult(callbackParamList, "
----------- xxl-job job callback finish."); + callbackRet = true; + break; + } else { + appendCallbackResult(callbackParamList, "
----------- xxl-job job callback fail, callbackResult:" + callbackResult); + } + } catch (Throwable e) { + appendCallbackResult(callbackParamList, "
----------- xxl-job job callback error, errorMsg:" + e.getMessage()); + } + } + + // write callback-file, will retry later + if (!callbackRet) { + writeCallbackLog(callbackParamList); + } + } + + /** + * append callback result, to each joblog + */ + private void appendCallbackResult(List callbackParamList, String logContent){ + for (CallbackRequest callbackParam: callbackParamList) { + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId()); + XxlJobContext.setXxlJobContext(new XxlJobContext( + -1, + null, + -1, + -1, + logFileName, + -1, + -1)); + XxlJobHelper.log(logContent); + } + } + + + // ---------------------- fail-callback file ---------------------- + + /** + * fail-callback file name + */ + private static final String failCallbackFileName = XxlJobFileAppender + .getCallbackLogPath() + .concat(File.separator) + .concat("xxl-job-callback-{x}") + .concat(".log"); + + /** + * write fail-callback file, will retry later + * + * @param callbackParamList callback param list + */ + private void writeCallbackLog(List callbackParamList) { + // valid + if (CollectionTool.isEmpty(callbackParamList)) { + return; + } + + // generate callback data + String callbackData = GsonTool.toJson(callbackParamList); + String callbackDataMd5 = Md5Tool.md5(callbackData); + + + // create file + String finalLogFileName = failCallbackFileName.replace("{x}", callbackDataMd5); + + // write callback log + try { + FileTool.writeString(finalLogFileName, callbackData); + } catch (IOException e) { + logger.error(">>>>>>>>>>> TriggerCallbackThread appendFailCallbackFile error, finalLogFileName:{}", finalLogFileName, e); + } + } + +}