diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java
index 279ad7d1..cd7557d5 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java
@@ -28,6 +28,7 @@ public class XxlJobCompleter {
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {
// finish
+ //执行字任务
finishJob(xxlJobLog);
// text最大64kb 避免长度过长
@@ -36,6 +37,7 @@ public class XxlJobCompleter {
}
// fresh handle
+ //更新log表
return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
}
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java
index 868560fc..0b8bd355 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java
@@ -12,6 +12,7 @@ import java.util.List;
/**
* Created by xuxueli on 17/3/10.
+ * 通过调用 客户端 idleBeat 接口
*/
public class ExecutorRouteBusyover extends ExecutorRouter {
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java
index a2e4c909..ea1e4ade 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java
@@ -11,6 +11,7 @@ import java.util.List;
/**
* Created by xuxueli on 17/3/10.
+ * 通过调用 客户端 beat接口 返回第一个SUCCESS的
*/
public class ExecutorRouteFailover extends ExecutorRouter {
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java
index 9df19726..5151a44c 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java
@@ -13,6 +13,7 @@ import java.util.concurrent.ConcurrentMap;
* a(*)、LFU(Least Frequently Used):最不经常使用,频率/次数
* b、LRU(Least Recently Used):最近最久未使用,时间
*
+ * 统计24小时内调用次数 最少的 每次调用通过map+1记录 如果是第一次则是随机生成调用次数 这个策略主要是当前节点调用次数 不是 被调方的 调用次数 如果是多节点不符合预期结果
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteLFU extends ExecutorRouter {
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java
index 2d540067..ee3d61bb 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java
@@ -15,6 +15,7 @@ import java.util.concurrent.ConcurrentMap;
* a、LFU(Least Frequently Used):最不经常使用,频率/次数
* b(*)、LRU(Least Recently Used):最近最久未使用,时间
*
+ * 基于 LinkedHashMap (accessorder=true)的访问顺序 来控制
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteLRU extends ExecutorRouter {
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java
index bb2cda8b..ba529f1b 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java
@@ -22,24 +22,31 @@ public class XxlJobScheduler {
public void init() throws Exception {
// init i18n
+ // 从配置文件中初始化 ExecutorBlockStrategyEnum title
initI18n();
// admin trigger pool start
+ // 初始化 fastTriggerPool(用于 任务的重试(步骤四) 、slowTriggerPool(用于 任务的重试超过10次的(步骤四)) 线程池
JobTriggerPoolHelper.toStart();
// admin registry monitor run
+ //1.初始化registryOrRemoveThreadPool(用于 客户端注册) 2.开启 registryMonitorThread (用于扫描 有没有对应执行器的 客户端地址 每30秒扫描一次
JobRegistryHelper.getInstance().start();
// admin fail-monitor run
+ //开启线程 扫描异常任务 重试机制
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run ( depend on JobTriggerPoolHelper )
+ //初始化 callbackThreadPool(用于接收客户端执行状态) 线程池 启动 monitorThread 处理长期处于进行中的任务
JobCompleteHelper.getInstance().start();
// admin log report start
+ //开启 logrThread 统计报表相关 间隔一分钟
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper )
+ //任务的预处理 scheduleThread ringThread
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java
index 8409d7b3..28d0a530 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java
@@ -43,30 +43,38 @@ public class JobFailMonitorHelper {
for (long failLogId: failLogIds) {
// lock log
+ //这里通过 依赖数据库事务 来进行避免 多节点时数据的重复操作问题
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
+ //查询执行记录及对应的任务信息
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、fail retry monitor
+ //校验重试次数
if (log.getExecutorFailRetryCount() > 0) {
+ //执行任务
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "
>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<<
";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
+ //更新记录
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2、fail alarm monitor
int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
if (info != null) {
+ //发送告警 不看
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
+
+ //更新告警状态
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java
index 37edfd98..02e412e4 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java
@@ -63,6 +63,7 @@ public class JobRegistryHelper {
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor)
+ //查询有没有90秒之前注册过的节点 有则直接删除
List ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
@@ -70,6 +71,7 @@ public class JobRegistryHelper {
// fresh online address (admin/executor)
HashMap> appAddressMap = new HashMap>();
+ //查询详细90秒内注册过的
List list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
@@ -103,7 +105,7 @@ public class JobRegistryHelper {
}
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
-
+ //TODO 循环内更新
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
@@ -113,6 +115,7 @@ public class JobRegistryHelper {
}
}
try {
+ //间隔30秒
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java
index 831bcf6a..873240cc 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java
@@ -70,6 +70,7 @@ public class JobScheduleHelper {
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
+ //通过这个来控制多节点只有一个节点会执行 low的一笔
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
@@ -77,18 +78,21 @@ public class JobScheduleHelper {
// 1、pre read
long nowTime = System.currentTimeMillis();
+ //下次执行时间 在5秒内的
List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
+ // 当前时间>下次执行时间+5秒 任务触发过期策略
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
+ //如果过期策略是立即执行一次则 执行
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
@@ -96,19 +100,25 @@ public class JobScheduleHelper {
}
// 2、fresh next
+ //刷新下次执行时间
refreshNextValidTime(jobInfo, new Date());
- } else if (nowTime > jobInfo.getTriggerNextTime()) {
+ }
+ //当前时间>下次执行时间 但是控制在5秒内
+ else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger
+ //执行
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
+ //刷新下次执行时间
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
+ //如果任务在执行中 且 下次执行时间在5秒内 则将 任务 缓存到 ringData
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
@@ -122,7 +132,9 @@ public class JobScheduleHelper {
}
- } else {
+ }
+ //还没到下次执行时间的 缓存到 ringData
+ else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
@@ -199,6 +211,7 @@ public class JobScheduleHelper {
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
+ //随机休眠 没任务则休眠5秒内 有任务则休眠1秒内
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java
index 398713dd..f896a25a 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java
@@ -97,6 +97,7 @@ public class JobTriggerPoolHelper {
logger.error(e.getMessage(), e);
} finally {
+ //统计1分钟内 调用超过500ms的 貌似没用到
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
index 748befc6..0fbf75e1 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
@@ -49,6 +49,7 @@ public class XxlJobTrigger {
String addressList) {
// load data
+ //###############初始化 任务参数及执行器参数
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
@@ -76,6 +77,9 @@ public class XxlJobTrigger {
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
+ //############################
+
+ //路由策略如果是 分片广播 则循环调用
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
@@ -111,7 +115,9 @@ public class XxlJobTrigger {
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param
+ //获取阻塞策略
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
+ //获取路由策略
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
@@ -142,6 +148,7 @@ public class XxlJobTrigger {
String address = null;
ReturnT routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
+ //分片广播
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
@@ -149,6 +156,7 @@ public class XxlJobTrigger {
address = group.getRegistryList().get(0);
}
} else {
+ //基于不同的路由策略获取地址
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
@@ -161,6 +169,7 @@ public class XxlJobTrigger {
// 4、trigger remote executor
ReturnT triggerResult = null;
if (address != null) {
+ //执行任务 调用 客户端 run 接口
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT(ReturnT.FAIL_CODE, null);
@@ -193,6 +202,7 @@ public class XxlJobTrigger {
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
+ //更新调用记录
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());