liyong 4 years ago
commit bee703c790

@ -28,6 +28,7 @@ public class XxlJobCompleter {
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {
// finish
//执行字任务
finishJob(xxlJobLog);
// text最大64kb 避免长度过长
@ -36,6 +37,7 @@ public class XxlJobCompleter {
}
// fresh handle
//更新log表
return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
}

@ -12,6 +12,7 @@ import java.util.List;
/**
* Created by xuxueli on 17/3/10.
* idleBeat
*/
public class ExecutorRouteBusyover extends ExecutorRouter {

@ -11,6 +11,7 @@ import java.util.List;
/**
* Created by xuxueli on 17/3/10.
* beat SUCCESS
*/
public class ExecutorRouteFailover extends ExecutorRouter {

@ -13,6 +13,7 @@ import java.util.concurrent.ConcurrentMap;
* a(*)LFU(Least Frequently Used)使/
* bLRU(Least Recently Used)使
*
* 24 map+1
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteLFU extends ExecutorRouter {

@ -15,6 +15,7 @@ import java.util.concurrent.ConcurrentMap;
* aLFU(Least Frequently Used)使/
* b(*)LRU(Least Recently Used)使
*
* LinkedHashMap accessorder=true访
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteLRU extends ExecutorRouter {

@ -22,24 +22,31 @@ public class XxlJobScheduler {
public void init() throws Exception {
// init i18n
// 从配置文件中初始化 ExecutorBlockStrategyEnum title
initI18n();
// admin trigger pool start
// 初始化 fastTriggerPool用于 任务的重试(步骤四) 、slowTriggerPool用于 任务的重试超过10次的步骤四 线程池
JobTriggerPoolHelper.toStart();
// admin registry monitor run
//1.初始化registryOrRemoveThreadPool用于 客户端注册) 2.开启 registryMonitorThread (用于扫描 有没有对应执行器的 客户端地址 每30秒扫描一次
JobRegistryHelper.getInstance().start();
// admin fail-monitor run
//开启线程 扫描异常任务 重试机制
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run ( depend on JobTriggerPoolHelper )
//初始化 callbackThreadPool用于接收客户端执行状态 线程池 启动 monitorThread 处理长期处于进行中的任务
JobCompleteHelper.getInstance().start();
// admin log report start
//开启 logrThread 统计报表相关 间隔一分钟
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper )
//任务的预处理 scheduleThread ringThread
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");

@ -43,30 +43,38 @@ public class JobFailMonitorHelper {
for (long failLogId: failLogIds) {
// lock log
//这里通过 依赖数据库事务 来进行避免 多节点时数据的重复操作问题
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
//查询执行记录及对应的任务信息
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、fail retry monitor
//校验重试次数
if (log.getExecutorFailRetryCount() > 0) {
//执行任务
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
//更新记录
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2、fail alarm monitor
int newAlarmStatus = 0; // 告警状态0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
if (info != null) {
//发送告警 不看
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
//更新告警状态
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}

@ -63,6 +63,7 @@ public class JobRegistryHelper {
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor)
//查询有没有90秒之前注册过的节点 有则直接删除
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
@ -70,6 +71,7 @@ public class JobRegistryHelper {
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
//查询详细90秒内注册过的
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
@ -103,7 +105,7 @@ public class JobRegistryHelper {
}
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
//TODO 循环内更新
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
@ -113,6 +115,7 @@ public class JobRegistryHelper {
}
}
try {
//间隔30秒
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {

@ -70,6 +70,7 @@ public class JobScheduleHelper {
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
//通过这个来控制多节点只有一个节点会执行 low的一笔
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
@ -77,18 +78,21 @@ public class JobScheduleHelper {
// 1、pre read
long nowTime = System.currentTimeMillis();
//下次执行时间 在5秒内的
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
// 当前时间>下次执行时间+5秒 任务触发过期策略
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5spass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
//如果过期策略是立即执行一次则 执行
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
@ -96,19 +100,25 @@ public class JobScheduleHelper {
}
// 2、fresh next
//刷新下次执行时间
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
}
//当前时间>下次执行时间 但是控制在5秒内
else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5sdirect-trigger && make next-trigger-time
// 1、trigger
//执行
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
//刷新下次执行时间
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
//如果任务在执行中 且 下次执行时间在5秒内 则将 任务 缓存到 ringData
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
@ -122,7 +132,9 @@ public class JobScheduleHelper {
}
} else {
}
//还没到下次执行时间的 缓存到 ringData
else {
// 2.3、trigger-pre-readtime-ring trigger && make next-trigger-time
// 1、make ring second
@ -199,6 +211,7 @@ public class JobScheduleHelper {
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
//随机休眠 没任务则休眠5秒内 有任务则休眠1秒内
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
@ -355,6 +368,14 @@ public class JobScheduleHelper {
// ---------------------- tools ----------------------
/**
*
* @param jobInfo
* @param fromTime
* @return
* @throws Exception
*/
public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
if (ScheduleTypeEnum.CRON == scheduleTypeEnum) {

@ -97,6 +97,7 @@ public class JobTriggerPoolHelper {
logger.error(e.getMessage(), e);
} finally {
//统计1分钟内 调用超过500ms的 貌似没用到
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {

@ -49,6 +49,7 @@ public class XxlJobTrigger {
String addressList) {
// load data
//###############初始化 任务参数及执行器参数
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalidjobId={}", jobId);
@ -76,6 +77,9 @@ public class XxlJobTrigger {
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
//############################
//路由策略如果是 分片广播 则循环调用
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
@ -111,7 +115,9 @@ public class XxlJobTrigger {
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param
//获取阻塞策略
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
//获取路由策略
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
@ -142,6 +148,7 @@ public class XxlJobTrigger {
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
//分片广播
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
@ -149,6 +156,7 @@ public class XxlJobTrigger {
address = group.getRegistryList().get(0);
}
} else {
//基于不同的路由策略获取地址
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
@ -161,6 +169,7 @@ public class XxlJobTrigger {
// 4、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
//执行任务 调用 客户端 run 接口
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
@ -193,6 +202,7 @@ public class XxlJobTrigger {
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
//更新调用记录
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());

Loading…
Cancel
Save