diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index ba529f1b..1ace9b0b 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -22,31 +22,33 @@ public class XxlJobScheduler { public void init() throws Exception { // init i18n - // 从配置文件中初始化 ExecutorBlockStrategyEnum title + // TODO 国际化阻塞策略 initI18n(); // admin trigger pool start - // 初始化 fastTriggerPool(用于 任务的重试(步骤四) 、slowTriggerPool(用于 任务的重试超过10次的(步骤四)) 线程池 + // TODO 初始化 fastTriggerPool(用于 执行任务(步骤四、步骤7) 、slowTriggerPool(用于 执行慢任务(步骤四、步骤7)) 线程池 + //TODO 注意最大线程数 后面会用于 计算一次循环从数据库拉取的任务数 JobTriggerPoolHelper.toStart(); // admin registry monitor run - //1.初始化registryOrRemoveThreadPool(用于 客户端注册) 2.开启 registryMonitorThread (用于扫描 有没有对应执行器的 客户端地址 每30秒扫描一次 + //TODO 1.初始化registryOrRemoveThreadPool(用于 客户端注册) 2.开启 registryMonitorThread (用于扫描 有没有对应执行器的 客户端地址 每30秒扫描一次 JobRegistryHelper.getInstance().start(); // admin fail-monitor run - //开启线程 扫描异常任务 重试机制 + //TODO 步骤4 开启线程 扫描异常任务 重试机制 JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run ( depend on JobTriggerPoolHelper ) - //初始化 callbackThreadPool(用于接收客户端执行状态) 线程池 启动 monitorThread 处理长期处于进行中的任务 + //TODO 初始化 callbackThreadPool(用于接收客户端执行状态) 线程池 启动 monitorThread 处理长期处于进行中的任务 JobCompleteHelper.getInstance().start(); // admin log report start - //开启 logrThread 统计报表相关 间隔一分钟 + //todo 开启 logrThread 统计报表相关 间隔一分钟 + //TODO 每次统计都是统计一天的 数据量过大时会影响效率 可以做成增量的 在获取结果时异步 统计 或者每次只统计1分钟的 JobLogReportHelper.getInstance().start(); // start-schedule ( depend on JobTriggerPoolHelper ) - //任务的预处理 scheduleThread ringThread + //TODO 步骤7 任务的预处理 scheduleThread ringThread JobScheduleHelper.getInstance().start(); logger.info(">>>>>>>>> init xxl-job admin success."); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java index 5698926a..c3ce7f1a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java @@ -74,7 +74,7 @@ public class JobCompleteHelper { // monitor while (!toStop) { try { - // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; + //TODO 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; Date losedTime = DateUtil.addMinutes(new Date(), -10); List losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java index 28d0a530..83f21165 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java @@ -53,7 +53,7 @@ public class JobFailMonitorHelper { XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId()); // 1、fail retry monitor - //校验重试次数 + //TODO 校验重试次数 if (log.getExecutorFailRetryCount() > 0) { //执行任务 JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java index 02e412e4..7c487f6b 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -59,11 +59,12 @@ public class JobRegistryHelper { while (!toStop) { try { // auto registry group + // List groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0); if (groupList!=null && !groupList.isEmpty()) { // remove dead address (admin/executor) - //查询有没有90秒之前注册过的节点 有则直接删除 + //TODO 查询有没有90秒之前注册过的节点 有则直接删除 List ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); if (ids!=null && ids.size()>0) { XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); @@ -71,10 +72,11 @@ public class JobRegistryHelper { // fresh online address (admin/executor) HashMap> appAddressMap = new HashMap>(); - //查询详细90秒内注册过的 + //TODO 查询 90秒内注册过的 List list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); if (list != null) { for (XxlJobRegistry item: list) { + //TODO if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) { String appname = item.getRegistryKey(); List registryList = appAddressMap.get(appname); @@ -105,7 +107,7 @@ public class JobRegistryHelper { } group.setAddressList(addressListStr); group.setUpdateTime(new Date()); - //TODO 循环内更新 + //TODO 循环更新 执行器 对应的 注册列表 XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); } } @@ -115,7 +117,7 @@ public class JobRegistryHelper { } } try { - //间隔30秒 + //TODO 间隔30秒 TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index db9d6bfc..906b2306 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -70,7 +70,7 @@ public class JobScheduleHelper { connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false); - //通过这个来控制多节点只有一个节点会执行 low的一笔 + //TODO 通过这个来控制多节点只有一个节点会执行 low的一笔 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute(); @@ -78,14 +78,14 @@ public class JobScheduleHelper { // 1、pre read long nowTime = System.currentTimeMillis(); - //下次执行时间 在5秒内的 + //TODO 下次执行时间 在5秒内的 整个框架的核心 List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); if (scheduleList!=null && scheduleList.size()>0) { // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { // time-ring jump - // 当前时间>下次执行时间+5秒 任务触发过期策略 + // TODO 当前时间>下次执行时间+5秒 任务触发过期策略 5秒内不算过期 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 2.1、trigger-expire > 5s:pass && make next-trigger-time logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()); @@ -104,27 +104,29 @@ public class JobScheduleHelper { refreshNextValidTime(jobInfo, new Date()); } - //当前时间>下次执行时间 但是控制在5秒内 + //TODO 当前时间>下次执行时间 过期了但是控制在5秒内不算过期 else if (nowTime > jobInfo.getTriggerNextTime()) { // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time // 1、trigger - //执行 + //TODO 执行 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); // 2、fresh next - //刷新下次执行时间 + //TODO 刷新下次执行时间 refreshNextValidTime(jobInfo, new Date()); // next-trigger-time in 5s, pre-read again - //如果任务在执行中 且 下次执行时间在5秒内 则将 任务 缓存到 ringData + //TODO 如果任务在运行状态 且 下次执行时间在5秒内 则将 任务 缓存到 ringData if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { // 1、make ring second + //TODO 获取下次执行时间 是哪一秒 作为缓存的key int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring + //TODO 缓存至 ringData pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next @@ -249,7 +251,8 @@ public class JobScheduleHelper { try { // second data List ringItemData = new ArrayList<>(); - int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; + //TODO 避免处理耗时太长,跨过刻度,向前校验一个刻度; + int nowSecond = Calendar.getInstance().get(Calendar.SECOND); for (int i = 0; i < 2; i++) { List tmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) { @@ -262,7 +265,7 @@ public class JobScheduleHelper { if (ringItemData.size() > 0) { // do trigger for (int jobId: ringItemData) { - // do trigger + //TODO do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); } // clear diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java index f896a25a..f1090db9 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -97,7 +97,7 @@ public class JobTriggerPoolHelper { logger.error(e.getMessage(), e); } finally { - //统计1分钟内 调用超过500ms的 貌似没用到 + //TODO 统计1分钟内 调用超过500ms的 // check timeout-count-map long minTim_now = System.currentTimeMillis()/60000; if (minTim != minTim_now) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java index 0fbf75e1..868a2e76 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java @@ -79,7 +79,7 @@ public class XxlJobTrigger { } //############################ - //路由策略如果是 分片广播 则循环调用 + //TODO 路由策略如果是 分片广播 则循环调用 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { @@ -90,6 +90,7 @@ public class XxlJobTrigger { if (shardingParam == null) { shardingParam = new int[]{0, 1}; } + //TODO processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]); } @@ -115,9 +116,9 @@ public class XxlJobTrigger { private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){ // param - //获取阻塞策略 + //TODO 获取阻塞策略 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy - //获取路由策略 + //TODO 获取路由策略 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null; @@ -156,7 +157,7 @@ public class XxlJobTrigger { address = group.getRegistryList().get(0); } } else { - //基于不同的路由策略获取地址 + //TODO 基于不同的路由策略获取地址 routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddressResult.getContent(); @@ -169,7 +170,7 @@ public class XxlJobTrigger { // 4、trigger remote executor ReturnT triggerResult = null; if (address != null) { - //执行任务 调用 客户端 run 接口 + //TODO 执行任务 调用 客户端 run 接口 triggerResult = runExecutor(triggerParam, address); } else { triggerResult = new ReturnT(ReturnT.FAIL_CODE, null); @@ -202,7 +203,7 @@ public class XxlJobTrigger { //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerMsg(triggerMsgSb.toString()); - //更新调用记录 + //TODO 更新调用记录 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());