diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java index 279ad7d1..cd7557d5 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java @@ -28,6 +28,7 @@ public class XxlJobCompleter { public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { // finish + //执行字任务 finishJob(xxlJobLog); // text最大64kb 避免长度过长 @@ -36,6 +37,7 @@ public class XxlJobCompleter { } // fresh handle + //更新log表 return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java index 868560fc..0b8bd355 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java @@ -12,6 +12,7 @@ import java.util.List; /** * Created by xuxueli on 17/3/10. + * 通过调用 客户端 idleBeat 接口 */ public class ExecutorRouteBusyover extends ExecutorRouter { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java index a2e4c909..ea1e4ade 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java @@ -11,6 +11,7 @@ import java.util.List; /** * Created by xuxueli on 17/3/10. + * 通过调用 客户端 beat接口 返回第一个SUCCESS的 */ public class ExecutorRouteFailover extends ExecutorRouter { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java index 9df19726..5151a44c 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java @@ -13,6 +13,7 @@ import java.util.concurrent.ConcurrentMap; * a(*)、LFU(Least Frequently Used):最不经常使用,频率/次数 * b、LRU(Least Recently Used):最近最久未使用,时间 * + * 统计24小时内调用次数 最少的 每次调用通过map+1记录 如果是第一次则是随机生成调用次数 这个策略主要是当前节点调用次数 不是 被调方的 调用次数 如果是多节点不符合预期结果 * Created by xuxueli on 17/3/10. */ public class ExecutorRouteLFU extends ExecutorRouter { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java index 2d540067..ee3d61bb 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java @@ -15,6 +15,7 @@ import java.util.concurrent.ConcurrentMap; * a、LFU(Least Frequently Used):最不经常使用,频率/次数 * b(*)、LRU(Least Recently Used):最近最久未使用,时间 * + * 基于 LinkedHashMap (accessorder=true)的访问顺序 来控制 * Created by xuxueli on 17/3/10. */ public class ExecutorRouteLRU extends ExecutorRouter { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index bb2cda8b..ba529f1b 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -22,24 +22,31 @@ public class XxlJobScheduler { public void init() throws Exception { // init i18n + // 从配置文件中初始化 ExecutorBlockStrategyEnum title initI18n(); // admin trigger pool start + // 初始化 fastTriggerPool(用于 任务的重试(步骤四) 、slowTriggerPool(用于 任务的重试超过10次的(步骤四)) 线程池 JobTriggerPoolHelper.toStart(); // admin registry monitor run + //1.初始化registryOrRemoveThreadPool(用于 客户端注册) 2.开启 registryMonitorThread (用于扫描 有没有对应执行器的 客户端地址 每30秒扫描一次 JobRegistryHelper.getInstance().start(); // admin fail-monitor run + //开启线程 扫描异常任务 重试机制 JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run ( depend on JobTriggerPoolHelper ) + //初始化 callbackThreadPool(用于接收客户端执行状态) 线程池 启动 monitorThread 处理长期处于进行中的任务 JobCompleteHelper.getInstance().start(); // admin log report start + //开启 logrThread 统计报表相关 间隔一分钟 JobLogReportHelper.getInstance().start(); // start-schedule ( depend on JobTriggerPoolHelper ) + //任务的预处理 scheduleThread ringThread JobScheduleHelper.getInstance().start(); logger.info(">>>>>>>>> init xxl-job admin success."); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java index 8409d7b3..28d0a530 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java @@ -43,30 +43,38 @@ public class JobFailMonitorHelper { for (long failLogId: failLogIds) { // lock log + //这里通过 依赖数据库事务 来进行避免 多节点时数据的重复操作问题 int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1); if (lockRet < 1) { continue; } + //查询执行记录及对应的任务信息 XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId); XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId()); // 1、fail retry monitor + //校验重试次数 if (log.getExecutorFailRetryCount() > 0) { + //执行任务 JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null); String retryMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<<
"; log.setTriggerMsg(log.getTriggerMsg() + retryMsg); + //更新记录 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log); } // 2、fail alarm monitor int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败 if (info != null) { + //发送告警 不看 boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log); newAlarmStatus = alarmResult?2:3; } else { newAlarmStatus = 1; } + + //更新告警状态 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus); } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java index 37edfd98..02e412e4 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -63,6 +63,7 @@ public class JobRegistryHelper { if (groupList!=null && !groupList.isEmpty()) { // remove dead address (admin/executor) + //查询有没有90秒之前注册过的节点 有则直接删除 List ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); if (ids!=null && ids.size()>0) { XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); @@ -70,6 +71,7 @@ public class JobRegistryHelper { // fresh online address (admin/executor) HashMap> appAddressMap = new HashMap>(); + //查询详细90秒内注册过的 List list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); if (list != null) { for (XxlJobRegistry item: list) { @@ -103,7 +105,7 @@ public class JobRegistryHelper { } group.setAddressList(addressListStr); group.setUpdateTime(new Date()); - + //TODO 循环内更新 XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); } } @@ -113,6 +115,7 @@ public class JobRegistryHelper { } } try { + //间隔30秒 TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index 831bcf6a..db9d6bfc 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -70,6 +70,7 @@ public class JobScheduleHelper { connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false); + //通过这个来控制多节点只有一个节点会执行 low的一笔 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute(); @@ -77,18 +78,21 @@ public class JobScheduleHelper { // 1、pre read long nowTime = System.currentTimeMillis(); + //下次执行时间 在5秒内的 List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); if (scheduleList!=null && scheduleList.size()>0) { // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { // time-ring jump + // 当前时间>下次执行时间+5秒 任务触发过期策略 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 2.1、trigger-expire > 5s:pass && make next-trigger-time logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()); // 1、misfire match MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING); + //如果过期策略是立即执行一次则 执行 if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) { // FIRE_ONCE_NOW 》 trigger JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null); @@ -96,19 +100,25 @@ public class JobScheduleHelper { } // 2、fresh next + //刷新下次执行时间 refreshNextValidTime(jobInfo, new Date()); - } else if (nowTime > jobInfo.getTriggerNextTime()) { + } + //当前时间>下次执行时间 但是控制在5秒内 + else if (nowTime > jobInfo.getTriggerNextTime()) { // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time // 1、trigger + //执行 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); // 2、fresh next + //刷新下次执行时间 refreshNextValidTime(jobInfo, new Date()); // next-trigger-time in 5s, pre-read again + //如果任务在执行中 且 下次执行时间在5秒内 则将 任务 缓存到 ringData if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { // 1、make ring second @@ -122,7 +132,9 @@ public class JobScheduleHelper { } - } else { + } + //还没到下次执行时间的 缓存到 ringData + else { // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time // 1、make ring second @@ -199,6 +211,7 @@ public class JobScheduleHelper { if (cost < 1000) { // scan-overtime, not wait try { // pre-read period: success > scan each second; fail > skip this period; + //随机休眠 没任务则休眠5秒内 有任务则休眠1秒内 TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!scheduleThreadToStop) { @@ -355,6 +368,14 @@ public class JobScheduleHelper { // ---------------------- tools ---------------------- + + /** + * 获取下次执行时间 + * @param jobInfo + * @param fromTime + * @return + * @throws Exception + */ public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception { ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null); if (ScheduleTypeEnum.CRON == scheduleTypeEnum) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java index 398713dd..f896a25a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -97,6 +97,7 @@ public class JobTriggerPoolHelper { logger.error(e.getMessage(), e); } finally { + //统计1分钟内 调用超过500ms的 貌似没用到 // check timeout-count-map long minTim_now = System.currentTimeMillis()/60000; if (minTim != minTim_now) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java index 748befc6..0fbf75e1 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java @@ -49,6 +49,7 @@ public class XxlJobTrigger { String addressList) { // load data + //###############初始化 任务参数及执行器参数 XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId); if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); @@ -76,6 +77,9 @@ public class XxlJobTrigger { shardingParam[1] = Integer.valueOf(shardingArr[1]); } } + //############################ + + //路由策略如果是 分片广播 则循环调用 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { @@ -111,7 +115,9 @@ public class XxlJobTrigger { private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){ // param + //获取阻塞策略 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy + //获取路由策略 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null; @@ -142,6 +148,7 @@ public class XxlJobTrigger { String address = null; ReturnT routeAddressResult = null; if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) { + //分片广播 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { if (index < group.getRegistryList().size()) { address = group.getRegistryList().get(index); @@ -149,6 +156,7 @@ public class XxlJobTrigger { address = group.getRegistryList().get(0); } } else { + //基于不同的路由策略获取地址 routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddressResult.getContent(); @@ -161,6 +169,7 @@ public class XxlJobTrigger { // 4、trigger remote executor ReturnT triggerResult = null; if (address != null) { + //执行任务 调用 客户端 run 接口 triggerResult = runExecutor(triggerParam, address); } else { triggerResult = new ReturnT(ReturnT.FAIL_CODE, null); @@ -193,6 +202,7 @@ public class XxlJobTrigger { //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerMsg(triggerMsgSb.toString()); + //更新调用记录 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());