refactor(scheduler): 重构任务完成和触发逻辑

-重命名 XxlJobCompleter 为 JobCompleter 并改为 Spring 组件-重命名 XxlJobTrigger为 JobTrigger 并改为 Spring 组件
- 将静态方法调用改为通过 XxlJobAdminBootstrap 实例获取组件
- 更新 JobLogController 和 JobCompleteHelper 中的调用方式
- 移除不必要的导入和静态方法引用
- 使用 StringTool 替代自定义字符串处理方法
- 优化任务触发和完成的处理流程
pull/72/head
xuxueli 4 months ago
parent 23779284a6
commit 7011ef0e5c

@ -6,7 +6,6 @@ import com.xxl.job.admin.mapper.XxlJobLogMapper;
import com.xxl.job.admin.model.XxlJobGroup;
import com.xxl.job.admin.model.XxlJobInfo;
import com.xxl.job.admin.model.XxlJobLog;
import com.xxl.job.admin.scheduler.complete.XxlJobCompleter;
import com.xxl.job.admin.scheduler.config.XxlJobAdminBootstrap;
import com.xxl.job.admin.scheduler.exception.XxlJobException;
import com.xxl.job.admin.util.I18nUtil;
@ -254,7 +253,7 @@ public class JobLogController {
log.setHandleCode(ReturnT.FAIL_CODE);
log.setHandleMsg( I18nUtil.getString("joblog_kill_log_byman")+":" + (runResult.getMsg()!=null?runResult.getMsg():""));
log.setHandleTime(new Date());
XxlJobCompleter.updateHandleInfoAndFinish(log);
XxlJobAdminBootstrap.getInstance().getJobCompleter().complete(log);
return ReturnT.ofSuccess(runResult.getMsg());
} else {
return ReturnT.ofFail(runResult.getMsg());

@ -16,6 +16,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* xxl-job alarmer
*
* @author xuxueli 17/7/13.
*/
@Component
public class JobAlarmer implements ApplicationContextAware, InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(JobAlarmer.class);

@ -1,6 +1,7 @@
package com.xxl.job.admin.scheduler.complete;
import com.xxl.job.admin.scheduler.config.XxlJobAdminBootstrap;
import com.xxl.job.admin.mapper.XxlJobInfoMapper;
import com.xxl.job.admin.mapper.XxlJobLogMapper;
import com.xxl.job.admin.model.XxlJobInfo;
import com.xxl.job.admin.model.XxlJobLog;
import com.xxl.job.admin.scheduler.thread.JobTriggerPoolHelper;
@ -8,53 +9,71 @@ import com.xxl.job.admin.scheduler.trigger.TriggerTypeEnum;
import com.xxl.job.admin.util.I18nUtil;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.tool.core.StringTool;
import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.text.MessageFormat;
/**
* xxl-job job log complete
*
* @author xuxueli 2020-10-30 20:43:10
*/
public class XxlJobCompleter {
private static Logger logger = LoggerFactory.getLogger(XxlJobCompleter.class);
@Component
public class JobCompleter {
private static final Logger logger = LoggerFactory.getLogger(JobCompleter.class);
@Resource
private XxlJobInfoMapper xxlJobInfoMapper;
@Resource
private XxlJobLogMapper xxlJobLogMapper;
/**
* common fresh handle entrance (limit only once)
*
* @param xxlJobLog
* @return
* complate job (limit only once)
*/
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {
public int complete(XxlJobLog xxlJobLog) {
// finish
finishJob(xxlJobLog);
// 1、process child-job
processChildJob(xxlJobLog);
// text最大64kb 避免长度过长
if (xxlJobLog.getHandleMsg().length() > 15000) {
xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) );
}
// fresh handle
return XxlJobAdminBootstrap.getInstance().getXxlJobLogMapper().updateHandleInfo(xxlJobLog);
// 2、fix_delay trigger next
// on the way
// 3、update job handle-info
return xxlJobLogMapper.updateHandleInfo(xxlJobLog);
}
/**
* do somethind to finish job
*/
private static void finishJob(XxlJobLog xxlJobLog){
private void processChildJob(XxlJobLog xxlJobLog){
// 1、handle success, to trigger child job
String triggerChildMsg = null;
if (XxlJobContext.HANDLE_CODE_SUCCESS == xxlJobLog.getHandleCode()) {
XxlJobInfo xxlJobInfo = XxlJobAdminBootstrap.getInstance().getXxlJobInfoMapper().loadById(xxlJobLog.getJobId());
if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) {
triggerChildMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>";
XxlJobInfo xxlJobInfo = xxlJobInfoMapper.loadById(xxlJobLog.getJobId());
// process child job
if (xxlJobInfo!=null && StringTool.isNotBlank(xxlJobInfo.getChildJobId())) {
triggerChildMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>";
String[] childJobIds = xxlJobInfo.getChildJobId().split(",");
for (int i = 0; i < childJobIds.length; i++) {
int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1;
// process eath child
int childJobId = (StringTool.isNotBlank(childJobIds[i]) && StringTool.isNumeric(childJobIds[i]))
?Integer.parseInt(childJobIds[i])
:-1;
if (childJobId > 0) {
// valid
if (childJobId == xxlJobLog.getJobId()) {
@ -84,22 +103,20 @@ public class XxlJobCompleter {
}
}
if (triggerChildMsg != null) {
// 2、append trigger-child message
if (StringTool.isNotBlank(triggerChildMsg)) {
xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg );
}
// 2、fix_delay trigger next
// on the way
}
private static boolean isNumeric(String str){
/*private static boolean isNumeric(String str){
try {
int result = Integer.valueOf(str);
return true;
} catch (NumberFormatException e) {
return false;
}
}
}*/
}

@ -2,7 +2,9 @@ package com.xxl.job.admin.scheduler.config;
import com.xxl.job.admin.scheduler.alarm.JobAlarmer;
import com.xxl.job.admin.mapper.*;
import com.xxl.job.admin.scheduler.complete.JobCompleter;
import com.xxl.job.admin.scheduler.thread.*;
import com.xxl.job.admin.scheduler.trigger.JobTrigger;
import com.xxl.job.admin.util.I18nUtil;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.client.ExecutorBizClient;
@ -184,6 +186,10 @@ public class XxlJobAdminBootstrap implements InitializingBean, DisposableBean {
private DataSource dataSource;
@Resource
private JobAlarmer jobAlarmer;
@Resource
private JobTrigger jobTrigger;
@Resource
private JobCompleter jobCompleter;
public String getI18n() {
@ -258,4 +264,12 @@ public class XxlJobAdminBootstrap implements InitializingBean, DisposableBean {
return jobAlarmer;
}
public JobTrigger getJobTrigger() {
return jobTrigger;
}
public JobCompleter getJobCompleter() {
return jobCompleter;
}
}

@ -1,6 +1,5 @@
package com.xxl.job.admin.scheduler.thread;
import com.xxl.job.admin.scheduler.complete.XxlJobCompleter;
import com.xxl.job.admin.scheduler.config.XxlJobAdminBootstrap;
import com.xxl.job.admin.model.XxlJobLog;
import com.xxl.job.admin.util.I18nUtil;
@ -88,7 +87,7 @@ public class JobCompleteHelper {
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
XxlJobAdminBootstrap.getInstance().getJobCompleter().complete(jobLog);
}
}
@ -174,7 +173,7 @@ public class JobCompleteHelper {
log.setHandleTime(new Date());
log.setHandleCode(handleCallbackParam.getHandleCode());
log.setHandleMsg(handleMsg.toString());
XxlJobCompleter.updateHandleInfoAndFinish(log);
XxlJobAdminBootstrap.getInstance().getJobCompleter().complete(log);
return ReturnT.ofSuccess();
}

@ -2,7 +2,6 @@ package com.xxl.job.admin.scheduler.thread;
import com.xxl.job.admin.scheduler.config.XxlJobAdminBootstrap;
import com.xxl.job.admin.scheduler.trigger.TriggerTypeEnum;
import com.xxl.job.admin.scheduler.trigger.XxlJobTrigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -104,7 +103,7 @@ public class JobTriggerPoolHelper {
try {
// do trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
XxlJobAdminBootstrap.getInstance().getJobTrigger().trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
} finally {

@ -1,5 +1,8 @@
package com.xxl.job.admin.scheduler.trigger;
import com.xxl.job.admin.mapper.XxlJobGroupMapper;
import com.xxl.job.admin.mapper.XxlJobInfoMapper;
import com.xxl.job.admin.mapper.XxlJobLogMapper;
import com.xxl.job.admin.scheduler.config.XxlJobAdminBootstrap;
import com.xxl.job.admin.model.XxlJobGroup;
import com.xxl.job.admin.model.XxlJobInfo;
@ -13,17 +16,30 @@ import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.util.IpUtil;
import com.xxl.job.core.util.ThrowableUtil;
import com.xxl.tool.core.StringTool;
import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* xxl-job trigger
* Created by xuxueli on 17/7/13.
*
* @author xuxueli 17/7/13.
*/
public class XxlJobTrigger {
private static Logger logger = LoggerFactory.getLogger(XxlJobTrigger.class);
@Component
public class JobTrigger {
private static final Logger logger = LoggerFactory.getLogger(JobTrigger.class);
@Resource
private XxlJobInfoMapper xxlJobInfoMapper;
@Resource
private XxlJobGroupMapper xxlJobGroupMapper;
@Resource
private XxlJobLogMapper xxlJobLogMapper;
/**
* trigger job
@ -41,7 +57,7 @@ public class XxlJobTrigger {
* null: use executor addressList
* not null: cover
*/
public static void trigger(int jobId,
public void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
@ -49,7 +65,7 @@ public class XxlJobTrigger {
String addressList) {
// load data
XxlJobInfo jobInfo = XxlJobAdminBootstrap.getInstance().getXxlJobInfoMapper().loadById(jobId);
XxlJobInfo jobInfo = xxlJobInfoMapper.loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalidjobId={}", jobId);
return;
@ -58,7 +74,7 @@ public class XxlJobTrigger {
jobInfo.setExecutorParam(executorParam);
}
int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
XxlJobGroup group = XxlJobAdminBootstrap.getInstance().getXxlJobGroupMapper().load(jobInfo.getJobGroup());
XxlJobGroup group = xxlJobGroupMapper.load(jobInfo.getJobGroup());
// cover addressList
if (StringTool.isNotBlank(addressList)) {
@ -72,8 +88,8 @@ public class XxlJobTrigger {
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length==2 && StringTool.isNumeric(shardingArr[0]) && StringTool.isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
shardingParam[0] = Integer.parseInt(shardingArr[0]);
shardingParam[1] = Integer.parseInt(shardingArr[1]);
}
}
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
@ -91,24 +107,31 @@ public class XxlJobTrigger {
}
private static boolean isNumeric(String str){
/*private static boolean isNumeric(String str){
try {
int result = Integer.valueOf(str);
return true;
} catch (NumberFormatException e) {
return false;
}
}
}*/
/**
* process trigger with log
*
* @param group job group, registry list may be empty
* @param jobInfo
* @param finalFailRetryCount
* @param triggerType
* @param jobInfo job info
* @param finalFailRetryCount the fail-retry count
* @param triggerType trigger type
* @param index sharding index
* @param total sharding index
*/
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
private void processTrigger(XxlJobGroup group,
XxlJobInfo jobInfo,
int finalFailRetryCount,
TriggerTypeEnum triggerType,
int index,
int total){
// param
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
@ -120,7 +143,7 @@ public class XxlJobTrigger {
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminBootstrap.getInstance().getXxlJobLogMapper().save(jobLog);
xxlJobLogMapper.save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、init trigger-param
@ -161,13 +184,13 @@ public class XxlJobTrigger {
// 4、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
triggerResult = runExecutor(triggerParam, address);
triggerResult = doTrigger(triggerParam, address);
} else {
triggerResult = ReturnT.ofFail(null);
}
// 5、collection trigger info
StringBuffer triggerMsgSb = new StringBuffer();
StringBuilder triggerMsgSb = new StringBuilder();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append("").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append("").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append("")
@ -193,18 +216,19 @@ public class XxlJobTrigger {
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobAdminBootstrap.getInstance().getXxlJobLogMapper().updateTriggerInfo(jobLog);
xxlJobLogMapper.updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
/**
* run executor
* @param triggerParam
* @param address
* @return
* do trigger with address
*
* @param triggerParam trigger param
* @param address the address
* @return return
*/
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
private ReturnT<String> doTrigger(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = XxlJobAdminBootstrap.getExecutorBiz(address);
Loading…
Cancel
Save