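Lost job result handling: if a schedule record stays in the "running" state for more than 10 minutes and the corresponding executor has failed heartbeat registration and is offline, the local schedule is proactively marked as failed;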

2.2.0
xuxueli 5 years ago
parent 2349098034
commit c4f51ba26a

@ -1285,11 +1285,16 @@ docker run --name xxl-job-admin -p 8080:8080 -d xuxueli/xxl-job-admin
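A general-purpose command-line job handler is provided natively (a Bean-mode job, "CommandJobHandler"); the business side only needs to supply the command line;
For example, the job parameter "pwd" runs that command and outputs its result;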
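### 5.22 Automatic log cleanup
XXL-JOB logs consist mainly of the following two parts, both of which support automatic cleanup, as described below:
- Scheduling center log table data: the configuration item "xxl.job.logretentiondays" sets how many days log table data is kept; expired logs are cleaned up automatically. See the configuration notes above for details;
- Executor log file data: the configuration item "xxl.job.executor.logretentiondays" sets how many days log file data is kept; expired logs are cleaned up automatically. See the configuration notes above for details;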
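The retention rule described above can be sketched as a date cutoff. This is only an illustration: the class `LogRetentionSketch`, the method `expired`, the idea that each log entry can be reduced to a calendar date, and the choice to treat a non-positive value as "cleanup disabled" are all assumptions made for this example; the actual thresholds and behavior are defined by the configuration notes the text refers to.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LogRetentionSketch {

    /**
     * Returns the log dates that fall outside the retention window.
     * Hypothetical helper, not part of XXL-JOB.
     */
    static List<LocalDate> expired(List<LocalDate> logDates, int retentionDays, LocalDate today) {
        List<LocalDate> result = new ArrayList<>();
        if (retentionDays <= 0) {
            // Assumption for this sketch: a non-positive value disables cleanup.
            return result;
        }
        // Anything strictly older than the cutoff is considered expired.
        LocalDate cutoff = today.minusDays(retentionDays);
        for (LocalDate date : logDates) {
            if (date.isBefore(cutoff)) {
                result.add(date);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2020, 1, 31);
        List<LocalDate> logDates = Arrays.asList(
                LocalDate.of(2019, 12, 31),
                LocalDate.of(2020, 1, 1),
                LocalDate.of(2020, 1, 15));
        // With 30 retention days the cutoff is 2020-01-01, so only 2019-12-31 is expired.
        System.out.println(expired(logDates, 30, today)); // prints [2019-12-31]
    }
}
```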
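### 5.23 Handling lost scheduling results
When an executor's callback fails because of network jitter, or the executor crashes or hits a similar fault, the job's scheduling result is lost. Because the scheduling center relies on executor callbacks to learn scheduling results, the scheduling log would otherwise stay in the "running" state forever.
To address this, the scheduling center provides a built-in component whose logic is: if a schedule record stays in the "running" state for more than 10 minutes and the corresponding executor has failed heartbeat registration and is offline, the local schedule is proactively marked as failed.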
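The rule above can be read as a predicate over a single log record. The sketch below is a simplified illustration, not the project's implementation: the class `LostResultRule` and the method `isLost` are hypothetical, and the codes 200 (trigger succeeded) and 0 (no callback recorded yet) are taken from the `findLostJobIds` query in this commit. The set of online addresses stands in for the executor registry table.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Set;

public class LostResultRule {

    static final int TRIGGER_SUCCESS = 200;  // trigger_code matched by the query
    static final int HANDLE_PENDING = 0;     // handle_code while no result has been reported
    static final Duration THRESHOLD = Duration.ofMinutes(10);

    /** Hypothetical helper: true when a record should be marked as failed. */
    static boolean isLost(int triggerCode, int handleCode, Instant triggerTime,
                          String executorAddress, Set<String> onlineAddresses, Instant now) {
        // "Running": the trigger succeeded but no callback has arrived.
        boolean stillRunning = triggerCode == TRIGGER_SUCCESS && handleCode == HANDLE_PENDING;
        // Triggered at or before (now - 10 minutes).
        boolean overdue = !triggerTime.isAfter(now.minus(THRESHOLD));
        // The executor no longer appears among the registered (online) addresses.
        boolean executorOffline = !onlineAddresses.contains(executorAddress);
        return stillRunning && overdue && executorOffline;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2020-01-01T12:00:00Z");
        Instant triggered = now.minus(Duration.ofMinutes(11));
        Set<String> online = Collections.singleton("10.0.0.1:9999");

        // Executor offline and record overdue: treated as lost.
        System.out.println(isLost(200, 0, triggered, "10.0.0.2:9999", online, now)); // prints true
        // Executor still online: left alone, the callback may still arrive.
        System.out.println(isLost(200, 0, triggered, "10.0.0.1:9999", online, now)); // prints false
    }
}
```

Requiring both conditions keeps long-running jobs on healthy executors from being failed prematurely: elapsed time alone is not enough to mark a record as lost.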
## 6. Version changelog
### 6.1 Version V1.1.x, new features [2015-12-05]
@ -1732,7 +1737,7 @@ data: post-data
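- 17. Job copy: clicking Copy opens the new-job dialog, pre-filled with the copied job's information;
- 18. Executor UI interaction improved; the redundant order attribute is removed;
- 19. A manual one-off job run can specify the machine address for that run; if it is empty, the address is obtained from the executor;
- 20. [In progress] Lost job result handling: for jobs that stay in the running state for a long time (with a timeout set, running longer than "timeout + 1min"; with no timeout set, running longer than "30min"), actively check whether the executor is online and, if it is not, mark the job as failed;
- 20. Lost job result handling: if a schedule record stays in the "running" state for more than 10 minutes and the corresponding executor has failed heartbeat registration and is offline, the local schedule is proactively marked as failed;
- 21. [In progress] Communication between the scheduling center and executors is standardized as bidirectional RESTful, to ease cross-language and third-party executor implementations; the scheme of the communication component xxl-rpc is changed to Jetty+Gson;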

@ -31,9 +31,12 @@ public class XxlJobScheduler {
// admin registry monitor run
JobRegistryMonitorHelper.getInstance().start();
// admin monitor run
// admin fail-monitor run
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run
JobLosedMonitorHelper.getInstance().start();
// admin trigger pool start
JobTriggerPoolHelper.toStart();
@ -58,7 +61,10 @@ public class XxlJobScheduler {
// admin trigger pool stop
JobTriggerPoolHelper.toStop();
// admin monitor stop
// admin lose-monitor stop
JobLosedMonitorHelper.getInstance().toStop();
// admin fail-monitor stop
JobFailMonitorHelper.getInstance().toStop();
// admin registry stop

@ -0,0 +1,95 @@
package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* job lose-monitor instance
*
* @author xuxueli 2015-9-1 18:05:56
*/
public class JobLosedMonitorHelper {
private static Logger logger = LoggerFactory.getLogger(JobLosedMonitorHelper.class);
private static JobLosedMonitorHelper instance = new JobLosedMonitorHelper();
public static JobLosedMonitorHelper getInstance(){
return instance;
}
// ---------------------- monitor ----------------------
private Thread monitorThread;
private volatile boolean toStop = false;
public void start(){
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// monitor
while (!toStop) {
try {
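// Lost job result handling: a schedule record that stays "running" for more than 10 minutes while its executor has failed heartbeat registration and is offline is proactively marked as failed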
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
for (Long logId: losedJobIds) {
XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);
jobLog.setHandleTime(new Date());
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(jobLog);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job lose monitor thread error:", e);
}
}
try {
TimeUnit.SECONDS.sleep(60);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job lose monitor thread stop");
}
});
monitorThread.setDaemon(true);
monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
monitorThread.start();
}
public void toStop(){
toStop = true;
// interrupt and wait
monitorThread.interrupt();
try {
monitorThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}

@ -57,4 +57,6 @@ public interface XxlJobLogDao {
@Param("oldAlarmStatus") int oldAlarmStatus,
@Param("newAlarmStatus") int newAlarmStatus);
public List<Long> findLostJobIds(@Param("losedTime") Date losedTime);
}

@ -23,7 +23,7 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
#mybatis.type-aliases-package=com.xxl.job.admin.core.model
### xxl-job, datasource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?Unicode=true&characterEncoding=UTF-8
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

@ -178,6 +178,7 @@ joblog_handleCode_502=Timeout
joblog_kill_log=Kill Job
joblog_kill_log_limit=Trigger Fail, can not kill job
joblog_kill_log_byman=Manual operation, kill job
joblog_lost_fail=Job result lost, marked as failure
joblog_rolling_log=Rolling log
joblog_rolling_log_refresh=Refresh
joblog_rolling_log_triggerfail=The job trigger fail, can not view the rolling log

@ -178,6 +178,7 @@ joblog_handleCode_502=失败(超时)
joblog_kill_log=终止任务
joblog_kill_log_limit=调度失败,无法终止日志
joblog_kill_log_byman=人为操作,主动终止
joblog_lost_fail=任务结果丢失,标记失败
joblog_rolling_log=执行日志
joblog_rolling_log_refresh=刷新
joblog_rolling_log_triggerfail=任务发起调度失败,无法查看执行日志

@ -178,6 +178,7 @@ joblog_handleCode_502=失敗(超時)
joblog_kill_log=终止任務
joblog_kill_log_limit=調度失敗,無法终止日誌
joblog_kill_log_byman=人為操作,主動終止
joblog_lost_fail=任務結果丟失,標記失敗
joblog_rolling_log=執行日誌
joblog_rolling_log_refresh=更新
joblog_rolling_log_triggerfail=任務發起調度失敗,無法查看執行日誌

@ -245,5 +245,17 @@
`alarm_status` = #{newAlarmStatus}
WHERE `id`= #{logId} AND `alarm_status` = #{oldAlarmStatus}
</update>
<select id="findLostJobIds" resultType="long" >
SELECT t.id
FROM xxl_job_log AS t
WHERE t.trigger_code = 200
and t.handle_code = 0
and t.trigger_time <![CDATA[ <= ]]> #{losedTime}
and t.executor_address not in (
SELECT t2.registry_value
FROM xxl_job_registry AS t2
)
</select>
</mapper>

@ -135,6 +135,14 @@ public class DateUtil {
return add(date, Calendar.DAY_OF_MONTH, amount);
}
public static Date addHours(final Date date, final int amount) {
return add(date, Calendar.HOUR_OF_DAY, amount);
}
public static Date addMinutes(final Date date, final int amount) {
return add(date, Calendar.MINUTE, amount);
}
private static Date add(final Date date, final int calendarField, final int amount) {
if (date == null) {
return null;
