From 5aa7b240c92953c0bccb4b898399f7cc6c40c48f Mon Sep 17 00:00:00 2001 From: liyong Date: Mon, 13 Dec 2021 14:57:16 +0800 Subject: [PATCH] 1 --- .../xxl/job/core/biz/impl/ExecutorBizImpl.java | 17 ++++++++++++----- .../xxl/job/core/executor/XxlJobExecutor.java | 9 ++++++--- .../executor/impl/XxlJobSpringExecutor.java | 4 +++- .../com/xxl/job/core/server/EmbedServer.java | 18 ++++++++++++++---- .../core/thread/ExecutorRegistryThread.java | 2 +- .../com/xxl/job/core/thread/JobThread.java | 11 ++++++++--- .../job/core/thread/TriggerCallbackThread.java | 15 +++++++++++---- 7 files changed, 55 insertions(+), 21 deletions(-) diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index 4a1510bb..55f0245f 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -46,16 +46,18 @@ public class ExecutorBizImpl implements ExecutorBiz { @Override public ReturnT run(TriggerParam triggerParam) { // load old:jobHandler + jobThread - //优先获取缓存中的处理器 (可以自定义设置) + //TODO 获取有没有正在执行的线程 JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; String removeOldReason = null; // valid:jobHandler + jobThread GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); + //TODO 最常用的 bean 模式 if (GlueTypeEnum.BEAN == glueTypeEnum) { // new jobhandler 通过解析对象时包装的map获取 + //TODO 通过一开始解析的xxljob注解 获取对应的对象 IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); // valid old jobThread @@ -75,7 +77,9 @@ public class ExecutorBizImpl implements ExecutorBiz { } } - } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { + } + //groovy 脚本 忽略 + else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // valid old jobThread if (jobThread != null && @@ -98,7 +102,9 @@ public class ExecutorBizImpl implements ExecutorBiz { return new ReturnT(ReturnT.FAIL_CODE, e.getMessage()); } } - } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { + } + //其他脚本模式 忽略 + else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { // valid old jobThread if (jobThread != null && @@ -121,7 +127,7 @@ public class ExecutorBizImpl implements ExecutorBiz { // executor block strategy if (jobThread != null) { - //阻塞处理策略 + //TODO 阻塞处理策略 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { //丢弃后续调用 @@ -145,11 +151,12 @@ public class ExecutorBizImpl implements ExecutorBiz { // replace thread (new or exists invalid) if (jobThread == null) { - //开启一个执行业务的线程 如果是覆盖之前的调用 则会触发之前任务的 interrupt + //TODO 开启一个执行业务的线程 如果是覆盖之前的调用 则会触发之前任务的 interrupt jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); } // push data to queue + //TODO 讲当前任务添加进队列 上一步开启的线程 会扫描这个队列 ReturnT pushResult = jobThread.pushTriggerQueue(triggerParam); return pushResult; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index 3a14703c..daba8cec 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -67,20 +67,22 @@ public class XxlJobExecutor { // ---------------------- start + stop ---------------------- public void start() throws Exception { - // init logpath + //TODO 初始化日志路径 XxlJobFileAppender.initLogPath(logPath); // init invoker, admin-client + //TODO 初始化服务器地址 initAdminBizList(adminAddresses, accessToken); - // init JobLogFileCleanThread 开启一个线程清理日志 + //TODO init JobLogFileCleanThread 开启一个线程清理日志 JobLogFileCleanThread.getInstance().start(logRetentionDays); - // init TriggerCallbackThread + //TODO init TriggerCallbackThread 调用服务端 /api/callback 反馈任务执行结果 TriggerCallbackThread.getInstance().start(); // init executor-server + //TODO 初始化XXLJOB服务 initEmbedServer(address, ip, port, appname, accessToken); } public void destroy(){ @@ -164,6 +166,7 @@ public class XxlJobExecutor { // start embedServer = new EmbedServer(); + //TODO embedServer.start(address, port, appname, accessToken); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java index 8f3fb708..30134667 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java @@ -39,13 +39,15 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC /*initJobHandlerRepository(applicationContext);*/ // init JobHandler Repository (for method) + //初始化xxljob注解对象 initJobHandlerMethodRepository(applicationContext); - //TODO 初始化 glueFactory 使用场景待定 应该是执行脚本用的 + //初始化 glueFactory 用于执行groovy脚本 GlueFactory.refreshInstance(1); // super start try { + //TODO super.start(); } catch (Exception e) { throw new RuntimeException(e); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java index 19b4e151..360a16c3 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java @@ -43,6 +43,7 @@ public class EmbedServer { // param EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); + //TODO 用于接收服务端请求的线程池 ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor( 0, 200, @@ -75,7 +76,7 @@ public class EmbedServer { .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL - //执行 + //TODO 接收请求 .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); } }) @@ -86,7 +87,7 @@ public class EmbedServer { logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); - //注册当前节点 + //TODO 调用 api/registry 注册当前节点 startRegistry(appname, address); // wait util stop @@ -159,11 +160,12 @@ public class EmbedServer { boolean keepAlive = HttpUtil.isKeepAlive(msg); String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN); - // invoke + // TODO 接收请求以后立刻提交给另一个线程池 bizThreadPool.execute(new Runnable() { @Override public void run() { // do invoke + //TODO Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); // to json @@ -177,7 +179,7 @@ public class EmbedServer { private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { - // valid + //#################### 校验 if (HttpMethod.POST != httpMethod) { return new ReturnT(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support."); } @@ -189,21 +191,29 @@ public class EmbedServer { && !accessToken.equals(accessTokenReq)) { return new ReturnT(ReturnT.FAIL_CODE, "The access token is wrong."); } + //###################### + + // services mapping try { if ("/beat".equals(uri)) { + //TODO 用于 路由策略 故障转移 return executorBiz.beat(); } else if ("/idleBeat".equals(uri)) { + //TODO 用于 路由策略 忙碌转移 IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); return executorBiz.idleBeat(idleBeatParam); } else if ("/run".equals(uri)) { + //TODO 执行任务 TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); return executorBiz.run(triggerParam); } else if ("/kill".equals(uri)) { + //TODO 中止任务 KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); return executorBiz.kill(killParam); } else if ("/log".equals(uri)) { + //TODO 查询日志 LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); return executorBiz.log(logParam); } else { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java index def5aa76..3365c1d3 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java @@ -76,7 +76,7 @@ public class ExecutorRegistryThread { } } - // registry remove 如果通过stop 中断了 注册过程 则会立刻调用 移除接口 + // TODO registry remove 如果当前服务中止 则会立刻调用 移除接口 try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index bdc9af75..ba923c65 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -98,7 +98,7 @@ public class JobThread extends Thread{ // init try { - //调用 xxljob注解的 init方法 + //TODO 调用 xxljob注解的 init方法 handler.init(); } catch (Throwable e) { //TODO 初始化方法报错 并不会影响正常流程执行 如果 初始化涉及业务相关 要注意 @@ -113,6 +113,7 @@ public class JobThread extends Thread{ TriggerParam triggerParam = null; try { // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) + //TODO triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); if (triggerParam!=null) { running = true; @@ -125,7 +126,9 @@ public class JobThread extends Thread{ triggerParam.getJobId(), triggerParam.getExecutorParams(), logFileName, + //TODO 当前节点序号 triggerParam.getBroadcastIndex(), + //TODO 节点总数 triggerParam.getBroadcastTotal()); // init job context 包装执行的一些参数 通过 threadlocal 存取 @@ -134,7 +137,7 @@ public class JobThread extends Thread{ // execute XxlJobHelper.log("
----------- xxl-job job execute start -----------
----------- Param:" + xxlJobContext.getJobParam()); - //执行超时时间 + //TODO 执行超时时间 if (triggerParam.getExecutorTimeout() > 0) { // limit timeout Thread futureThread = null; @@ -187,7 +190,8 @@ public class JobThread extends Thread{ ); } else { - //每3秒获取一次执行参数 超过30次 且 依然没有 就 删除对应的线程 删除线程会将top置为true 这个设计应该是为了一些 执行频繁的任务设计的 一直死循环跑任务 + //TODO 每3秒获取一次执行参数 超过30次 且 依然没有 就 删除对应的线程 删除线程会将top置为true 这个设计应该是为了一些 执行频繁的任务设计的 一直死循环跑任务 + //TODO 30这个数字应该可配置 线程存活时间太长了 if (idleTimes > 30) { if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); @@ -212,6 +216,7 @@ public class JobThread extends Thread{ // callback handler info if (!toStop) { // commonm + //TODO 填充callBackQueue 队列 用于讲任务执行结果返回给服务端 TriggerCallbackThread.pushCallBack(new HandleCallbackParam( triggerParam.getLogId(), triggerParam.getLogDateTime(), diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index cf8e4efa..865b5be4 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -33,7 +33,7 @@ public class TriggerCallbackThread { } /** - * job results callback queue + * job results callback queue 有序阻塞队列 */ private LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue(); public static void pushCallBack(HandleCallbackParam callback){ @@ -44,7 +44,7 @@ public class TriggerCallbackThread { /** * callback thread */ - /** TODO 队列中的数据 是在每次任务执行完以后填充的 传递code及msg callback 干嘛的待确认 + /** TODO 队列中的数据 是在每次任务执行完以后填充的 传递code及msg * 一直循环从队列中获取数据 发生给xxl服务器(/api/callback) * 如果失败了会写入log目录下 triggerRetryCallbackThread 线程会每隔30秒(可配置)重新发送 */ @@ -68,17 +68,19 @@ public class TriggerCallbackThread { // normal callback while(!toStop){ try { + //TODO 从队列中取一个 阻塞方法 HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { // callback list param List callbackParamList = new ArrayList(); - //将队列中所有对象全部取出 + //TODO 将队列中所有对象全部取出 int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); // callback, will retry if error if (callbackParamList!=null && callbackParamList.size()>0) { + //TODO doCallback(callbackParamList); } } @@ -90,6 +92,7 @@ public class TriggerCallbackThread { } // last callback + //TODO 跳出循环 说明xxljob生命周期结束了 防止有数据还停留在队列中 try { List callbackParamList = new ArrayList(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); @@ -110,7 +113,7 @@ public class TriggerCallbackThread { triggerCallbackThread.start(); - // retry + // TODO 扫描上一个线程 请求异常的数据 重新发送 triggerRetryCallbackThread = new Thread(new Runnable() { @Override public void run() { @@ -172,8 +175,10 @@ public class TriggerCallbackThread { //getAdminBizList 所有xxl服务器地址 for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { + //TODO 调用服务端 callback接口 ReturnT callbackResult = adminBiz.callback(callbackParamList); if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { + //日志记录 callbackLog(callbackParamList, "
----------- xxl-job job callback finish."); callbackRet = true; break; @@ -185,6 +190,7 @@ public class TriggerCallbackThread { } } if (!callbackRet) { + //TODO 请求服务端异常处理(写进日志文件) 如果部分成功部分失败 是不是重复劳动 appendFailCallbackFile(callbackParamList); } } @@ -258,6 +264,7 @@ public class TriggerCallbackThread { List callbackParamList = (List) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class); + //TODO 读取完了 直接删除 callbaclLogFile.delete(); doCallback(callbackParamList); }