From 1a4c8e5e865ffab0febe4eee143b179118aa4087 Mon Sep 17 00:00:00 2001 From: liyong Date: Wed, 17 Nov 2021 22:28:23 +0800 Subject: [PATCH 01/11] 1 --- xxl-job-admin/src/main/resources/application.properties | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xxl-job-admin/src/main/resources/application.properties b/xxl-job-admin/src/main/resources/application.properties index afe93b42..c9ef4bfa 100644 --- a/xxl-job-admin/src/main/resources/application.properties +++ b/xxl-job-admin/src/main/resources/application.properties @@ -23,9 +23,9 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml #mybatis.type-aliases-package=com.xxl.job.admin.core.model ### xxl-job, datasource -spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai +spring.datasource.url=jdbc:mysql://47.97.249.124:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username=root -spring.datasource.password=root_pwd +spring.datasource.password=123456 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver ### datasource-pool From a61ba7f3a77f6f544a9ad0161afd4dbb8c33f9dc Mon Sep 17 00:00:00 2001 From: liyong Date: Sun, 21 Nov 2021 21:15:14 +0800 Subject: [PATCH 02/11] 1 --- .../com/xxl/job/core/biz/impl/ExecutorBizImpl.java | 8 +++++++- .../com/xxl/job/core/executor/XxlJobExecutor.java | 14 ++++++++++++-- .../core/executor/impl/XxlJobSpringExecutor.java | 10 +++++++++- .../com/xxl/job/core/log/XxlJobFileAppender.java | 5 +++++ .../job/core/thread/ExecutorRegistryThread.java | 4 ++-- .../xxl/job/core/thread/JobLogFileCleanThread.java | 11 ++++++++--- .../java/com/xxl/job/core/thread/JobThread.java | 8 +++++++- .../xxl/job/core/thread/TriggerCallbackThread.java | 7 +++++++ .../src/main/resources/application.properties | 2 +- 9 files changed, 58 insertions(+), 11 deletions(-) diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index 8bdf7093..a8762891 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -46,6 +46,7 @@ public class ExecutorBizImpl implements ExecutorBiz { @Override public ReturnT run(TriggerParam triggerParam) { // load old:jobHandler + jobThread + //优先获取缓存中的处理器 (可以自定义设置) JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; String removeOldReason = null; @@ -54,7 +55,7 @@ public class ExecutorBizImpl implements ExecutorBiz { GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); if (GlueTypeEnum.BEAN == glueTypeEnum) { - // new jobhandler + // new jobhandler 通过解析对象时包装的map获取 IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); // valid old jobThread @@ -120,13 +121,16 @@ public class ExecutorBizImpl implements ExecutorBiz { // executor block strategy if (jobThread != null) { + //阻塞处理策略 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { + //丢弃后续调用 // discard when running if (jobThread.isRunningOrHasQueue()) { return new ReturnT(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); } } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { + //覆盖之前调用 // kill running jobThread if (jobThread.isRunningOrHasQueue()) { removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); @@ -134,12 +138,14 @@ public class ExecutorBizImpl implements ExecutorBiz { jobThread = null; } } else { + //单机串行 // just queue trigger } } // replace thread (new or exists invalid) if (jobThread == null) { + //开启一个执行业务的线程 如果是覆盖之前的调用 则会触发之前任务的 interrupt jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index 70a2dad9..a1aa8a55 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -74,7 +74,7 @@ public class XxlJobExecutor { initAdminBizList(adminAddresses, accessToken); - // init JobLogFileCleanThread + // init JobLogFileCleanThread 开启一个线程清理日志 JobLogFileCleanThread.getInstance().start(logRetentionDays); // init TriggerCallbackThread @@ -116,6 +116,13 @@ public class XxlJobExecutor { // ---------------------- admin-client (rpc invoker) ---------------------- private static List adminBizList; + + /** + * 初始化xxl 服务地址 解析包装成 AdminBizClient 存到 adminBizList内 (地址支持多个以,隔开) + * @param adminAddresses + * @param accessToken + * @throws Exception + */ private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { @@ -206,7 +213,7 @@ public class XxlJobExecutor { throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + "The correct method format like \" public ReturnT execute(String param) \" ."); }*/ - + //TODO 设置访问权限 说明私有的方法也是支持的 demo待实践 executeMethod.setAccessible(true); // init and destroy @@ -215,6 +222,7 @@ public class XxlJobExecutor { if (xxlJob.init().trim().length() > 0) { try { + //获取初始化方法 initMethod = clazz.getDeclaredMethod(xxlJob.init()); initMethod.setAccessible(true); } catch (NoSuchMethodException e) { @@ -231,6 +239,7 @@ public class XxlJobExecutor { } // registry jobhandler + //注册到map中 registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod)); } @@ -239,6 +248,7 @@ public class XxlJobExecutor { // ---------------------- job thread repository ---------------------- private static ConcurrentMap jobThreadRepository = new ConcurrentHashMap(); public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ + //TODO 处理业务 JobThread newJobThread = new JobThread(jobId, handler); newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java index 3c2a67d5..8f3fb708 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java @@ -28,6 +28,10 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC // start + + /** + * 所有单例初始化以后调用 + */ @Override public void afterSingletonsInstantiated() { @@ -37,7 +41,7 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC // init JobHandler Repository (for method) initJobHandlerMethodRepository(applicationContext); - // refresh GlueFactory + //TODO 初始化 glueFactory 使用场景待定 应该是执行脚本用的 GlueFactory.refreshInstance(1); // super start @@ -77,6 +81,10 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC } }*/ + /** + * 扫描有XxlJob 注解的方法 保存到 XxlJobExecutor.jobHandlerRepository(map) + * @param applicationContext + */ private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java index ff0585b5..ee1054a4 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java @@ -30,6 +30,11 @@ public class XxlJobFileAppender { */ private static String logBasePath = "/data/applogs/xxl-job/jobhandler"; private static String glueSrcPath = logBasePath.concat("/gluesource"); + + /** + * 初始化日志路径 会将 glue脚本 保存到日志目录子目录下 + * @param logPath + */ public static void initLogPath(String logPath){ // init if (logPath!=null && logPath.trim().length()>0) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java index e43a2a49..def5aa76 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java @@ -34,7 +34,7 @@ public class ExecutorRegistryThread { logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null."); return; } - + //开启线程往服务器注册 每隔30秒请求一次 registryThread = new Thread(new Runnable() { @Override public void run() { @@ -76,7 +76,7 @@ public class ExecutorRegistryThread { } } - // registry remove + // registry remove 如果通过stop 中断了 注册过程 则会立刻调用 移除接口 try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java index b5691542..ac4c227e 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java @@ -27,9 +27,14 @@ public class JobLogFileCleanThread { private Thread localThread; private volatile boolean toStop = false; + + /** + * 扫描指定路径下 日期格式的 文件夹 校验如果是指定日期之前的文件则删除 处理完了以后就sleep一天 支持 interrupt + * @param logRetentionDays + */ public void start(final long logRetentionDays){ - // limit min value + // limit min value 最小值三天一清理 if (logRetentionDays < 3 ) { return; } @@ -54,7 +59,7 @@ public class JobLogFileCleanThread { for (File childFile: childDirs) { - // valid + // valid 生成的日志文件时日期格式的文件夹 if (!childFile.isDirectory()) { continue; } @@ -73,7 +78,7 @@ public class JobLogFileCleanThread { if (logFileCreateDate == null) { continue; } - + //基于日期校验是不是指定日期之前的文件夹 是则删除 if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) { FileUtil.deleteRecursively(childFile); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index cf07a55a..bdc9af75 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -98,8 +98,10 @@ public class JobThread extends Thread{ // init try { + //调用 xxljob注解的 init方法 handler.init(); } catch (Throwable e) { + //TODO 初始化方法报错 并不会影响正常流程执行 如果 初始化涉及业务相关 要注意 logger.error(e.getMessage(), e); } @@ -126,12 +128,13 @@ public class JobThread extends Thread{ triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()); - // init job context + // init job context 包装执行的一些参数 通过 threadlocal 存取 XxlJobContext.setXxlJobContext(xxlJobContext); // execute XxlJobHelper.log("
----------- xxl-job job execute start -----------
----------- Param:" + xxlJobContext.getJobParam()); + //执行超时时间 if (triggerParam.getExecutorTimeout() > 0) { // limit timeout Thread futureThread = null; @@ -162,6 +165,7 @@ public class JobThread extends Thread{ futureThread.interrupt(); } } else { + //如果没有执行超时时间的限制 则 直接调用业务方法 // just execute handler.execute(); } @@ -183,6 +187,7 @@ public class JobThread extends Thread{ ); } else { + //每3秒获取一次执行参数 超过30次 且 依然没有 就 删除对应的线程 删除线程会将top置为true 这个设计应该是为了一些 执行频繁的任务设计的 一直死循环跑任务 if (idleTimes > 30) { if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); @@ -227,6 +232,7 @@ public class JobThread extends Thread{ } // callback trigger request in queue + //TODO 待确认 while(triggerQueue !=null && triggerQueue.size()>0){ TriggerParam triggerParam = triggerQueue.poll(); if (triggerParam!=null) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 40acac00..cf8e4efa 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; /** * Created by xuxueli on 16/7/22. */ + public class TriggerCallbackThread { private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); @@ -43,6 +44,10 @@ public class TriggerCallbackThread { /** * callback thread */ + /** TODO 队列中的数据 是在每次任务执行完以后填充的 传递code及msg callback 干嘛的待确认 + * 一直循环从队列中获取数据 发生给xxl服务器(/api/callback) + * 如果失败了会写入log目录下 triggerRetryCallbackThread 线程会每隔30秒(可配置)重新发送 + */ private Thread triggerCallbackThread; private Thread triggerRetryCallbackThread; private volatile boolean toStop = false; @@ -68,6 +73,7 @@ public class TriggerCallbackThread { // callback list param List callbackParamList = new ArrayList(); + //将队列中所有对象全部取出 int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); @@ -163,6 +169,7 @@ public class TriggerCallbackThread { private void doCallback(List callbackParamList){ boolean callbackRet = false; // callback, will retry if error + //getAdminBizList 所有xxl服务器地址 for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { ReturnT callbackResult = adminBiz.callback(callbackParamList); diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties index e067db4f..35601c5d 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties @@ -16,7 +16,7 @@ xxl.job.accessToken= ### xxl-job executor appname xxl.job.executor.appname=xxl-job-executor-sample ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null -xxl.job.executor.address= +xxl.job.executor.address=1.1.1.1 ### xxl-job executor server-info xxl.job.executor.ip= xxl.job.executor.port=9999 From 7ff20dc9298ec69ce85647f918990f8e54ba90a8 Mon Sep 17 00:00:00 2001 From: liyong Date: Tue, 7 Dec 2021 21:59:26 +0800 Subject: [PATCH 03/11] 1 --- .../java/com/xxl/job/core/biz/client/ExecutorBizClient.java | 2 +- .../main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java | 2 +- .../src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java | 2 +- .../com/xxl/job/core/executor/impl/XxlJobSimpleExecutor.java | 1 + 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java index 9f594309..dbbcffa8 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java @@ -6,7 +6,7 @@ import com.xxl.job.core.util.XxlJobRemotingUtil; /** * admin api test - * + * 用于调用服务端接口 * @author xuxueli 2017-07-28 22:14:52 */ public class ExecutorBizClient implements ExecutorBiz { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index a8762891..4a1510bb 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory; import java.util.Date; -/** +/** 用于处理接收请求 * Created by xuxueli on 17/3/1. */ public class ExecutorBizImpl implements ExecutorBiz { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index a1aa8a55..3a14703c 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -180,6 +180,7 @@ public class XxlJobExecutor { // ---------------------- job handler repository ---------------------- + //存储业务对象 key为 xxljob.value private static ConcurrentMap jobHandlerRepository = new ConcurrentHashMap(); public static IJobHandler loadJobHandler(String name){ return jobHandlerRepository.get(name); @@ -239,7 +240,6 @@ public class XxlJobExecutor { } // registry jobhandler - //注册到map中 registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod)); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSimpleExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSimpleExecutor.java index 53efbb95..33ecf5d3 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSimpleExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSimpleExecutor.java @@ -14,6 +14,7 @@ import java.util.Map; /** * xxl-job executor (for frameless) + * 非spring项目 使用 * * @author xuxueli 2020-11-05 */ From d3c615b2f9f034cd1b29c4358e9eb50165115ce2 Mon Sep 17 00:00:00 2001 From: liyong Date: Sun, 12 Dec 2021 17:52:47 +0800 Subject: [PATCH 04/11] 1 --- .../admin/core/complete/XxlJobCompleter.java | 2 ++ .../route/strategy/ExecutorRouteBusyover.java | 1 + .../route/strategy/ExecutorRouteFailover.java | 1 + .../core/route/strategy/ExecutorRouteLFU.java | 1 + .../core/route/strategy/ExecutorRouteLRU.java | 1 + .../admin/core/scheduler/XxlJobScheduler.java | 7 +++++++ .../admin/core/thread/JobFailMonitorHelper.java | 8 ++++++++ .../admin/core/thread/JobRegistryHelper.java | 5 ++++- .../admin/core/thread/JobScheduleHelper.java | 17 +++++++++++++++-- .../admin/core/thread/JobTriggerPoolHelper.java | 1 + .../job/admin/core/trigger/XxlJobTrigger.java | 10 ++++++++++ 11 files changed, 51 insertions(+), 3 deletions(-) diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java index 279ad7d1..cd7557d5 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java @@ -28,6 +28,7 @@ public class XxlJobCompleter { public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { // finish + //执行字任务 finishJob(xxlJobLog); // text最大64kb 避免长度过长 @@ -36,6 +37,7 @@ public class XxlJobCompleter { } // fresh handle + //更新log表 return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java index 868560fc..0b8bd355 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java @@ -12,6 +12,7 @@ import java.util.List; /** * Created by xuxueli on 17/3/10. + * 通过调用 客户端 idleBeat 接口 */ public class ExecutorRouteBusyover extends ExecutorRouter { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java index a2e4c909..ea1e4ade 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java @@ -11,6 +11,7 @@ import java.util.List; /** * Created by xuxueli on 17/3/10. + * 通过调用 客户端 beat接口 返回第一个SUCCESS的 */ public class ExecutorRouteFailover extends ExecutorRouter { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java index 9df19726..5151a44c 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java @@ -13,6 +13,7 @@ import java.util.concurrent.ConcurrentMap; * a(*)、LFU(Least Frequently Used):最不经常使用,频率/次数 * b、LRU(Least Recently Used):最近最久未使用,时间 * + * 统计24小时内调用次数 最少的 每次调用通过map+1记录 如果是第一次则是随机生成调用次数 这个策略主要是当前节点调用次数 不是 被调方的 调用次数 如果是多节点不符合预期结果 * Created by xuxueli on 17/3/10. */ public class ExecutorRouteLFU extends ExecutorRouter { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java index 2d540067..ee3d61bb 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java @@ -15,6 +15,7 @@ import java.util.concurrent.ConcurrentMap; * a、LFU(Least Frequently Used):最不经常使用,频率/次数 * b(*)、LRU(Least Recently Used):最近最久未使用,时间 * + * 基于 LinkedHashMap (accessorder=true)的访问顺序 来控制 * Created by xuxueli on 17/3/10. */ public class ExecutorRouteLRU extends ExecutorRouter { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index bb2cda8b..ba529f1b 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -22,24 +22,31 @@ public class XxlJobScheduler { public void init() throws Exception { // init i18n + // 从配置文件中初始化 ExecutorBlockStrategyEnum title initI18n(); // admin trigger pool start + // 初始化 fastTriggerPool(用于 任务的重试(步骤四) 、slowTriggerPool(用于 任务的重试超过10次的(步骤四)) 线程池 JobTriggerPoolHelper.toStart(); // admin registry monitor run + //1.初始化registryOrRemoveThreadPool(用于 客户端注册) 2.开启 registryMonitorThread (用于扫描 有没有对应执行器的 客户端地址 每30秒扫描一次 JobRegistryHelper.getInstance().start(); // admin fail-monitor run + //开启线程 扫描异常任务 重试机制 JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run ( depend on JobTriggerPoolHelper ) + //初始化 callbackThreadPool(用于接收客户端执行状态) 线程池 启动 monitorThread 处理长期处于进行中的任务 JobCompleteHelper.getInstance().start(); // admin log report start + //开启 logrThread 统计报表相关 间隔一分钟 JobLogReportHelper.getInstance().start(); // start-schedule ( depend on JobTriggerPoolHelper ) + //任务的预处理 scheduleThread ringThread JobScheduleHelper.getInstance().start(); logger.info(">>>>>>>>> init xxl-job admin success."); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java index 8409d7b3..28d0a530 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java @@ -43,30 +43,38 @@ public class JobFailMonitorHelper { for (long failLogId: failLogIds) { // lock log + //这里通过 依赖数据库事务 来进行避免 多节点时数据的重复操作问题 int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1); if (lockRet < 1) { continue; } + //查询执行记录及对应的任务信息 XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId); XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId()); // 1、fail retry monitor + //校验重试次数 if (log.getExecutorFailRetryCount() > 0) { + //执行任务 JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null); String retryMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<<
"; log.setTriggerMsg(log.getTriggerMsg() + retryMsg); + //更新记录 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log); } // 2、fail alarm monitor int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败 if (info != null) { + //发送告警 不看 boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log); newAlarmStatus = alarmResult?2:3; } else { newAlarmStatus = 1; } + + //更新告警状态 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus); } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java index 37edfd98..02e412e4 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -63,6 +63,7 @@ public class JobRegistryHelper { if (groupList!=null && !groupList.isEmpty()) { // remove dead address (admin/executor) + //查询有没有90秒之前注册过的节点 有则直接删除 List ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); if (ids!=null && ids.size()>0) { XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); @@ -70,6 +71,7 @@ public class JobRegistryHelper { // fresh online address (admin/executor) HashMap> appAddressMap = new HashMap>(); + //查询详细90秒内注册过的 List list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); if (list != null) { for (XxlJobRegistry item: list) { @@ -103,7 +105,7 @@ public class JobRegistryHelper { } group.setAddressList(addressListStr); group.setUpdateTime(new Date()); - + //TODO 循环内更新 XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); } } @@ -113,6 +115,7 @@ public class JobRegistryHelper { } } try { + //间隔30秒 TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index 831bcf6a..873240cc 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -70,6 +70,7 @@ public class JobScheduleHelper { connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false); + //通过这个来控制多节点只有一个节点会执行 low的一笔 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute(); @@ -77,18 +78,21 @@ public class JobScheduleHelper { // 1、pre read long nowTime = System.currentTimeMillis(); + //下次执行时间 在5秒内的 List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); if (scheduleList!=null && scheduleList.size()>0) { // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { // time-ring jump + // 当前时间>下次执行时间+5秒 任务触发过期策略 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 2.1、trigger-expire > 5s:pass && make next-trigger-time logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()); // 1、misfire match MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING); + //如果过期策略是立即执行一次则 执行 if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) { // FIRE_ONCE_NOW 》 trigger JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null); @@ -96,19 +100,25 @@ public class JobScheduleHelper { } // 2、fresh next + //刷新下次执行时间 refreshNextValidTime(jobInfo, new Date()); - } else if (nowTime > jobInfo.getTriggerNextTime()) { + } + //当前时间>下次执行时间 但是控制在5秒内 + else if (nowTime > jobInfo.getTriggerNextTime()) { // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time // 1、trigger + //执行 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); // 2、fresh next + //刷新下次执行时间 refreshNextValidTime(jobInfo, new Date()); // next-trigger-time in 5s, pre-read again + //如果任务在执行中 且 下次执行时间在5秒内 则将 任务 缓存到 ringData if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { // 1、make ring second @@ -122,7 +132,9 @@ public class JobScheduleHelper { } - } else { + } + //还没到下次执行时间的 缓存到 ringData + else { // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time // 1、make ring second @@ -199,6 +211,7 @@ public class JobScheduleHelper { if (cost < 1000) { // scan-overtime, not wait try { // pre-read period: success > scan each second; fail > skip this period; + //随机休眠 没任务则休眠5秒内 有任务则休眠1秒内 TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!scheduleThreadToStop) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java index 398713dd..f896a25a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -97,6 +97,7 @@ public class JobTriggerPoolHelper { logger.error(e.getMessage(), e); } finally { + //统计1分钟内 调用超过500ms的 貌似没用到 // check timeout-count-map long minTim_now = System.currentTimeMillis()/60000; if (minTim != minTim_now) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java index 748befc6..0fbf75e1 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java @@ -49,6 +49,7 @@ public class XxlJobTrigger { String addressList) { // load data + //###############初始化 任务参数及执行器参数 XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId); if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); @@ -76,6 +77,9 @@ public class XxlJobTrigger { shardingParam[1] = Integer.valueOf(shardingArr[1]); } } + //############################ + + //路由策略如果是 分片广播 则循环调用 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { @@ -111,7 +115,9 @@ public class XxlJobTrigger { private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){ // param + //获取阻塞策略 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy + //获取路由策略 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null; @@ -142,6 +148,7 @@ public class XxlJobTrigger { String address = null; ReturnT routeAddressResult = null; if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) { + //分片广播 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { if (index < group.getRegistryList().size()) { address = group.getRegistryList().get(index); @@ -149,6 +156,7 @@ public class XxlJobTrigger { address = group.getRegistryList().get(0); } } else { + //基于不同的路由策略获取地址 routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddressResult.getContent(); @@ -161,6 +169,7 @@ public class XxlJobTrigger { // 4、trigger remote executor ReturnT triggerResult = null; if (address != null) { + //执行任务 调用 客户端 run 接口 triggerResult = runExecutor(triggerParam, address); } else { triggerResult = new ReturnT(ReturnT.FAIL_CODE, null); @@ -193,6 +202,7 @@ public class XxlJobTrigger { //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerMsg(triggerMsgSb.toString()); + //更新调用记录 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); From c8c71809c2b3e3ff582d466468d43dc7821fe737 Mon Sep 17 00:00:00 2001 From: liyong Date: Sun, 12 Dec 2021 20:55:33 +0800 Subject: [PATCH 05/11] 1 --- .../com/xxl/job/admin/core/thread/JobScheduleHelper.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index 873240cc..db9d6bfc 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -368,6 +368,14 @@ public class JobScheduleHelper { // ---------------------- tools ---------------------- + + /** + * 获取下次执行时间 + * @param jobInfo + * @param fromTime + * @return + * @throws Exception + */ public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception { ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null); if (ScheduleTypeEnum.CRON == scheduleTypeEnum) { From ff4eb49d8cc059010b508170ab04116f7bcc51d9 Mon Sep 17 00:00:00 2001 From: liyong Date: Mon, 13 Dec 2021 09:19:30 +0800 Subject: [PATCH 06/11] 1 --- .../src/main/java/com/xxl/job/core/server/EmbedServer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java index 242d6b63..19b4e151 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java @@ -75,6 +75,7 @@ public class EmbedServer { .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL + //执行 .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); } }) @@ -85,7 +86,7 @@ public class EmbedServer { logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); - // start registry + //注册当前节点 startRegistry(appname, address); // wait util stop From 5aa7b240c92953c0bccb4b898399f7cc6c40c48f Mon Sep 17 00:00:00 2001 From: liyong Date: Mon, 13 Dec 2021 14:57:16 +0800 Subject: [PATCH 07/11] 1 --- .../xxl/job/core/biz/impl/ExecutorBizImpl.java | 17 ++++++++++++----- .../xxl/job/core/executor/XxlJobExecutor.java | 9 ++++++--- .../executor/impl/XxlJobSpringExecutor.java | 4 +++- .../com/xxl/job/core/server/EmbedServer.java | 18 ++++++++++++++---- .../core/thread/ExecutorRegistryThread.java | 2 +- .../com/xxl/job/core/thread/JobThread.java | 11 ++++++++--- .../job/core/thread/TriggerCallbackThread.java | 15 +++++++++++---- 7 files changed, 55 insertions(+), 21 deletions(-) diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index 4a1510bb..55f0245f 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -46,16 +46,18 @@ public class ExecutorBizImpl implements ExecutorBiz { @Override public ReturnT run(TriggerParam triggerParam) { // load old:jobHandler + jobThread - //优先获取缓存中的处理器 (可以自定义设置) + //TODO 获取有没有正在执行的线程 JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; String removeOldReason = null; // valid:jobHandler + jobThread GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); + //TODO 最常用的 bean 模式 if (GlueTypeEnum.BEAN == glueTypeEnum) { // new jobhandler 通过解析对象时包装的map获取 + //TODO 通过一开始解析的xxljob注解 获取对应的对象 IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); // valid old jobThread @@ -75,7 +77,9 @@ public class ExecutorBizImpl implements ExecutorBiz { } } - } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { + } + //groovy 脚本 忽略 + else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // valid old jobThread if (jobThread != null && @@ -98,7 +102,9 @@ public class ExecutorBizImpl implements ExecutorBiz { return new ReturnT(ReturnT.FAIL_CODE, e.getMessage()); } } - } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { + } + //其他脚本模式 忽略 + else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { // valid old jobThread if (jobThread != null && @@ -121,7 +127,7 @@ public class ExecutorBizImpl implements ExecutorBiz { // executor block strategy if (jobThread != null) { - //阻塞处理策略 + //TODO 阻塞处理策略 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { //丢弃后续调用 @@ -145,11 +151,12 @@ public class ExecutorBizImpl implements ExecutorBiz { // replace thread (new or exists invalid) if (jobThread == null) { - //开启一个执行业务的线程 如果是覆盖之前的调用 则会触发之前任务的 interrupt + //TODO 开启一个执行业务的线程 如果是覆盖之前的调用 则会触发之前任务的 interrupt jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); } // push data to queue + //TODO 讲当前任务添加进队列 上一步开启的线程 会扫描这个队列 ReturnT pushResult = jobThread.pushTriggerQueue(triggerParam); return pushResult; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index 3a14703c..daba8cec 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -67,20 +67,22 @@ public class XxlJobExecutor { // ---------------------- start + stop ---------------------- public void start() throws Exception { - // init logpath + //TODO 初始化日志路径 XxlJobFileAppender.initLogPath(logPath); // init invoker, admin-client + //TODO 初始化服务器地址 initAdminBizList(adminAddresses, accessToken); - // init JobLogFileCleanThread 开启一个线程清理日志 + //TODO init JobLogFileCleanThread 开启一个线程清理日志 JobLogFileCleanThread.getInstance().start(logRetentionDays); - // init TriggerCallbackThread + //TODO init TriggerCallbackThread 调用服务端 /api/callback 反馈任务执行结果 TriggerCallbackThread.getInstance().start(); // init executor-server + //TODO 初始化XXLJOB服务 initEmbedServer(address, ip, port, appname, accessToken); } public void destroy(){ @@ -164,6 +166,7 @@ public class XxlJobExecutor { // start embedServer = new EmbedServer(); + //TODO embedServer.start(address, port, appname, accessToken); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java index 8f3fb708..30134667 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java @@ -39,13 +39,15 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC /*initJobHandlerRepository(applicationContext);*/ // init JobHandler Repository (for method) + //初始化xxljob注解对象 initJobHandlerMethodRepository(applicationContext); - //TODO 初始化 glueFactory 使用场景待定 应该是执行脚本用的 + //初始化 glueFactory 用于执行groovy脚本 GlueFactory.refreshInstance(1); // super start try { + //TODO super.start(); } catch (Exception e) { throw new RuntimeException(e); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java index 19b4e151..360a16c3 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java @@ -43,6 +43,7 @@ public class EmbedServer { // param EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); + //TODO 用于接收服务端请求的线程池 ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor( 0, 200, @@ -75,7 +76,7 @@ public class EmbedServer { .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL - //执行 + //TODO 接收请求 .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); } }) @@ -86,7 +87,7 @@ public class EmbedServer { logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); - //注册当前节点 + //TODO 调用 api/registry 注册当前节点 startRegistry(appname, address); // wait util stop @@ -159,11 +160,12 @@ public class EmbedServer { boolean keepAlive = HttpUtil.isKeepAlive(msg); String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN); - // invoke + // TODO 接收请求以后立刻提交给另一个线程池 bizThreadPool.execute(new Runnable() { @Override public void run() { // do invoke + //TODO Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); // to json @@ -177,7 +179,7 @@ public class EmbedServer { private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { - // valid + //#################### 校验 if (HttpMethod.POST != httpMethod) { return new ReturnT(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support."); } @@ -189,21 +191,29 @@ public class EmbedServer { && !accessToken.equals(accessTokenReq)) { return new ReturnT(ReturnT.FAIL_CODE, "The access token is wrong."); } + //###################### + + // services mapping try { if ("/beat".equals(uri)) { + //TODO 用于 路由策略 故障转移 return executorBiz.beat(); } else if ("/idleBeat".equals(uri)) { + //TODO 用于 路由策略 忙碌转移 IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); return executorBiz.idleBeat(idleBeatParam); } else if ("/run".equals(uri)) { + //TODO 执行任务 TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); return executorBiz.run(triggerParam); } else if ("/kill".equals(uri)) { + //TODO 中止任务 KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); return executorBiz.kill(killParam); } else if ("/log".equals(uri)) { + //TODO 查询日志 LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); return executorBiz.log(logParam); } else { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java index def5aa76..3365c1d3 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java @@ -76,7 +76,7 @@ public class ExecutorRegistryThread { } } - // registry remove 如果通过stop 中断了 注册过程 则会立刻调用 移除接口 + // TODO registry remove 如果当前服务中止 则会立刻调用 移除接口 try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index bdc9af75..ba923c65 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -98,7 +98,7 @@ public class JobThread extends Thread{ // init try { - //调用 xxljob注解的 init方法 + //TODO 调用 xxljob注解的 init方法 handler.init(); } catch (Throwable e) { //TODO 初始化方法报错 并不会影响正常流程执行 如果 初始化涉及业务相关 要注意 @@ -113,6 +113,7 @@ public class JobThread extends Thread{ TriggerParam triggerParam = null; try { // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) + //TODO triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); if (triggerParam!=null) { running = true; @@ -125,7 +126,9 @@ public class JobThread extends Thread{ triggerParam.getJobId(), triggerParam.getExecutorParams(), logFileName, + //TODO 当前节点序号 triggerParam.getBroadcastIndex(), + //TODO 节点总数 triggerParam.getBroadcastTotal()); // init job context 包装执行的一些参数 通过 threadlocal 存取 @@ -134,7 +137,7 @@ public class JobThread extends Thread{ // execute XxlJobHelper.log("
----------- xxl-job job execute start -----------
----------- Param:" + xxlJobContext.getJobParam()); - //执行超时时间 + //TODO 执行超时时间 if (triggerParam.getExecutorTimeout() > 0) { // limit timeout Thread futureThread = null; @@ -187,7 +190,8 @@ public class JobThread extends Thread{ ); } else { - //每3秒获取一次执行参数 超过30次 且 依然没有 就 删除对应的线程 删除线程会将top置为true 这个设计应该是为了一些 执行频繁的任务设计的 一直死循环跑任务 + //TODO 每3秒获取一次执行参数 超过30次 且 依然没有 就 删除对应的线程 删除线程会将top置为true 这个设计应该是为了一些 执行频繁的任务设计的 一直死循环跑任务 + //TODO 30这个数字应该可配置 线程存活时间太长了 if (idleTimes > 30) { if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); @@ -212,6 +216,7 @@ public class JobThread extends Thread{ // callback handler info if (!toStop) { // commonm + //TODO 填充callBackQueue 队列 用于讲任务执行结果返回给服务端 TriggerCallbackThread.pushCallBack(new HandleCallbackParam( triggerParam.getLogId(), triggerParam.getLogDateTime(), diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index cf8e4efa..865b5be4 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -33,7 +33,7 @@ public class TriggerCallbackThread { } /** - * job results callback queue + * job results callback queue 有序阻塞队列 */ private LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue(); public static void pushCallBack(HandleCallbackParam callback){ @@ -44,7 +44,7 @@ public class TriggerCallbackThread { /** * callback thread */ - /** TODO 队列中的数据 是在每次任务执行完以后填充的 传递code及msg callback 干嘛的待确认 + /** TODO 队列中的数据 是在每次任务执行完以后填充的 传递code及msg * 一直循环从队列中获取数据 发生给xxl服务器(/api/callback) * 如果失败了会写入log目录下 triggerRetryCallbackThread 线程会每隔30秒(可配置)重新发送 */ @@ -68,17 +68,19 @@ public class TriggerCallbackThread { // normal callback while(!toStop){ try { + //TODO 从队列中取一个 阻塞方法 HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { // callback list param List callbackParamList = new ArrayList(); - //将队列中所有对象全部取出 + //TODO 将队列中所有对象全部取出 int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); // callback, will retry if error if (callbackParamList!=null && callbackParamList.size()>0) { + //TODO doCallback(callbackParamList); } } @@ -90,6 +92,7 @@ public class TriggerCallbackThread { } // last callback + //TODO 跳出循环 说明xxljob生命周期结束了 防止有数据还停留在队列中 try { List callbackParamList = new ArrayList(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); @@ -110,7 +113,7 @@ public class TriggerCallbackThread { triggerCallbackThread.start(); - // retry + // TODO 扫描上一个线程 请求异常的数据 重新发送 triggerRetryCallbackThread = new Thread(new Runnable() { @Override public void run() { @@ -172,8 +175,10 @@ public class TriggerCallbackThread { //getAdminBizList 所有xxl服务器地址 for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { + //TODO 调用服务端 callback接口 ReturnT callbackResult = adminBiz.callback(callbackParamList); if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { + //日志记录 callbackLog(callbackParamList, "
----------- xxl-job job callback finish."); callbackRet = true; break; @@ -185,6 +190,7 @@ public class TriggerCallbackThread { } } if (!callbackRet) { + //TODO 请求服务端异常处理(写进日志文件) 如果部分成功部分失败 是不是重复劳动 appendFailCallbackFile(callbackParamList); } } @@ -258,6 +264,7 @@ public class TriggerCallbackThread { List callbackParamList = (List) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class); + //TODO 读取完了 直接删除 callbaclLogFile.delete(); doCallback(callbackParamList); } From 85270a7b3de26c10fde3b1cb5224c2cb38b66d2a Mon Sep 17 00:00:00 2001 From: liyong Date: Mon, 13 Dec 2021 16:54:26 +0800 Subject: [PATCH 08/11] 1 --- .../admin/core/scheduler/XxlJobScheduler.java | 16 +++++++------- .../admin/core/thread/JobCompleteHelper.java | 2 +- .../core/thread/JobFailMonitorHelper.java | 2 +- .../admin/core/thread/JobRegistryHelper.java | 10 +++++---- .../admin/core/thread/JobScheduleHelper.java | 21 +++++++++++-------- .../core/thread/JobTriggerPoolHelper.java | 2 +- .../job/admin/core/trigger/XxlJobTrigger.java | 13 ++++++------ 7 files changed, 37 insertions(+), 29 deletions(-) diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index ba529f1b..1ace9b0b 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -22,31 +22,33 @@ public class XxlJobScheduler { public void init() throws Exception { // init i18n - // 从配置文件中初始化 ExecutorBlockStrategyEnum title + // TODO 国际化阻塞策略 initI18n(); // admin trigger pool start - // 初始化 fastTriggerPool(用于 任务的重试(步骤四) 、slowTriggerPool(用于 任务的重试超过10次的(步骤四)) 线程池 + // TODO 初始化 fastTriggerPool(用于 执行任务(步骤四、步骤7) 、slowTriggerPool(用于 执行慢任务(步骤四、步骤7)) 线程池 + //TODO 注意最大线程数 后面会用于 计算一次循环从数据库拉取的任务数 JobTriggerPoolHelper.toStart(); // admin registry monitor run - //1.初始化registryOrRemoveThreadPool(用于 客户端注册) 2.开启 registryMonitorThread (用于扫描 有没有对应执行器的 客户端地址 每30秒扫描一次 + //TODO 1.初始化registryOrRemoveThreadPool(用于 客户端注册) 2.开启 registryMonitorThread (用于扫描 有没有对应执行器的 客户端地址 每30秒扫描一次 JobRegistryHelper.getInstance().start(); // admin fail-monitor run - //开启线程 扫描异常任务 重试机制 + //TODO 步骤4 开启线程 扫描异常任务 重试机制 JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run ( depend on JobTriggerPoolHelper ) - //初始化 callbackThreadPool(用于接收客户端执行状态) 线程池 启动 monitorThread 处理长期处于进行中的任务 + //TODO 初始化 callbackThreadPool(用于接收客户端执行状态) 线程池 启动 monitorThread 处理长期处于进行中的任务 JobCompleteHelper.getInstance().start(); // admin log report start - //开启 logrThread 统计报表相关 间隔一分钟 + //todo 开启 logrThread 统计报表相关 间隔一分钟 + //TODO 每次统计都是统计一天的 数据量过大时会影响效率 可以做成增量的 在获取结果时异步 统计 或者每次只统计1分钟的 JobLogReportHelper.getInstance().start(); // start-schedule ( depend on JobTriggerPoolHelper ) - //任务的预处理 scheduleThread ringThread + //TODO 步骤7 任务的预处理 scheduleThread ringThread JobScheduleHelper.getInstance().start(); logger.info(">>>>>>>>> init xxl-job admin success."); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java index 5698926a..c3ce7f1a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java @@ -74,7 +74,7 @@ public class JobCompleteHelper { // monitor while (!toStop) { try { - // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; + //TODO 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; Date losedTime = DateUtil.addMinutes(new Date(), -10); List losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java index 28d0a530..83f21165 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java @@ -53,7 +53,7 @@ public class JobFailMonitorHelper { XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId()); // 1、fail retry monitor - //校验重试次数 + //TODO 校验重试次数 if (log.getExecutorFailRetryCount() > 0) { //执行任务 JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java index 02e412e4..7c487f6b 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -59,11 +59,12 @@ public class JobRegistryHelper { while (!toStop) { try { // auto registry group + // List groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0); if (groupList!=null && !groupList.isEmpty()) { // remove dead address (admin/executor) - //查询有没有90秒之前注册过的节点 有则直接删除 + //TODO 查询有没有90秒之前注册过的节点 有则直接删除 List ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); if (ids!=null && ids.size()>0) { XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); @@ -71,10 +72,11 @@ public class JobRegistryHelper { // fresh online address (admin/executor) HashMap> appAddressMap = new HashMap>(); - //查询详细90秒内注册过的 + //TODO 查询 90秒内注册过的 List list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); if (list != null) { for (XxlJobRegistry item: list) { + //TODO if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) { String appname = item.getRegistryKey(); List registryList = appAddressMap.get(appname); @@ -105,7 +107,7 @@ public class JobRegistryHelper { } group.setAddressList(addressListStr); group.setUpdateTime(new Date()); - //TODO 循环内更新 + //TODO 循环更新 执行器 对应的 注册列表 XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); } } @@ -115,7 +117,7 @@ public class JobRegistryHelper { } } try { - //间隔30秒 + //TODO 间隔30秒 TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index db9d6bfc..906b2306 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -70,7 +70,7 @@ public class JobScheduleHelper { connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false); - //通过这个来控制多节点只有一个节点会执行 low的一笔 + //TODO 通过这个来控制多节点只有一个节点会执行 low的一笔 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute(); @@ -78,14 +78,14 @@ public class JobScheduleHelper { // 1、pre read long nowTime = System.currentTimeMillis(); - //下次执行时间 在5秒内的 + //TODO 下次执行时间 在5秒内的 整个框架的核心 List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); if (scheduleList!=null && scheduleList.size()>0) { // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { // time-ring jump - // 当前时间>下次执行时间+5秒 任务触发过期策略 + // TODO 当前时间>下次执行时间+5秒 任务触发过期策略 5秒内不算过期 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 2.1、trigger-expire > 5s:pass && make next-trigger-time logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()); @@ -104,27 +104,29 @@ public class JobScheduleHelper { refreshNextValidTime(jobInfo, new Date()); } - //当前时间>下次执行时间 但是控制在5秒内 + //TODO 当前时间>下次执行时间 过期了但是控制在5秒内不算过期 else if (nowTime > jobInfo.getTriggerNextTime()) { // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time // 1、trigger - //执行 + //TODO 执行 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); // 2、fresh next - //刷新下次执行时间 + //TODO 刷新下次执行时间 refreshNextValidTime(jobInfo, new Date()); // next-trigger-time in 5s, pre-read again - //如果任务在执行中 且 下次执行时间在5秒内 则将 任务 缓存到 ringData + //TODO 如果任务在运行状态 且 下次执行时间在5秒内 则将 任务 缓存到 ringData if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { // 1、make ring second + //TODO 获取下次执行时间 是哪一秒 作为缓存的key int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring + //TODO 缓存至 ringData pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next @@ -249,7 +251,8 @@ public class JobScheduleHelper { try { // second data List ringItemData = new ArrayList<>(); - int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; + //TODO 避免处理耗时太长,跨过刻度,向前校验一个刻度; + int nowSecond = Calendar.getInstance().get(Calendar.SECOND); for (int i = 0; i < 2; i++) { List tmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) { @@ -262,7 +265,7 @@ public class JobScheduleHelper { if (ringItemData.size() > 0) { // do trigger for (int jobId: ringItemData) { - // do trigger + //TODO do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); } // clear diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java index f896a25a..f1090db9 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -97,7 +97,7 @@ public class JobTriggerPoolHelper { logger.error(e.getMessage(), e); } finally { - //统计1分钟内 调用超过500ms的 貌似没用到 + //TODO 统计1分钟内 调用超过500ms的 // check timeout-count-map long minTim_now = System.currentTimeMillis()/60000; if (minTim != minTim_now) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java index 0fbf75e1..868a2e76 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java @@ -79,7 +79,7 @@ public class XxlJobTrigger { } //############################ - //路由策略如果是 分片广播 则循环调用 + //TODO 路由策略如果是 分片广播 则循环调用 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { @@ -90,6 +90,7 @@ public class XxlJobTrigger { if (shardingParam == null) { shardingParam = new int[]{0, 1}; } + //TODO processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]); } @@ -115,9 +116,9 @@ public class XxlJobTrigger { private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){ // param - //获取阻塞策略 + //TODO 获取阻塞策略 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy - //获取路由策略 + //TODO 获取路由策略 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null; @@ -156,7 +157,7 @@ public class XxlJobTrigger { address = group.getRegistryList().get(0); } } else { - //基于不同的路由策略获取地址 + //TODO 基于不同的路由策略获取地址 routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddressResult.getContent(); @@ -169,7 +170,7 @@ public class XxlJobTrigger { // 4、trigger remote executor ReturnT triggerResult = null; if (address != null) { - //执行任务 调用 客户端 run 接口 + //TODO 执行任务 调用 客户端 run 接口 triggerResult = runExecutor(triggerParam, address); } else { triggerResult = new ReturnT(ReturnT.FAIL_CODE, null); @@ -202,7 +203,7 @@ public class XxlJobTrigger { //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerMsg(triggerMsgSb.toString()); - //更新调用记录 + //TODO 更新调用记录 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); From adcde32ba1e1d19ba0923205bedfbdc37556f2a8 Mon Sep 17 00:00:00 2001 From: liyong Date: Tue, 14 Dec 2021 23:29:06 +0800 Subject: [PATCH 09/11] 1 --- doc/note | 4 ++++ .../com/xxl/job/admin/core/scheduler/XxlJobScheduler.java | 2 +- .../com/xxl/job/admin/core/thread/JobFailMonitorHelper.java | 1 + .../com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java | 1 + .../main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java | 2 +- .../main/java/com/xxl/job/core/executor/XxlJobExecutor.java | 3 ++- .../src/main/java/com/xxl/job/core/server/EmbedServer.java | 2 +- .../java/com/xxl/job/core/thread/JobLogFileCleanThread.java | 2 +- .../java/com/xxl/job/core/thread/TriggerCallbackThread.java | 2 +- .../src/main/resources/application.properties | 3 ++- 10 files changed, 15 insertions(+), 7 deletions(-) create mode 100644 doc/note diff --git a/doc/note b/doc/note new file mode 100644 index 00000000..bd7cec84 --- /dev/null +++ b/doc/note @@ -0,0 +1,4 @@ +源码仓库地址 +https://github.com/xuxueli/xxl-job/releases +http://gitee.com/xuxueli0323/xxl-job/releases + diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index 1ace9b0b..8d970c45 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -44,7 +44,7 @@ public class XxlJobScheduler { // admin log report start //todo 开启 logrThread 统计报表相关 间隔一分钟 - //TODO 每次统计都是统计一天的 数据量过大时会影响效率 可以做成增量的 在获取结果时异步 统计 或者每次只统计1分钟的 + //TODO 每次统计都是统计一天的 没做大数据量的限制 过大时会影响效率 可以做成增量的 在获取结果时异步 统计 或者每次只统计1分钟的 JobLogReportHelper.getInstance().start(); // start-schedule ( depend on JobTriggerPoolHelper ) diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java index 83f21165..0a2a6900 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java @@ -44,6 +44,7 @@ public class JobFailMonitorHelper { // lock log //这里通过 依赖数据库事务 来进行避免 多节点时数据的重复操作问题 + //TODO 如果在这一步服务挂了 这批数据就会被下一个线程扫描为失败 int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1); if (lockRet < 1) { continue; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java index f1090db9..c4b14581 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -79,6 +79,7 @@ public class JobTriggerPoolHelper { // choose thread pool ThreadPoolExecutor triggerPool_ = fastTriggerPool; AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); + //慢任务 超过 10次 则调用单独的线程池 if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min triggerPool_ = slowTriggerPool; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index 55f0245f..6990e651 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -127,7 +127,7 @@ public class ExecutorBizImpl implements ExecutorBiz { // executor block strategy if (jobThread != null) { - //TODO 阻塞处理策略 + //TODO 阻塞处理策略 注意点 这边阻塞策略是在客户端 实现的 如果路由策略 每次执行的节点不一样 就会导致重复执行 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { //丢弃后续调用 diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index daba8cec..9c41e72e 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -154,6 +154,7 @@ public class XxlJobExecutor { ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); // generate address + //TODO 优先使用address if (address==null || address.trim().length()==0) { String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); @@ -217,7 +218,7 @@ public class XxlJobExecutor { throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + "The correct method format like \" public ReturnT execute(String param) \" ."); }*/ - //TODO 设置访问权限 说明私有的方法也是支持的 demo待实践 + //TODO 设置访问权限 说明私有的方法也是支持的 executeMethod.setAccessible(true); // init and destroy diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java index 360a16c3..eb2a37d5 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java @@ -213,7 +213,7 @@ public class EmbedServer { KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); return executorBiz.kill(killParam); } else if ("/log".equals(uri)) { - //TODO 查询日志 + //TODO 查询日志 注意 实时通过客户端查询日志 如果客户端是在容器中每次部署要注意下日志路径 LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); return executorBiz.log(logParam); } else { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java index ac4c227e..e0802080 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java @@ -34,7 +34,7 @@ public class JobLogFileCleanThread { */ public void start(final long logRetentionDays){ - // limit min value 最小值三天一清理 + // TDOD 注意点 最小值三天一清理 if (logRetentionDays < 3 ) { return; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 865b5be4..0d74968e 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -190,7 +190,7 @@ public class TriggerCallbackThread { } } if (!callbackRet) { - //TODO 请求服务端异常处理(写进日志文件) 如果部分成功部分失败 是不是重复劳动 + //TODO 请求服务端异常处理(写进日志文件) 如果部分成功部分失败 是不是重复劳动 目前不会有这种场景 appendFailCallbackFile(callbackParamList); } } diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties index 35601c5d..66403471 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties @@ -16,7 +16,8 @@ xxl.job.accessToken= ### xxl-job executor appname xxl.job.executor.appname=xxl-job-executor-sample ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null -xxl.job.executor.address=1.1.1.1 +### 注册到服务端的地址 与 ip:port2选1 优先address +xxl.job.executor.address= ### xxl-job executor server-info xxl.job.executor.ip= xxl.job.executor.port=9999 From 584e2f6edcb27c38fa54abd8f2814bcff4f06dbe Mon Sep 17 00:00:00 2001 From: "yong.li07" Date: Tue, 4 Jul 2023 09:16:23 +0800 Subject: [PATCH 10/11] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=A4=9A=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E4=BB=BB=E5=8A=A1=E5=9D=87=E6=91=8A=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/db/tables_xxl_job.sql | 14 +- .../admin/core/conf/XxlJobAdminConfig.java | 54 +++++++ .../job/admin/core/model/XxlJobCluster.java | 26 +++ .../xxl/job/admin/core/model/XxlJobInfo.java | 20 +++ .../admin/core/thread/JobRegistryHelper.java | 31 ++++ .../admin/core/thread/JobScheduleHelper.java | 23 ++- .../xxl/job/admin/dao/XxlJobClusterDao.java | 20 +++ .../com/xxl/job/admin/dao/XxlJobInfoDao.java | 13 +- .../xxl/job/admin/service/JobAllocation.java | 25 +++ .../service/impl/AverageJobAllocation.java | 150 ++++++++++++++++++ .../admin/service/impl/XxlJobServiceImpl.java | 2 + .../src/main/resources/application.properties | 4 + .../mybatis-mapper/XxlJobClusterMapper.xml | 17 ++ .../mybatis-mapper/XxlJobInfoMapper.xml | 41 ++++- .../service/jobhandler/SampleXxlJob.java | 1 + 15 files changed, 436 insertions(+), 5 deletions(-) create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java create mode 100644 xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml diff --git a/doc/db/tables_xxl_job.sql b/doc/db/tables_xxl_job.sql index 9002d076..ae61bd0a 100644 --- a/doc/db/tables_xxl_job.sql +++ b/doc/db/tables_xxl_job.sql @@ -32,7 +32,11 @@ CREATE TABLE `xxl_job_info` ( `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行', `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间', `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间', - PRIMARY KEY (`id`) + `host_name` varchar(100) NOT NULL DEFAULT '' COMMENT '实例名', + `lock_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '锁状态:1-改任务锁定中 0-未锁定', + PRIMARY KEY (`id`), + KEY `idx_host_name` (`host_name`), + KEY `idx_lock` (`lock_status`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `xxl_job_log` ( @@ -113,10 +117,18 @@ CREATE TABLE `xxl_job_lock` ( PRIMARY KEY (`lock_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +CREATE TABLE `xxl_job_cluster`( + `host_name` varchar(100) NOT NULL COMMENT '实例名', + `update_time` datetime DEFAULT NULL comment '更新时间', + PRIMARY KEY (`host_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31' ); INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', ''); INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL); INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock'); +INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'flush_lock'); + commit; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java index 380b8a59..698bfad1 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java @@ -1,8 +1,13 @@ package com.xxl.job.admin.core.conf; import com.xxl.job.admin.core.alarm.JobAlarmer; +import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.scheduler.XxlJobScheduler; import com.xxl.job.admin.dao.*; +import com.xxl.job.admin.service.JobAllocation; +import com.xxl.job.admin.service.impl.AverageJobAllocation; +import com.xxl.job.core.util.IpUtil; +import io.micrometer.core.instrument.util.StringUtils; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; @@ -36,8 +41,11 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { public void afterPropertiesSet() throws Exception { adminConfig = this; + getJobAllocation().init(true); xxlJobScheduler = new XxlJobScheduler(); xxlJobScheduler.init(); + getJobAllocation().flush(); + } @Override @@ -67,6 +75,16 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Value("${xxl.job.logretentiondays}") private int logretentiondays; + + @Value("${xxl.job.cluster.host.name}") + private String hostName; + + @Value("${server.port}") + private int port; + + @Value("${xxl.job.cluster.enable:false}") + private boolean clusterEnable; + // dao, service @Resource @@ -79,6 +97,9 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { private XxlJobGroupDao xxlJobGroupDao; @Resource private XxlJobLogReportDao xxlJobLogReportDao; + + @Resource + private XxlJobClusterDao xxlJobClusterDao; @Resource private JavaMailSender mailSender; @Resource @@ -86,6 +107,23 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Resource private JobAlarmer jobAlarmer; + private JobAllocation jobAllocation = defaultJobAllocation; + + public String getHostName() { + return StringUtils.isBlank(hostName) ? IpUtil.getIpPort(port) : hostName; + } + + + private static JobAllocation defaultJobAllocation = new JobAllocation() { + @Override + public void allocation(XxlJobInfo jobInfo) { + } + + @Override + public void init(boolean init) { + } + }; + public String getI18n() { if (!Arrays.asList("zh_CN", "zh_TC", "en").contains(i18n)) { @@ -155,4 +193,20 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { return jobAlarmer; } + public XxlJobClusterDao getXxlJobClusterDao() { + return xxlJobClusterDao; + } + + public JobAllocation getJobAllocation() { + return jobAllocation.equals(defaultJobAllocation) && clusterEnable ? new AverageJobAllocation() : defaultJobAllocation; + } + + public void setJobAllocation(JobAllocation jobAllocation) { + this.jobAllocation = jobAllocation; + } + + public boolean isClusterEnable() { + return clusterEnable; + } + } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java new file mode 100644 index 00000000..4a7c2fb6 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java @@ -0,0 +1,26 @@ +package com.xxl.job.admin.core.model; + +import java.util.Date; + +public class XxlJobCluster { + + private String hostName; + + private Date updateTime; + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java index e47b6dc6..0af4f663 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java @@ -42,6 +42,10 @@ public class XxlJobInfo { private long triggerLastTime; // 上次调度时间 private long triggerNextTime; // 下次调度时间 + private String hostName; // 运行实例名称 + + private int lockStatus; + public int getId() { return id; @@ -234,4 +238,20 @@ public class XxlJobInfo { public void setTriggerNextTime(long triggerNextTime) { this.triggerNextTime = triggerNextTime; } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public int getLockStatus() { + return lockStatus; + } + + public void setLockStatus(int lockStatus) { + this.lockStatus = lockStatus; + } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java index 37edfd98..2fd76761 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -2,10 +2,12 @@ package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.model.XxlJobGroup; +import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobRegistry; import com.xxl.job.core.biz.model.RegistryParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.RegistryConfig; +import com.xxl.job.core.util.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; @@ -29,6 +31,13 @@ public class JobRegistryHelper { private Thread registryMonitorThread; private volatile boolean toStop = false; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "cluster host registry"); + } + }); + public void start(){ // for registry or remove @@ -126,6 +135,28 @@ public class JobRegistryHelper { registryMonitorThread.setDaemon(true); registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"); registryMonitorThread.start(); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().replace(XxlJobAdminConfig.getAdminConfig().getHostName()); + + Date date = new Date(); + + XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().delete(DateUtil.addMinutes(date,-5)); + + XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().findOldClusterInfo(); + if(jobInfo!=null){ + XxlJobAdminConfig.getAdminConfig().getJobAllocation().init(false); + XxlJobAdminConfig.getAdminConfig().getJobAllocation().flush(); + } + } catch (Exception e) { + logger.error("ScheduledTask fetchNameServerAddr exception", e); + } + } + }, 0, 1000 * 30, TimeUnit.MILLISECONDS); } public void toStop(){ diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index 831bcf6a..daba04ff 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -15,6 +15,7 @@ import java.sql.SQLException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * @author xuxueli 2019-05-21 @@ -77,8 +78,25 @@ public class JobScheduleHelper { // 1、pre read long nowTime = System.currentTimeMillis(); - List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); - if (scheduleList!=null && scheduleList.size()>0) { + List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount + , XxlJobAdminConfig.getAdminConfig().getHostName(), + XxlJobAdminConfig.getAdminConfig().isClusterEnable()); + if (scheduleList!=null && !scheduleList.isEmpty()) { + + if(XxlJobAdminConfig.getAdminConfig().isClusterEnable()) { + XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateStatusById(scheduleList.stream() + .map(XxlJobInfo::getId).collect(Collectors.toList())); + + try { + conn.commit(); + } catch (SQLException e) { + if (!scheduleThreadToStop) { + logger.error(e.getMessage(), e); + } + } + } + + // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { @@ -140,6 +158,7 @@ public class JobScheduleHelper { // 3、update trigger info for (XxlJobInfo jobInfo: scheduleList) { + jobInfo.setLockStatus(0); XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java new file mode 100644 index 00000000..3d93c4e0 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java @@ -0,0 +1,20 @@ +package com.xxl.job.admin.dao; + +import com.xxl.job.admin.core.model.XxlJobCluster; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.Date; +import java.util.List; + +/** + * @author liyong + */ +@Mapper +public interface XxlJobClusterDao { + void replace(@Param("ip") String ip); + + List findAll(); + + void delete(@Param("time") Date date); +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java index d640efff..8adaca5b 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java @@ -41,9 +41,20 @@ public interface XxlJobInfoDao { public int findAllCount(); - public List scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize ); + public List scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize, + @Param("hostName") String ip,@Param("clusterEnable") boolean clusterEnable); public int scheduleUpdate(XxlJobInfo xxlJobInfo); + void updateStatusById(@Param("list") List collect); + + List pageById(@Param("id") Integer id); + + void updateHostNameByIds(@Param("hostName") String k, @Param("ids") List v); + + void initLockStatus(@Param("hostName") String hostName,@Param("init") boolean init); + + XxlJobInfo findOldClusterInfo(); + } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java new file mode 100644 index 00000000..1ad6dc2e --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java @@ -0,0 +1,25 @@ +package com.xxl.job.admin.service; + +import com.xxl.job.admin.core.model.XxlJobInfo; + +/** + * @author liyong + */ +public interface JobAllocation { + + /** + * 计算单个任务的分配 + * @param jobInfo + */ + default void allocation(XxlJobInfo jobInfo){} + + /** + * 用于节点起来的时候 重新计算任务的分配 + */ + default void flush(){} + + /** + * 启动初始化相关动作 + */ + default void init(boolean init){} +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java new file mode 100644 index 00000000..4f8c15d6 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java @@ -0,0 +1,150 @@ +package com.xxl.job.admin.service.impl; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobCluster; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.service.JobAllocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * @author liyong + */ +public class AverageJobAllocation implements JobAllocation { + + private static Logger logger = LoggerFactory.getLogger(AverageJobAllocation.class); + + + private static int oneMinute = 60; + private static int tenMinutes = oneMinute * 10; + private static int oneHour = tenMinutes * 6; + private static int oneDay = oneHour * 24; + + private Map averageMap = new ConcurrentHashMap<>(); + private Map> ipMap = new ConcurrentHashMap<>(); + + { + // 防止执行频率高的 扎堆在一起 + averageMap.put(oneMinute, new AtomicInteger(0)); + averageMap.put(tenMinutes, new AtomicInteger(0)); + averageMap.put(oneHour, new AtomicInteger(0)); + averageMap.put(oneDay, new AtomicInteger(0)); + averageMap.put(Integer.MAX_VALUE, new AtomicInteger(0)); + } + + @Override + public void allocation(XxlJobInfo jobInfo) { + List all = XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().findAll(); + List ipList = all.stream().filter(x -> x.getUpdateTime().getTime() > System.currentTimeMillis() - 60 * 1000) + .map(XxlJobCluster::getHostName).collect(Collectors.toList()); + + if (ipList.isEmpty()) { + jobInfo.setHostName(XxlJobAdminConfig.getAdminConfig().getHostName()); + return; + } + + Integer key = getKey(jobInfo.getTriggerNextTime() - System.currentTimeMillis()); + int i = averageMap.get(key).incrementAndGet(); + + jobInfo.setHostName(ipList.get(i % ipList.size())); + + } + + @Override + public void init(boolean init) { + XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().initLockStatus(XxlJobAdminConfig.getAdminConfig().getHostName(),init); + } + + @Override + public void flush() { + Connection conn = null; + Boolean connAutoCommit = null; + PreparedStatement preparedStatement = null; + + try { + conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); + connAutoCommit = conn.getAutoCommit(); + conn.setAutoCommit(false); + + preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'flush_lock' for update"); + preparedStatement.execute(); + + recursion(0, conn); + } catch (Exception e) { + logger.error(">>>>>>>>>>> xxl-job, cluster flush error:{}", e); + } finally { + // commit + if (conn != null) { + try { + conn.commit(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + try { + conn.setAutoCommit(connAutoCommit); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + try { + conn.close(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + + // close PreparedStatement + if (null != preparedStatement) { + try { + preparedStatement.close(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + } + } + + private void recursion(int id, Connection conn) { + List list = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().pageById(id); + if (CollectionUtils.isEmpty(list)) + return; + Map> map = new HashMap<>(); + list.forEach(x -> { + allocation(x); + List ids = map.getOrDefault(x.getHostName(), new ArrayList<>()); + ids.add(x.getId()); + map.put(x.getHostName(), ids); + }); + map.forEach((k, v) -> XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateHostNameByIds(k, v)); + try { + conn.commit(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + recursion(list.get(list.size() - 1).getId(), conn); + } + + private Integer getKey(long time) { + if (time < oneMinute) { + return oneMinute; + } else if (time < tenMinutes) { + return tenMinutes; + } else if (time < oneHour) { + return oneHour; + } else if (time < oneDay) { + return oneDay; + } else { + return Integer.MAX_VALUE; + } + } + +} + diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 530ee41c..1f5cae91 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -1,5 +1,6 @@ package com.xxl.job.admin.service.impl; +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.cron.CronExpression; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; @@ -328,6 +329,7 @@ public class XxlJobServiceImpl implements XxlJobService { xxlJobInfo.setTriggerNextTime(nextTriggerTime); xxlJobInfo.setUpdateTime(new Date()); + XxlJobAdminConfig.getAdminConfig().getJobAllocation().allocation(xxlJobInfo); xxlJobInfoDao.update(xxlJobInfo); return ReturnT.SUCCESS; } diff --git a/xxl-job-admin/src/main/resources/application.properties b/xxl-job-admin/src/main/resources/application.properties index 8727b6c7..810e4f40 100644 --- a/xxl-job-admin/src/main/resources/application.properties +++ b/xxl-job-admin/src/main/resources/application.properties @@ -63,3 +63,7 @@ xxl.job.triggerpool.slow.max=100 ### xxl-job, log retention days xxl.job.logretentiondays=30 + +### xxl-job cluster +xxl.job.cluster.host.name= +xxl.job.cluster.enable=true diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml new file mode 100644 index 00000000..5bbd8c63 --- /dev/null +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml @@ -0,0 +1,17 @@ + + + + + + replace into xxl_job_cluster values(#{ip}, now()); + + + delete from xxl_job_cluster where update_time <= #{time} + + + + + \ No newline at end of file diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml index 7b3c3a3e..764282f2 100644 --- a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml @@ -36,6 +36,9 @@ + + + @@ -224,6 +227,10 @@ FROM xxl_job_info AS t WHERE t.trigger_status = 1 and t.trigger_next_time #{maxNextTime} + + and t.host_name=#{hostName} + and t.lock_status=0 + ORDER BY id ASC LIMIT #{pagesize} @@ -233,8 +240,40 @@ SET trigger_last_time = #{triggerLastTime}, trigger_next_time = #{triggerNextTime}, - trigger_status = #{triggerStatus} + trigger_status = #{triggerStatus}, + lock_status=#{lockStatus} WHERE id = #{id} + + + + update xxl_job_info set lock_status = 1 + where id in + + #{id} + + + + update xxl_job_info set host_name = #{hostName} + where id in + + #{id} + + + + update xxl_job_info a left join xxl_job_cluster b on a.host_name =b.host_name and b.update_time>=DATE_SUB(now(),INTERVAL 1 MINUTE) + set a.lock_status = 0 + where (b.host_name is null + + or a.host_name =#{hostName} + + ) and a.lock_status=1 + \ No newline at end of file diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java index 759d6625..865543ab 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java @@ -36,6 +36,7 @@ public class SampleXxlJob { */ @XxlJob("demoJobHandler") public void demoJobHandler() throws Exception { + logger.info(XxlJobHelper.getJobParam()); XxlJobHelper.log("XXL-JOB, Hello World."); for (int i = 0; i < 5; i++) { From 68efe5fafa12fd6ce17983c7cacf3cb884f60c2b Mon Sep 17 00:00:00 2001 From: "yong.li07" Date: Tue, 4 Jul 2023 10:27:25 +0800 Subject: [PATCH 11/11] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=9D=87=E6=91=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/db/tables_xxl_job.sql | 15 +- .../admin/core/conf/XxlJobAdminConfig.java | 54 ++++++- .../job/admin/core/model/XxlJobCluster.java | 26 +++ .../xxl/job/admin/core/model/XxlJobInfo.java | 19 +++ .../admin/core/thread/JobRegistryHelper.java | 38 +++++ .../admin/core/thread/JobScheduleHelper.java | 21 ++- .../core/thread/JobTriggerPoolHelper.java | 4 +- .../xxl/job/admin/dao/XxlJobClusterDao.java | 17 ++ .../com/xxl/job/admin/dao/XxlJobInfoDao.java | 62 +++++--- .../xxl/job/admin/service/JobAllocation.java | 22 +++ .../service/impl/AverageJobAllocation.java | 149 ++++++++++++++++++ .../admin/service/impl/XxlJobServiceImpl.java | 13 +- .../src/main/resources/application.properties | 9 +- xxl-job-admin/src/main/resources/logback.xml | 2 +- .../mybatis-mapper/XxlJobClusterMapper.xml | 17 ++ .../mybatis-mapper/XxlJobInfoMapper.xml | 40 ++++- .../service/jobhandler/SampleXxlJob.java | 2 + .../src/main/resources/application.properties | 6 +- .../src/main/resources/logback.xml | 2 +- 19 files changed, 474 insertions(+), 44 deletions(-) create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java create mode 100644 xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml diff --git a/doc/db/tables_xxl_job.sql b/doc/db/tables_xxl_job.sql index a3202128..61851114 100644 --- a/doc/db/tables_xxl_job.sql +++ b/doc/db/tables_xxl_job.sql @@ -29,10 +29,14 @@ CREATE TABLE `xxl_job_info` ( `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注', `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间', `child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔', - `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行', + `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行 2.预备执行中', `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间', `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间', - PRIMARY KEY (`id`) + `host_name` varchar(100) NOT NULL DEFAULT '' COMMENT '实例名', + `lock_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '锁状态:1-改任务锁定中 0-未锁定', + PRIMARY KEY (`id`), + KEY `idx_host_name` (`host_name`), + KEY `idx_lock` (`lock_status`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `xxl_job_log` ( @@ -113,10 +117,17 @@ CREATE TABLE `xxl_job_lock` ( PRIMARY KEY (`lock_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +CREATE TABLE `xxl_job_cluster` ( + `host_name` varchar(100) NOT NULL COMMENT '实例名', + `update_time` datetime DEFAULT NULL comment '更新时间', + PRIMARY KEY (`host_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31' ); INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', ''); INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL); INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock'); +INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'flush_lock'); commit; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java index 380b8a59..55a25146 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java @@ -1,8 +1,15 @@ package com.xxl.job.admin.core.conf; import com.xxl.job.admin.core.alarm.JobAlarmer; +import com.xxl.job.admin.core.model.XxlJobCluster; +import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.scheduler.XxlJobScheduler; import com.xxl.job.admin.dao.*; +import com.xxl.job.admin.service.JobAllocation; +import com.xxl.job.admin.service.impl.AverageJobAllocation; +import com.xxl.job.core.util.IpUtil; +import io.micrometer.core.instrument.util.StringUtils; +import io.netty.util.internal.StringUtil; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; @@ -12,6 +19,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.sql.DataSource; import java.util.Arrays; +import java.util.List; /** * xxl-job config @@ -23,6 +31,7 @@ import java.util.Arrays; public class XxlJobAdminConfig implements InitializingBean, DisposableBean { private static XxlJobAdminConfig adminConfig = null; + public static XxlJobAdminConfig getAdminConfig() { return adminConfig; } @@ -35,9 +44,10 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Override public void afterPropertiesSet() throws Exception { adminConfig = this; - + getJobAllocation().init(true); xxlJobScheduler = new XxlJobScheduler(); xxlJobScheduler.init(); + getJobAllocation().flush(); } @Override @@ -67,6 +77,17 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Value("${xxl.job.logretentiondays}") private int logretentiondays; + + @Value("${xxl.job.cluster.host.name}") + private String hostName; + + @Value("${server.port}") + private int port; + + @Value("${xxl.job.cluster.enable:false}") + private boolean clusterEnable; + + // dao, service @Resource @@ -80,12 +101,28 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Resource private XxlJobLogReportDao xxlJobLogReportDao; @Resource + private XxlJobClusterDao xxlJobClusterDao; + @Resource private JavaMailSender mailSender; @Resource private DataSource dataSource; @Resource private JobAlarmer jobAlarmer; + public String getHostName() { + return StringUtils.isBlank(hostName) ? IpUtil.getIpPort(port) : hostName; + } + + private JobAllocation jobAllocation = defaultJobAllocation; + private static JobAllocation defaultJobAllocation = new JobAllocation() { + @Override + public void allocation(XxlJobInfo jobInfo) { + } + + @Override + public void init(boolean init) { + } + }; public String getI18n() { if (!Arrays.asList("zh_CN", "zh_TC", "en").contains(i18n)) { @@ -155,4 +192,19 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { return jobAlarmer; } + public XxlJobClusterDao getXxlJobClusterDao() { + return xxlJobClusterDao; + } + + public JobAllocation getJobAllocation() { + return jobAllocation.equals(defaultJobAllocation) && clusterEnable ? new AverageJobAllocation() : defaultJobAllocation; + } + + public void setJobAllocation(JobAllocation jobAllocation) { + this.jobAllocation = jobAllocation; + } + + public boolean isClusterEnable() { + return clusterEnable; + } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java new file mode 100644 index 00000000..4a7c2fb6 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java @@ -0,0 +1,26 @@ +package com.xxl.job.admin.core.model; + +import java.util.Date; + +public class XxlJobCluster { + + private String hostName; + + private Date updateTime; + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java index e47b6dc6..5b38fb45 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java @@ -41,7 +41,9 @@ public class XxlJobInfo { private int triggerStatus; // 调度状态:0-停止,1-运行 private long triggerLastTime; // 上次调度时间 private long triggerNextTime; // 下次调度时间 + private String hostName; // 运行实例名称 + private int lockStatus; public int getId() { return id; @@ -234,4 +236,21 @@ public class XxlJobInfo { public void setTriggerNextTime(long triggerNextTime) { this.triggerNextTime = triggerNextTime; } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public int getLockStatus() { + return lockStatus; + } + + public void setLockStatus(int lockStatus) { + this.lockStatus = lockStatus; + } + } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java index 7c487f6b..44c2fbca 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -1,17 +1,23 @@ package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobCluster; import com.xxl.job.admin.core.model.XxlJobGroup; +import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobRegistry; import com.xxl.job.core.biz.model.RegistryParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.RegistryConfig; +import com.xxl.job.core.util.DateUtil; +import com.xxl.job.core.util.IpUtil; +import org.apache.catalina.startup.HostConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; import java.util.*; import java.util.concurrent.*; +import java.util.stream.Collectors; /** * job registry instance @@ -26,9 +32,18 @@ public class JobRegistryHelper { } private ThreadPoolExecutor registryOrRemoveThreadPool = null; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "cluster host registry"); + } + }); + private Thread registryMonitorThread; private volatile boolean toStop = false; + + public void start(){ // for registry or remove @@ -131,6 +146,29 @@ public class JobRegistryHelper { registryMonitorThread.setDaemon(true); registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"); registryMonitorThread.start(); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().replace(XxlJobAdminConfig.getAdminConfig().getHostName()); + + Date date = new Date(); + + XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().delete(DateUtil.addMinutes(date,-5)); + + XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().findOldClusterInfo(); + if(jobInfo!=null){ + XxlJobAdminConfig.getAdminConfig().getJobAllocation().init(false); + XxlJobAdminConfig.getAdminConfig().getJobAllocation().flush(); + } + } catch (Exception e) { + logger.error("ScheduledTask fetchNameServerAddr exception", e); + } + } + }, 0, 1000 * 30, TimeUnit.MILLISECONDS); } public void toStop(){ diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index 906b2306..35baee67 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -6,6 +6,7 @@ import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.scheduler.MisfireStrategyEnum; import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum; import com.xxl.job.admin.core.trigger.TriggerTypeEnum; +import com.xxl.job.core.util.IpUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,6 +16,7 @@ import java.sql.SQLException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * @author xuxueli 2019-05-21 @@ -79,8 +81,24 @@ public class JobScheduleHelper { // 1、pre read long nowTime = System.currentTimeMillis(); //TODO 下次执行时间 在5秒内的 整个框架的核心 - List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); + List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao() + .scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount, XxlJobAdminConfig.getAdminConfig().getHostName(), + XxlJobAdminConfig.getAdminConfig().isClusterEnable()); if (scheduleList!=null && scheduleList.size()>0) { + + if(XxlJobAdminConfig.getAdminConfig().isClusterEnable()) { + XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateStatusById(scheduleList.stream() + .map(XxlJobInfo::getId).collect(Collectors.toList())); + + try { + conn.commit(); + } catch (SQLException e) { + if (!scheduleThreadToStop) { + logger.error(e.getMessage(), e); + } + } + } + scheduleList.forEach(x->logger.info(x.getId()+"")); // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { @@ -154,6 +172,7 @@ public class JobScheduleHelper { // 3、update trigger info for (XxlJobInfo jobInfo: scheduleList) { + jobInfo.setLockStatus(0); XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java index c4b14581..f27b1bd5 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -30,7 +30,7 @@ public class JobTriggerPoolHelper { XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue(1000), + new LinkedBlockingQueue(10000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -43,7 +43,7 @@ public class JobTriggerPoolHelper { XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue(2000), + new LinkedBlockingQueue(20000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java new file mode 100644 index 00000000..8fb5a14a --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java @@ -0,0 +1,17 @@ +package com.xxl.job.admin.dao; + +import com.xxl.job.admin.core.model.XxlJobCluster; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.Date; +import java.util.List; + +@Mapper +public interface XxlJobClusterDao { + void replace(@Param("ip") String ip); + + List findAll(); + + void delete(@Param("time") Date date); +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java index d640efff..45564ecc 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java @@ -9,41 +9,53 @@ import java.util.List; /** * job info + * * @author xuxueli 2016-1-12 18:03:45 */ @Mapper public interface XxlJobInfoDao { - public List pageList(@Param("offset") int offset, - @Param("pagesize") int pagesize, - @Param("jobGroup") int jobGroup, - @Param("triggerStatus") int triggerStatus, - @Param("jobDesc") String jobDesc, - @Param("executorHandler") String executorHandler, - @Param("author") String author); - public int pageListCount(@Param("offset") int offset, - @Param("pagesize") int pagesize, - @Param("jobGroup") int jobGroup, - @Param("triggerStatus") int triggerStatus, - @Param("jobDesc") String jobDesc, - @Param("executorHandler") String executorHandler, - @Param("author") String author); - - public int save(XxlJobInfo info); + public List pageList(@Param("offset") int offset, + @Param("pagesize") int pagesize, + @Param("jobGroup") int jobGroup, + @Param("triggerStatus") int triggerStatus, + @Param("jobDesc") String jobDesc, + @Param("executorHandler") String executorHandler, + @Param("author") String author); - public XxlJobInfo loadById(@Param("id") int id); - - public int update(XxlJobInfo xxlJobInfo); - - public int delete(@Param("id") long id); + public int pageListCount(@Param("offset") int offset, + @Param("pagesize") int pagesize, + @Param("jobGroup") int jobGroup, + @Param("triggerStatus") int triggerStatus, + @Param("jobDesc") String jobDesc, + @Param("executorHandler") String executorHandler, + @Param("author") String author); - public List getJobsByGroup(@Param("jobGroup") int jobGroup); + public int save(XxlJobInfo info); - public int findAllCount(); + public XxlJobInfo loadById(@Param("id") int id); - public List scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize ); + public int update(XxlJobInfo xxlJobInfo); - public int scheduleUpdate(XxlJobInfo xxlJobInfo); + public int delete(@Param("id") long id); + public List getJobsByGroup(@Param("jobGroup") int jobGroup); + public int findAllCount(); + + public List scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize, + @Param("hostName") String ip,@Param("clusterEnable") boolean clusterEnable); + + public int scheduleUpdate(XxlJobInfo xxlJobInfo); + + + void updateStatusById(@Param("list") List collect); + + List pageById(@Param("id") Integer id); + + void updateHostNameByIds(@Param("hostName") String k, @Param("ids") List v); + + void initLockStatus(@Param("hostName") String hostName,@Param("init") boolean init); + + XxlJobInfo findOldClusterInfo(); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java new file mode 100644 index 00000000..245d1881 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java @@ -0,0 +1,22 @@ +package com.xxl.job.admin.service; + +import com.xxl.job.admin.core.model.XxlJobInfo; + +public interface JobAllocation { + + /** + * 计算单个任务的分配 + * @param jobInfo + */ + default void allocation(XxlJobInfo jobInfo){} + + /** + * 用于节点起来的时候 重新计算任务的分配 + */ + default void flush(){} + + /** + * 启动初始化相关动作 + */ + default void init(boolean init){} +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java new file mode 100644 index 00000000..42a6e3e6 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java @@ -0,0 +1,149 @@ +package com.xxl.job.admin.service.impl; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobCluster; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.service.JobAllocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * @author liyong + */ +public class AverageJobAllocation implements JobAllocation { + + private static Logger logger = LoggerFactory.getLogger(AverageJobAllocation.class); + + + private static int oneMinute = 60; + private static int tenMinutes = oneMinute * 10; + private static int oneHour = tenMinutes * 6; + private static int oneDay = oneHour * 24; + + private Map averageMap = new ConcurrentHashMap<>(); + private Map> ipMap = new ConcurrentHashMap<>(); + + { + // 防止执行频率高的 扎堆在一起 + averageMap.put(oneMinute, new AtomicInteger(0)); + averageMap.put(tenMinutes, new AtomicInteger(0)); + averageMap.put(oneHour, new AtomicInteger(0)); + averageMap.put(oneDay, new AtomicInteger(0)); + averageMap.put(Integer.MAX_VALUE, new AtomicInteger(0)); + } + + @Override + public void allocation(XxlJobInfo jobInfo) { + List all = XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().findAll(); + List ipList = all.stream().filter(x -> x.getUpdateTime().getTime() > System.currentTimeMillis() - 60 * 1000) + .map(XxlJobCluster::getHostName).collect(Collectors.toList()); + + if (ipList.isEmpty()) { + jobInfo.setHostName(XxlJobAdminConfig.getAdminConfig().getHostName()); + return; + } + + Integer key = getKey(jobInfo.getTriggerNextTime() - System.currentTimeMillis()); + int i = averageMap.get(key).incrementAndGet(); + + jobInfo.setHostName(ipList.get(i % ipList.size())); + + } + + @Override + public void init(boolean init) { + XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().initLockStatus(XxlJobAdminConfig.getAdminConfig().getHostName(),init); + } + + @Override + public void flush() { + Connection conn = null; + Boolean connAutoCommit = null; + PreparedStatement preparedStatement = null; + + try { + conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); + connAutoCommit = conn.getAutoCommit(); + conn.setAutoCommit(false); + + preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'flush_lock' for update"); + preparedStatement.execute(); + + recursion(0, conn); + } catch (Exception e) { + logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); + } finally { + // commit + if (conn != null) { + try { + conn.commit(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + try { + conn.setAutoCommit(connAutoCommit); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + try { + conn.close(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + + // close PreparedStatement + if (null != preparedStatement) { + try { + preparedStatement.close(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + } + } + + private void recursion(int id, Connection conn) { + List list = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().pageById(id); + if (CollectionUtils.isEmpty(list)) + return; + Map> map = new HashMap<>(); + list.forEach(x -> { + allocation(x); + List ids = map.getOrDefault(x.getHostName(), new ArrayList<>()); + ids.add(x.getId()); + map.put(x.getHostName(), ids); + }); + map.forEach((k, v) -> XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateHostNameByIds(k, v)); + try { + conn.commit(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + recursion(list.get(list.size() - 1).getId(), conn); + } + + private Integer getKey(long time) { + if (time < oneMinute) { + return oneMinute; + } else if (time < tenMinutes) { + return tenMinutes; + } else if (time < oneHour) { + return oneHour; + } else if (time < oneDay) { + return oneDay; + } else { + return Integer.MAX_VALUE; + } + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 530ee41c..09022c5a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -1,5 +1,6 @@ package com.xxl.job.admin.service.impl; +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.cron.CronExpression; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; @@ -10,6 +11,7 @@ import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum; import com.xxl.job.admin.core.thread.JobScheduleHelper; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.dao.*; +import com.xxl.job.admin.service.JobAllocation; import com.xxl.job.admin.service.XxlJobService; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; @@ -41,19 +43,19 @@ public class XxlJobServiceImpl implements XxlJobService { private XxlJobLogGlueDao xxlJobLogGlueDao; @Resource private XxlJobLogReportDao xxlJobLogReportDao; - + @Override public Map pageList(int start, int length, int jobGroup, int triggerStatus, String jobDesc, String executorHandler, String author) { // page list List list = xxlJobInfoDao.pageList(start, length, jobGroup, triggerStatus, jobDesc, executorHandler, author); int list_count = xxlJobInfoDao.pageListCount(start, length, jobGroup, triggerStatus, jobDesc, executorHandler, author); - + // package result Map maps = new HashMap(); - maps.put("recordsTotal", list_count); // 总记录数 - maps.put("recordsFiltered", list_count); // 过滤后的总记录数 - maps.put("data", list); // 分页列表 + maps.put("recordsTotal", list_count); // 总记录数 + maps.put("recordsFiltered", list_count); // 过滤后的总记录数 + maps.put("data", list); // 分页列表 return maps; } @@ -328,6 +330,7 @@ public class XxlJobServiceImpl implements XxlJobService { xxlJobInfo.setTriggerNextTime(nextTriggerTime); xxlJobInfo.setUpdateTime(new Date()); + XxlJobAdminConfig.getAdminConfig().getJobAllocation().allocation(xxlJobInfo); xxlJobInfoDao.update(xxlJobInfo); return ReturnT.SUCCESS; } diff --git a/xxl-job-admin/src/main/resources/application.properties b/xxl-job-admin/src/main/resources/application.properties index c9ef4bfa..dd2b1d74 100644 --- a/xxl-job-admin/src/main/resources/application.properties +++ b/xxl-job-admin/src/main/resources/application.properties @@ -23,9 +23,9 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml #mybatis.type-aliases-package=com.xxl.job.admin.core.model ### xxl-job, datasource -spring.datasource.url=jdbc:mysql://47.97.249.124:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai +spring.datasource.url=jdbc:mysql://localhost/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username=root -spring.datasource.password=123456 +spring.datasource.password=12345678 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver ### datasource-pool @@ -63,3 +63,8 @@ xxl.job.triggerpool.slow.max=100 ### xxl-job, log retention days xxl.job.logretentiondays=30 + +xxl.job.cluster.host.name= +xxl.job.cluster.enable=true + + diff --git a/xxl-job-admin/src/main/resources/logback.xml b/xxl-job-admin/src/main/resources/logback.xml index d4b08c24..154b272b 100644 --- a/xxl-job-admin/src/main/resources/logback.xml +++ b/xxl-job-admin/src/main/resources/logback.xml @@ -2,7 +2,7 @@ logback - + diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml new file mode 100644 index 00000000..5bbd8c63 --- /dev/null +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml @@ -0,0 +1,17 @@ + + + + + + replace into xxl_job_cluster values(#{ip}, now()); + + + delete from xxl_job_cluster where update_time <= #{time} + + + + + \ No newline at end of file diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml index 7b3c3a3e..0f544bca 100644 --- a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml @@ -36,6 +36,8 @@ + + @@ -224,17 +226,53 @@ FROM xxl_job_info AS t WHERE t.trigger_status = 1 and t.trigger_next_time #{maxNextTime} + + and t.host_name=#{hostName} + and t.lock_status=0 + ORDER BY id ASC LIMIT #{pagesize} + + UPDATE xxl_job_info SET trigger_last_time = #{triggerLastTime}, trigger_next_time = #{triggerNextTime}, - trigger_status = #{triggerStatus} + trigger_status = #{triggerStatus}, + lock_status=#{lockStatus} WHERE id = #{id} + + update xxl_job_info set lock_status = 1 + where id in + + #{id} + + + + update xxl_job_info set host_name = #{hostName} + where id in + + #{id} + + + + update xxl_job_info a left join xxl_job_cluster b on a.host_name =b.host_name and b.update_time>=DATE_SUB(now(),INTERVAL 1 MINUTE) + set a.lock_status = 0 + where (b.host_name is null + + or a.host_name =#{hostName} + + ) and a.lock_status=1 + \ No newline at end of file diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java index 759d6625..a1000951 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java @@ -1,5 +1,6 @@ package com.xxl.job.executor.service.jobhandler; +import com.xxl.job.core.context.XxlJobContext; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import org.slf4j.Logger; @@ -36,6 +37,7 @@ public class SampleXxlJob { */ @XxlJob("demoJobHandler") public void demoJobHandler() throws Exception { + logger.info(XxlJobHelper.getJobParam()); XxlJobHelper.log("XXL-JOB, Hello World."); for (int i = 0; i < 5; i++) { diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties index 66403471..bfd8eabb 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties @@ -1,5 +1,5 @@ # web port -server.port=8081 +server.port=8084 # no web #spring.main.web-environment=false @@ -8,7 +8,7 @@ logging.config=classpath:logback.xml ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" -xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin +xxl.job.admin.addresses=http://127.0.0.1:8081/xxl-job-admin ### xxl-job, access token xxl.job.accessToken= @@ -22,6 +22,6 @@ xxl.job.executor.address= xxl.job.executor.ip= xxl.job.executor.port=9999 ### xxl-job executor log-path -xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler +xxl.job.executor.logpath=../data/applogs/xxl-job/jobhandler ### xxl-job executor log-retention-days xxl.job.executor.logretentiondays=30 diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/logback.xml b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/logback.xml index d5a0d2ca..89f526de 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/logback.xml +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/logback.xml @@ -2,7 +2,7 @@ logback - +