pull/56/head
liyong 4 years ago
parent 5aa7b240c9
commit 85270a7b3d

@ -22,31 +22,33 @@ public class XxlJobScheduler {
public void init() throws Exception {
// init i18n
// 从配置文件中初始化 ExecutorBlockStrategyEnum title
// TODO 国际化阻塞策略
initI18n();
// admin trigger pool start
// 初始化 fastTriggerPool用于 任务的重试(步骤四) 、slowTriggerPool用于 任务的重试超过10次的步骤四 线程池
// TODO 初始化 fastTriggerPool用于 执行任务步骤四、步骤7 、slowTriggerPool用于 执行慢任务步骤四、步骤7 线程池
//TODO 注意最大线程数 后面会用于 计算一次循环从数据库拉取的任务数
JobTriggerPoolHelper.toStart();
// admin registry monitor run
//1.初始化registryOrRemoveThreadPool用于 客户端注册) 2.开启 registryMonitorThread (用于扫描 有没有对应执行器的 客户端地址 每30秒扫描一次
//TODO 1.初始化registryOrRemoveThreadPool用于 客户端注册) 2.开启 registryMonitorThread (用于扫描 有没有对应执行器的 客户端地址 每30秒扫描一次
JobRegistryHelper.getInstance().start();
// admin fail-monitor run
//开启线程 扫描异常任务 重试机制
//TODO 步骤4 开启线程 扫描异常任务 重试机制
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run ( depend on JobTriggerPoolHelper )
//初始化 callbackThreadPool用于接收客户端执行状态 线程池 启动 monitorThread 处理长期处于进行中的任务
//TODO 初始化 callbackThreadPool用于接收客户端执行状态 线程池 启动 monitorThread 处理长期处于进行中的任务
JobCompleteHelper.getInstance().start();
// admin log report start
//开启 logrThread 统计报表相关 间隔一分钟
//todo 开启 logrThread 统计报表相关 间隔一分钟
//TODO 每次统计都是统计一天的 数据量过大时会影响效率 可以做成增量的 在获取结果时异步 统计 或者每次只统计1分钟的
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper )
//任务的预处理 scheduleThread ringThread
//TODO 步骤7 任务的预处理 scheduleThread ringThread
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");

@ -74,7 +74,7 @@ public class JobCompleteHelper {
// monitor
while (!toStop) {
try {
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min且对应执行器心跳注册失败不在线则将本地调度主动标记失败
//TODO 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min且对应执行器心跳注册失败不在线则将本地调度主动标记失败
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

@ -53,7 +53,7 @@ public class JobFailMonitorHelper {
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、fail retry monitor
//校验重试次数
//TODO 校验重试次数
if (log.getExecutorFailRetryCount() > 0) {
//执行任务
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);

@ -59,11 +59,12 @@ public class JobRegistryHelper {
while (!toStop) {
try {
// auto registry group
//
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor)
//查询有没有90秒之前注册过的节点 有则直接删除
//TODO 查询有没有90秒之前注册过的节点 有则直接删除
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
@ -71,10 +72,11 @@ public class JobRegistryHelper {
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
//查询详细90秒内注册过的
//TODO 查询 90秒内注册过的
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
//TODO
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
@ -105,7 +107,7 @@ public class JobRegistryHelper {
}
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
//TODO 循环更新
//TODO 循环更新 执行器 对应的 注册列表
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
@ -115,7 +117,7 @@ public class JobRegistryHelper {
}
}
try {
//间隔30秒
//TODO 间隔30秒
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {

@ -70,7 +70,7 @@ public class JobScheduleHelper {
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
//通过这个来控制多节点只有一个节点会执行 low的一笔
//TODO 通过这个来控制多节点只有一个节点会执行 low的一笔
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
@ -78,14 +78,14 @@ public class JobScheduleHelper {
// 1、pre read
long nowTime = System.currentTimeMillis();
//下次执行时间 在5秒内的
//TODO 下次执行时间 在5秒内的 整个框架的核心
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
// 当前时间>下次执行时间+5秒 任务触发过期策略
// TODO 当前时间>下次执行时间+5秒 任务触发过期策略 5秒内不算过期
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5spass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
@ -104,27 +104,29 @@ public class JobScheduleHelper {
refreshNextValidTime(jobInfo, new Date());
}
//当前时间>下次执行时间 但是控制在5秒内
//TODO 当前时间>下次执行时间 过期了但是控制在5秒内不算过期
else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5sdirect-trigger && make next-trigger-time
// 1、trigger
//执行
//TODO 执行
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
//刷新下次执行时间
//TODO 刷新下次执行时间
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
//如果任务在执行中 且 下次执行时间在5秒内 则将 任务 缓存到 ringData
//TODO 如果任务在运行状态 且 下次执行时间在5秒内 则将 任务 缓存到 ringData
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
//TODO 获取下次执行时间 是哪一秒 作为缓存的key
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
//TODO 缓存至 ringData
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
@ -249,7 +251,8 @@ public class JobScheduleHelper {
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
//TODO 避免处理耗时太长,跨过刻度,向前校验一个刻度;
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
@ -262,7 +265,7 @@ public class JobScheduleHelper {
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger
//TODO do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear

@ -97,7 +97,7 @@ public class JobTriggerPoolHelper {
logger.error(e.getMessage(), e);
} finally {
//统计1分钟内 调用超过500ms的 貌似没用到
//TODO 统计1分钟内 调用超过500ms的
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {

@ -79,7 +79,7 @@ public class XxlJobTrigger {
}
//############################
//路由策略如果是 分片广播 则循环调用
//TODO 路由策略如果是 分片广播 则循环调用
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
@ -90,6 +90,7 @@ public class XxlJobTrigger {
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
//TODO
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
@ -115,9 +116,9 @@ public class XxlJobTrigger {
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param
//获取阻塞策略
//TODO 获取阻塞策略
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
//获取路由策略
//TODO 获取路由策略
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
@ -156,7 +157,7 @@ public class XxlJobTrigger {
address = group.getRegistryList().get(0);
}
} else {
//基于不同的路由策略获取地址
//TODO 基于不同的路由策略获取地址
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
@ -169,7 +170,7 @@ public class XxlJobTrigger {
// 4、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
//执行任务 调用 客户端 run 接口
//TODO 执行任务 调用 客户端 run 接口
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
@ -202,7 +203,7 @@ public class XxlJobTrigger {
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
//更新调用记录
//TODO 更新调用记录
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());

Loading…
Cancel
Save