diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index 8bdf7093..a8762891 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -46,6 +46,7 @@ public class ExecutorBizImpl implements ExecutorBiz { @Override public ReturnT run(TriggerParam triggerParam) { // load old:jobHandler + jobThread + //优先获取缓存中的处理器 (可以自定义设置) JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; String removeOldReason = null; @@ -54,7 +55,7 @@ public class ExecutorBizImpl implements ExecutorBiz { GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); if (GlueTypeEnum.BEAN == glueTypeEnum) { - // new jobhandler + // new jobhandler 通过解析对象时包装的map获取 IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); // valid old jobThread @@ -120,13 +121,16 @@ public class ExecutorBizImpl implements ExecutorBiz { // executor block strategy if (jobThread != null) { + //阻塞处理策略 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { + //丢弃后续调用 // discard when running if (jobThread.isRunningOrHasQueue()) { return new ReturnT(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); } } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { + //覆盖之前调用 // kill running jobThread if (jobThread.isRunningOrHasQueue()) { removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); @@ -134,12 +138,14 @@ public class ExecutorBizImpl implements ExecutorBiz { jobThread = null; } } else { + //单机串行 // just queue trigger } } // replace thread (new or exists invalid) if (jobThread == null) { + //开启一个执行业务的线程 如果是覆盖之前的调用 则会触发之前任务的 interrupt jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index 70a2dad9..a1aa8a55 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -74,7 +74,7 @@ public class XxlJobExecutor { initAdminBizList(adminAddresses, accessToken); - // init JobLogFileCleanThread + // init JobLogFileCleanThread 开启一个线程清理日志 JobLogFileCleanThread.getInstance().start(logRetentionDays); // init TriggerCallbackThread @@ -116,6 +116,13 @@ public class XxlJobExecutor { // ---------------------- admin-client (rpc invoker) ---------------------- private static List adminBizList; + + /** + * 初始化xxl 服务地址 解析包装成 AdminBizClient 存到 adminBizList内 (地址支持多个以,隔开) + * @param adminAddresses + * @param accessToken + * @throws Exception + */ private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { @@ -206,7 +213,7 @@ public class XxlJobExecutor { throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + "The correct method format like \" public ReturnT execute(String param) \" ."); }*/ - + //TODO 设置访问权限 说明私有的方法也是支持的 demo待实践 executeMethod.setAccessible(true); // init and destroy @@ -215,6 +222,7 @@ public class XxlJobExecutor { if (xxlJob.init().trim().length() > 0) { try { + //获取初始化方法 initMethod = clazz.getDeclaredMethod(xxlJob.init()); initMethod.setAccessible(true); } catch (NoSuchMethodException e) { @@ -231,6 +239,7 @@ public class XxlJobExecutor { } // registry jobhandler + //注册到map中 registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod)); } @@ -239,6 +248,7 @@ public class XxlJobExecutor { // ---------------------- job thread repository ---------------------- private static ConcurrentMap jobThreadRepository = new ConcurrentHashMap(); public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ + //TODO 处理业务 JobThread newJobThread = new JobThread(jobId, handler); newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java index 3c2a67d5..8f3fb708 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java @@ -28,6 +28,10 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC // start + + /** + * 所有单例初始化以后调用 + */ @Override public void afterSingletonsInstantiated() { @@ -37,7 +41,7 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC // init JobHandler Repository (for method) initJobHandlerMethodRepository(applicationContext); - // refresh GlueFactory + //TODO 初始化 glueFactory 使用场景待定 应该是执行脚本用的 GlueFactory.refreshInstance(1); // super start @@ -77,6 +81,10 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC } }*/ + /** + * 扫描有XxlJob 注解的方法 保存到 XxlJobExecutor.jobHandlerRepository(map) + * @param applicationContext + */ private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java index ff0585b5..ee1054a4 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java @@ -30,6 +30,11 @@ public class XxlJobFileAppender { */ private static String logBasePath = "/data/applogs/xxl-job/jobhandler"; private static String glueSrcPath = logBasePath.concat("/gluesource"); + + /** + * 初始化日志路径 会将 glue脚本 保存到日志目录子目录下 + * @param logPath + */ public static void initLogPath(String logPath){ // init if (logPath!=null && logPath.trim().length()>0) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java index e43a2a49..def5aa76 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java @@ -34,7 +34,7 @@ public class ExecutorRegistryThread { logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null."); return; } - + //开启线程往服务器注册 每隔30秒请求一次 registryThread = new Thread(new Runnable() { @Override public void run() { @@ -76,7 +76,7 @@ public class ExecutorRegistryThread { } } - // registry remove + // registry remove 如果通过stop 中断了 注册过程 则会立刻调用 移除接口 try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java index b5691542..ac4c227e 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java @@ -27,9 +27,14 @@ public class JobLogFileCleanThread { private Thread localThread; private volatile boolean toStop = false; + + /** + * 扫描指定路径下 日期格式的 文件夹 校验如果是指定日期之前的文件则删除 处理完了以后就sleep一天 支持 interrupt + * @param logRetentionDays + */ public void start(final long logRetentionDays){ - // limit min value + // limit min value 最小值三天一清理 if (logRetentionDays < 3 ) { return; } @@ -54,7 +59,7 @@ public class JobLogFileCleanThread { for (File childFile: childDirs) { - // valid + // valid 生成的日志文件时日期格式的文件夹 if (!childFile.isDirectory()) { continue; } @@ -73,7 +78,7 @@ public class JobLogFileCleanThread { if (logFileCreateDate == null) { continue; } - + //基于日期校验是不是指定日期之前的文件夹 是则删除 if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) { FileUtil.deleteRecursively(childFile); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index cf07a55a..bdc9af75 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -98,8 +98,10 @@ public class JobThread extends Thread{ // init try { + //调用 xxljob注解的 init方法 handler.init(); } catch (Throwable e) { + //TODO 初始化方法报错 并不会影响正常流程执行 如果 初始化涉及业务相关 要注意 logger.error(e.getMessage(), e); } @@ -126,12 +128,13 @@ public class JobThread extends Thread{ triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()); - // init job context + // init job context 包装执行的一些参数 通过 threadlocal 存取 XxlJobContext.setXxlJobContext(xxlJobContext); // execute XxlJobHelper.log("
----------- xxl-job job execute start -----------
----------- Param:" + xxlJobContext.getJobParam()); + //执行超时时间 if (triggerParam.getExecutorTimeout() > 0) { // limit timeout Thread futureThread = null; @@ -162,6 +165,7 @@ public class JobThread extends Thread{ futureThread.interrupt(); } } else { + //如果没有执行超时时间的限制 则 直接调用业务方法 // just execute handler.execute(); } @@ -183,6 +187,7 @@ public class JobThread extends Thread{ ); } else { + //每3秒获取一次执行参数 超过30次 且 依然没有 就 删除对应的线程 删除线程会将top置为true 这个设计应该是为了一些 执行频繁的任务设计的 一直死循环跑任务 if (idleTimes > 30) { if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); @@ -227,6 +232,7 @@ public class JobThread extends Thread{ } // callback trigger request in queue + //TODO 待确认 while(triggerQueue !=null && triggerQueue.size()>0){ TriggerParam triggerParam = triggerQueue.poll(); if (triggerParam!=null) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 40acac00..cf8e4efa 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; /** * Created by xuxueli on 16/7/22. */ + public class TriggerCallbackThread { private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); @@ -43,6 +44,10 @@ public class TriggerCallbackThread { /** * callback thread */ + /** TODO 队列中的数据 是在每次任务执行完以后填充的 传递code及msg callback 干嘛的待确认 + * 一直循环从队列中获取数据 发生给xxl服务器(/api/callback) + * 如果失败了会写入log目录下 triggerRetryCallbackThread 线程会每隔30秒(可配置)重新发送 + */ private Thread triggerCallbackThread; private Thread triggerRetryCallbackThread; private volatile boolean toStop = false; @@ -68,6 +73,7 @@ public class TriggerCallbackThread { // callback list param List callbackParamList = new ArrayList(); + //将队列中所有对象全部取出 int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); @@ -163,6 +169,7 @@ public class TriggerCallbackThread { private void doCallback(List callbackParamList){ boolean callbackRet = false; // callback, will retry if error + //getAdminBizList 所有xxl服务器地址 for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { ReturnT callbackResult = adminBiz.callback(callbackParamList); diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties index e067db4f..35601c5d 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties @@ -16,7 +16,7 @@ xxl.job.accessToken= ### xxl-job executor appname xxl.job.executor.appname=xxl-job-executor-sample ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null -xxl.job.executor.address= +xxl.job.executor.address=1.1.1.1 ### xxl-job executor server-info xxl.job.executor.ip= xxl.job.executor.port=9999