From d30a2fcf39734b766b4413d8525caece8119784b Mon Sep 17 00:00:00 2001 From: "xueli.xue" Date: Fri, 30 Oct 2020 22:06:33 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=B0=83=E5=BA=A6=E7=94=9F?= =?UTF-8?q?=E5=91=BD=E5=91=A8=E6=9C=9F=E9=87=8D=E6=9E=84=EF=BC=9A=E8=B0=83?= =?UTF-8?q?=E5=BA=A6=EF=BC=88schedule=EF=BC=89=E3=80=81=E8=A7=A6=E5=8F=91(?= =?UTF-8?q?trigger)=E3=80=81=E6=89=A7=E8=A1=8C=EF=BC=88handle=EF=BC=89?= =?UTF-8?q?=E3=80=81=E5=9B=9E=E8=B0=83(callback)=E3=80=81=E5=90=8E?= =?UTF-8?q?=E5=A4=84=E7=90=86=EF=BC=88posthandle=EF=BC=89=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 7 +- .../admin/controller/JobLogController.java | 3 +- .../core/handle/XxlJobPostHandleHelper.java | 99 ++++++++++ .../core/scheduler/ScheduleTypeEnum.java | 2 +- .../admin/core/scheduler/XxlJobScheduler.java | 4 +- .../job/admin/core/thread/JobLogHelper.java | 185 ++++++++++++++++++ .../core/thread/JobLosedMonitorHelper.java | 104 ---------- .../admin/core/thread/JobScheduleHelper.java | 7 +- .../job/admin/service/impl/AdminBizImpl.java | 111 +---------- .../admin/service/impl/XxlJobServiceImpl.java | 7 +- .../main/resources/i18n/message_en.properties | 1 - .../resources/i18n/message_zh_CN.properties | 1 - .../resources/i18n/message_zh_TC.properties | 1 - .../xxl/job/admin/dao/XxlJobInfoDaoTest.java | 2 +- 14 files changed, 301 insertions(+), 233 deletions(-) create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLogHelper.java delete mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLosedMonitorHelper.java diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index 165d6029..9d4e4413 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -2086,7 +2086,7 @@ data: post-data - 20、执行器注册组件优化:注册逻辑调整为异步方式,提高注册性能; - 21、调度过期策略:调度中心错误调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等; - 22、触发策略强化:除了常规Cron、API、父子任务触发方式外,新增提供 "固定间隔触发、固定延时触发" 两种新触发方式; - +- 23、任务调度生命周期重构:调度(schedule)、触发(trigger)、执行(handle)、回调(callback)、后处理(posthandle); ### 7.32 版本 v2.3.0 Release Notes[规划中] - 1、[规划中]分片任务:全部完成后才会出发后置节点; @@ -2131,9 +2131,8 @@ data: post-data - 普通任务:只记录一条主任务; - 广播任务:记录一条主任务,每个分片任务记录一条次任务,关联在主任务上; - 重试任务:失败时,新增主任务。所有调度记录,包括入口调度和重试调度,均挂载主任务上。 -- 27、调度声明周期:调度、执行、回调、结果处理(公共逻辑:重试、fixdelay); -- 28、任务标签:方便搜索; -- 29、执行器:dag执行器,不需要注册机器; +- 27、任务标签:方便搜索; +- 28、执行器:dag执行器,不需要注册机器; ## 八、其他 diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java index 70e62ae8..b570ffea 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java @@ -1,6 +1,7 @@ package com.xxl.job.admin.controller; import com.xxl.job.admin.core.exception.XxlJobException; +import com.xxl.job.admin.core.handle.XxlJobPostHandleHelper; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; @@ -183,7 +184,7 @@ public class JobLogController { log.setHandleCode(ReturnT.FAIL_CODE); log.setHandleMsg( I18nUtil.getString("joblog_kill_log_byman")+":" + (runResult.getMsg()!=null?runResult.getMsg():"")); log.setHandleTime(new Date()); - xxlJobLogDao.updateHandleInfo(log); + XxlJobPostHandleHelper.updateHandleInfoAndFinish(log); return new ReturnT(runResult.getMsg()); } else { return new ReturnT(500, runResult.getMsg()); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java new file mode 100644 index 00000000..266df11e --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/handle/XxlJobPostHandleHelper.java @@ -0,0 +1,99 @@ +package com.xxl.job.admin.core.handle; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; +import com.xxl.job.admin.core.trigger.TriggerTypeEnum; +import com.xxl.job.admin.core.util.I18nUtil; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; + +/** + * @author xuxueli 2020-10-30 20:43:10 + */ +public class XxlJobPostHandleHelper { + private static Logger logger = LoggerFactory.getLogger(XxlJobPostHandleHelper.class); + + /** + * common fresh handle entrance (limit only once) + * + * @param xxlJobLog + * @return + */ + public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { + + // finish + finishJob(xxlJobLog); + + // text最大64kb 避免长度过长 + if (xxlJobLog.getHandleMsg().length() > 15000) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); + } + + // fresh handle + return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); + } + + + /** + * do somethind to finish job + */ + private static void finishJob(XxlJobLog xxlJobLog){ + + // 1、handle success, to trigger child job + String triggerChildMsg = null; + if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { + XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); + if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { + triggerChildMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; + + String[] childJobIds = xxlJobInfo.getChildJobId().split(","); + for (int i = 0; i < childJobIds.length; i++) { + int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; + if (childJobId > 0) { + + JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); + ReturnT triggerChildResult = ReturnT.SUCCESS; + + // add msg + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), + (i+1), + childJobIds.length, + childJobIds[i], + (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), + triggerChildResult.getMsg()); + } else { + triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), + (i+1), + childJobIds.length, + childJobIds[i]); + } + } + + } + } + + if (triggerChildMsg != null) { + xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); + } + + // 2、fix_delay trigger next + // on the way + + } + + private static boolean isNumeric(String str){ + try { + int result = Integer.valueOf(str); + return true; + } catch (NumberFormatException e) { + return false; + } + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/ScheduleTypeEnum.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/ScheduleTypeEnum.java index efe7b6f0..aa334fda 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/ScheduleTypeEnum.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/ScheduleTypeEnum.java @@ -22,7 +22,7 @@ public enum ScheduleTypeEnum { /** * schedule by fix delay (in seconds), after the last time */ - FIX_DELAY(I18nUtil.getString("schedule_type_fix_delay")); + /*FIX_DELAY(I18nUtil.getString("schedule_type_fix_delay"))*/; private String title; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index c05575ac..555105b6 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -34,7 +34,7 @@ public class XxlJobScheduler { JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run ( depend on JobTriggerPoolHelper ) - JobLosedMonitorHelper.getInstance().start(); + JobLogHelper.getInstance().start(); // admin log report start JobLogReportHelper.getInstance().start(); @@ -55,7 +55,7 @@ public class XxlJobScheduler { JobLogReportHelper.getInstance().toStop(); // admin lose-monitor stop - JobLosedMonitorHelper.getInstance().toStop(); + JobLogHelper.getInstance().toStop(); // admin fail-monitor stop JobFailMonitorHelper.getInstance().toStop(); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLogHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLogHelper.java new file mode 100644 index 00000000..cf5048fa --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLogHelper.java @@ -0,0 +1,185 @@ +package com.xxl.job.admin.core.thread; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.handle.XxlJobPostHandleHelper; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.util.I18nUtil; +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.util.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.List; +import java.util.concurrent.*; + +/** + * job lose-monitor instance + * + * @author xuxueli 2015-9-1 18:05:56 + */ +public class JobLogHelper { + private static Logger logger = LoggerFactory.getLogger(JobLogHelper.class); + + private static JobLogHelper instance = new JobLogHelper(); + public static JobLogHelper getInstance(){ + return instance; + } + + // ---------------------- monitor ---------------------- + + private ThreadPoolExecutor callbackThreadPool = null; + private Thread monitorThread; + private volatile boolean toStop = false; + public void start(){ + + // for callback + callbackThreadPool = new ThreadPoolExecutor( + 2, + 20, + 30L, + TimeUnit.SECONDS, + new LinkedBlockingQueue(3000), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()); + } + }, + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + r.run(); + logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now)."); + } + }); + + + // for monitor + monitorThread = new Thread(new Runnable() { + + @Override + public void run() { + + // wait for JobTriggerPoolHelper-init + try { + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + + // monitor + while (!toStop) { + try { + // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; + Date losedTime = DateUtil.addMinutes(new Date(), -10); + List losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); + + if (losedJobIds!=null && losedJobIds.size()>0) { + for (Long logId: losedJobIds) { + + XxlJobLog jobLog = new XxlJobLog(); + jobLog.setId(logId); + + jobLog.setHandleTime(new Date()); + jobLog.setHandleCode(ReturnT.FAIL_CODE); + jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") ); + + XxlJobPostHandleHelper.updateHandleInfoAndFinish(jobLog); + } + + } + } catch (Exception e) { + if (!toStop) { + logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); + } + } + + try { + TimeUnit.SECONDS.sleep(60); + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + + } + + logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); + + } + }); + monitorThread.setDaemon(true); + monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); + monitorThread.start(); + } + + public void toStop(){ + toStop = true; + + // stop registryOrRemoveThreadPool + callbackThreadPool.shutdownNow(); + + // stop monitorThread (interrupt and wait) + monitorThread.interrupt(); + try { + monitorThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + + // ---------------------- helper ---------------------- + + public ReturnT callback(List callbackParamList) { + + callbackThreadPool.execute(new Runnable() { + @Override + public void run() { + for (HandleCallbackParam handleCallbackParam: callbackParamList) { + ReturnT callbackResult = callback(handleCallbackParam); + logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", + (callbackResult.getCode()== IJobHandler.SUCCESS.getCode()?"success":"fail"), handleCallbackParam, callbackResult); + } + } + }); + + return ReturnT.SUCCESS; + } + + private ReturnT callback(HandleCallbackParam handleCallbackParam) { + // valid log item + XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); + if (log == null) { + return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); + } + if (log.getHandleCode() > 0) { + return new ReturnT(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc + } + + // handle msg + StringBuffer handleMsg = new StringBuffer(); + if (log.getHandleMsg()!=null) { + handleMsg.append(log.getHandleMsg()).append("
"); + } + if (handleCallbackParam.getExecuteResult().getMsg() != null) { + handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); + } + + // success, save log + log.setHandleTime(new Date()); + log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); + log.setHandleMsg(handleMsg.toString()); + XxlJobPostHandleHelper.updateHandleInfoAndFinish(log); + + return ReturnT.SUCCESS; + } + + + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLosedMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLosedMonitorHelper.java deleted file mode 100644 index 7658bc7b..00000000 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobLosedMonitorHelper.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.xxl.job.admin.core.thread; - -import com.xxl.job.admin.core.conf.XxlJobAdminConfig; -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.util.I18nUtil; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.util.DateUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * job lose-monitor instance - * - * @author xuxueli 2015-9-1 18:05:56 - */ -public class JobLosedMonitorHelper { - private static Logger logger = LoggerFactory.getLogger(JobLosedMonitorHelper.class); - - private static JobLosedMonitorHelper instance = new JobLosedMonitorHelper(); - public static JobLosedMonitorHelper getInstance(){ - return instance; - } - - // ---------------------- monitor ---------------------- - - private Thread monitorThread; - private volatile boolean toStop = false; - public void start(){ - monitorThread = new Thread(new Runnable() { - - @Override - public void run() { - - // wait for JobTriggerPoolHelper-init - try { - TimeUnit.MILLISECONDS.sleep(50); - } catch (InterruptedException e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - } - - // monitor - while (!toStop) { - try { - // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; - Date losedTime = DateUtil.addMinutes(new Date(), -10); - List losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); - - if (losedJobIds!=null && losedJobIds.size()>0) { - for (Long logId: losedJobIds) { - - XxlJobLog jobLog = new XxlJobLog(); - jobLog.setId(logId); - - jobLog.setHandleTime(new Date()); - jobLog.setHandleCode(ReturnT.FAIL_CODE); - jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") ); - - XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(jobLog); - } - - } - } catch (Exception e) { - if (!toStop) { - logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); - } - } - - try { - TimeUnit.SECONDS.sleep(60); - } catch (Exception e) { - if (!toStop) { - logger.error(e.getMessage(), e); - } - } - - } - - logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); - - } - }); - monitorThread.setDaemon(true); - monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); - monitorThread.start(); - } - - public void toStop(){ - toStop = true; - // interrupt and wait - monitorThread.interrupt(); - try { - monitorThread.join(); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } - } - -} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index d3ca6fe0..d508b1f9 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -12,7 +12,6 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.text.ParseException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -279,7 +278,7 @@ public class JobScheduleHelper { ringThread.start(); } - private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws ParseException { + private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception { Date nextValidTime = generateNextValidTime(jobInfo, fromTime); if (nextValidTime != null) { jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); @@ -365,12 +364,12 @@ public class JobScheduleHelper { // ---------------------- tools ---------------------- - public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws ParseException { + public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception { ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null); if (ScheduleTypeEnum.CRON == scheduleTypeEnum) { Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime); return nextValidTime; - } else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum || ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum) { + } else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) { return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 ); } return null; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java index 670897c8..f728f56f 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java @@ -1,27 +1,13 @@ package com.xxl.job.admin.service.impl; -import com.xxl.job.admin.core.model.XxlJobInfo; -import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.thread.JobLogHelper; import com.xxl.job.admin.core.thread.JobRegistryHelper; -import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; -import com.xxl.job.admin.core.trigger.TriggerTypeEnum; -import com.xxl.job.admin.core.util.I18nUtil; -import com.xxl.job.admin.dao.XxlJobGroupDao; -import com.xxl.job.admin.dao.XxlJobInfoDao; -import com.xxl.job.admin.dao.XxlJobLogDao; -import com.xxl.job.admin.dao.XxlJobRegistryDao; import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.RegistryParam; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.handler.IJobHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; -import javax.annotation.Resource; -import java.text.MessageFormat; -import java.util.Date; import java.util.List; /** @@ -29,104 +15,11 @@ import java.util.List; */ @Service public class AdminBizImpl implements AdminBiz { - private static Logger logger = LoggerFactory.getLogger(AdminBizImpl.class); - - @Resource - public XxlJobLogDao xxlJobLogDao; - @Resource - private XxlJobInfoDao xxlJobInfoDao; - @Resource - private XxlJobRegistryDao xxlJobRegistryDao; - @Resource - private XxlJobGroupDao xxlJobGroupDao; @Override public ReturnT callback(List callbackParamList) { - for (HandleCallbackParam handleCallbackParam: callbackParamList) { - ReturnT callbackResult = callback(handleCallbackParam); - logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", - (callbackResult.getCode()==IJobHandler.SUCCESS.getCode()?"success":"fail"), handleCallbackParam, callbackResult); - } - - return ReturnT.SUCCESS; - } - - private ReturnT callback(HandleCallbackParam handleCallbackParam) { - // valid log item - XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId()); - if (log == null) { - return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); - } - if (log.getHandleCode() > 0) { - return new ReturnT(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc - } - - // trigger success, to trigger child job - String callbackMsg = null; - if (IJobHandler.SUCCESS.getCode() == handleCallbackParam.getExecuteResult().getCode()) { - XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId()); - if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { - callbackMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<<
"; - - String[] childJobIds = xxlJobInfo.getChildJobId().split(","); - for (int i = 0; i < childJobIds.length; i++) { - int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; - if (childJobId > 0) { - - JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); - ReturnT triggerChildResult = ReturnT.SUCCESS; - - // add msg - callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), - (i+1), - childJobIds.length, - childJobIds[i], - (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), - triggerChildResult.getMsg()); - } else { - callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), - (i+1), - childJobIds.length, - childJobIds[i]); - } - } - - } - } - - // handle msg - StringBuffer handleMsg = new StringBuffer(); - if (log.getHandleMsg()!=null) { - handleMsg.append(log.getHandleMsg()).append("
"); - } - if (handleCallbackParam.getExecuteResult().getMsg() != null) { - handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); - } - if (callbackMsg != null) { - handleMsg.append(callbackMsg); - } - - if (handleMsg.length() > 15000) { - handleMsg = new StringBuffer(handleMsg.substring(0, 15000)); // text最大64kb 避免长度过长 - } - - // success, save log - log.setHandleTime(new Date()); - log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); - log.setHandleMsg(handleMsg.toString()); - xxlJobLogDao.updateHandleInfo(log); - - return ReturnT.SUCCESS; - } - - private boolean isNumeric(String str){ - try { - int result = Integer.valueOf(str); - return true; - } catch (NumberFormatException e) { - return false; - } + return JobLogHelper.getInstance().callback(callbackParamList); } @Override diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 60354f64..567138df 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -21,7 +21,6 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.text.MessageFormat; -import java.text.ParseException; import java.util.*; /** @@ -82,7 +81,7 @@ public class XxlJobServiceImpl implements XxlJobService { if (jobInfo.getScheduleConf()==null || !CronExpression.isValidExpression(jobInfo.getScheduleConf())) { return new ReturnT(ReturnT.FAIL_CODE, I18nUtil.getString("jobinfo_field_cron_unvalid") ); } - } else if (scheduleTypeEnum == ScheduleTypeEnum.FIX_RATE || scheduleTypeEnum == ScheduleTypeEnum.FIX_DELAY) { + } else if (scheduleTypeEnum == ScheduleTypeEnum.FIX_RATE/* || scheduleTypeEnum == ScheduleTypeEnum.FIX_DELAY*/) { if (jobInfo.getScheduleConf() == null) { return new ReturnT(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) ); } @@ -186,7 +185,7 @@ public class XxlJobServiceImpl implements XxlJobService { if (jobInfo.getScheduleConf()==null || !CronExpression.isValidExpression(jobInfo.getScheduleConf())) { return new ReturnT(ReturnT.FAIL_CODE, I18nUtil.getString("jobinfo_field_cron_unvalid") ); } - } else if (scheduleTypeEnum == ScheduleTypeEnum.FIX_RATE || scheduleTypeEnum == ScheduleTypeEnum.FIX_DELAY) { + } else if (scheduleTypeEnum == ScheduleTypeEnum.FIX_RATE /*|| scheduleTypeEnum == ScheduleTypeEnum.FIX_DELAY*/) { if (jobInfo.getScheduleConf() == null) { return new ReturnT(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) ); } @@ -319,7 +318,7 @@ public class XxlJobServiceImpl implements XxlJobService { return new ReturnT(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) ); } nextTriggerTime = nextValidTime.getTime(); - } catch (ParseException e) { + } catch (Exception e) { logger.error(e.getMessage(), e); return new ReturnT(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) ); } diff --git a/xxl-job-admin/src/main/resources/i18n/message_en.properties b/xxl-job-admin/src/main/resources/i18n/message_en.properties index 9605916a..f51a3431 100644 --- a/xxl-job-admin/src/main/resources/i18n/message_en.properties +++ b/xxl-job-admin/src/main/resources/i18n/message_en.properties @@ -118,7 +118,6 @@ jobinfo_field_timeout=Job timeout period jobinfo_field_gluetype=GLUE Type jobinfo_field_executorparam=Param jobinfo_field_cron_unvalid=The Cron is illegal -jobinfo_field_cron_never_fire=The Cron will never fire jobinfo_field_author=Author jobinfo_field_alarmemail=Alarm email jobinfo_field_alarmemail_placeholder=Please enter alarm mail, if there are more than one comma separated diff --git a/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties b/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties index fc3f65b6..01deed40 100644 --- a/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties +++ b/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties @@ -117,7 +117,6 @@ jobinfo_field_jobdesc=任务描述 jobinfo_field_gluetype=运行模式 jobinfo_field_executorparam=任务参数 jobinfo_field_cron_unvalid=Cron格式非法 -jobinfo_field_cron_never_fire=Cron非法,永远不会触发 jobinfo_field_author=负责人 jobinfo_field_timeout=任务超时时间 jobinfo_field_alarmemail=报警邮件 diff --git a/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties b/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties index 947c748f..20a79de7 100755 --- a/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties +++ b/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties @@ -117,7 +117,6 @@ jobinfo_field_jobdesc=任務描述 jobinfo_field_gluetype=運行模式 jobinfo_field_executorparam=任務參數 jobinfo_field_cron_unvalid=Cron 格式非法 -jobinfo_field_cron_never_fire=Cron 格式非法,永遠不會觸發 jobinfo_field_author=負責人 jobinfo_field_timeout=任務超時秒數 jobinfo_field_alarmemail=告警郵件 diff --git a/xxl-job-admin/src/test/java/com/xxl/job/admin/dao/XxlJobInfoDaoTest.java b/xxl-job-admin/src/test/java/com/xxl/job/admin/dao/XxlJobInfoDaoTest.java index 273ef46e..61fddd23 100644 --- a/xxl-job-admin/src/test/java/com/xxl/job/admin/dao/XxlJobInfoDaoTest.java +++ b/xxl-job-admin/src/test/java/com/xxl/job/admin/dao/XxlJobInfoDaoTest.java @@ -56,7 +56,7 @@ public class XxlJobInfoDaoTest { int count = xxlJobInfoDao.save(info); XxlJobInfo info2 = xxlJobInfoDao.loadById(info.getId()); - info.setScheduleType(ScheduleTypeEnum.FIX_DELAY.name()); + info.setScheduleType(ScheduleTypeEnum.FIX_RATE.name()); info.setScheduleConf(String.valueOf(44)); info.setMisfireStrategy(MisfireStrategyEnum.FIRE_ONCE_NOW.name()); info2.setJobDesc("desc2");