diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index de28ce54..8799390a 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -987,9 +987,11 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 2、规范项目目录,方便扩展多执行器; - 3、新增JFinal类型执行器sample示例项目; - 4、执行器手动设置IP时将会绑定Host; -- 5、项目主页搭建,提供中英文文档; -- 6、执行器回调线程优化,线程销毁前批量回调队列中数据,防止任务结果丢失; -- 7、执行器注册线程优化,线程销毁时主动摘除注册机器信息,提高执行器注册的实时性; +- 5、项目主页搭建,提供中英文文档(http://www.xuxueli.com/xxl-job); +- 6、执行器回调线程销毁前, 批量回调队列中数据,防止任务结果丢失; +- 7、执行器注册线程销毁时, 主动摘除注册机器信息,提高执行器注册的实时性; +- 8、调度中心任务监控线程销毁时,批量对失败任务告警,防止告警信息丢失; +- 9、调度中心API服务:支持API方式触发任务执行; ### TODO LIST - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限; @@ -1002,8 +1004,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 8、springboot 和 docker镜像,并且推送docker镜像到中央仓库,更进一步实现产品开箱即用; - 9、国际化:调度中心界面。 - 10、执行器摘除:执行器销毁时,主动通知调度中心并摘除对应执行器节点,提高执行器状态感知的时效性。 -- 11、调度中心API服务:支持API方式触发任务执行; -- 12、任务参数类型改为string,进一步方便参数传递; +- 11、任务类方法"IJobHandler.execute"的参数类型改为"string",进一步方便参数传递;任务注解和任务类统一并改为"JobHandler""; ## 七、其他 diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java index 3b4d95fd..7d7838de 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java @@ -10,10 +10,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.MessageFormat; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.*; +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * job monitor instance @@ -36,38 +35,30 @@ public class JobFailMonitorHelper { @Override public void run() { + + // monitor while (!toStop) { try { - logger.debug(">>>>>>>>>>> job monitor beat ... "); Integer jobLogId = JobFailMonitorHelper.instance.queue.take(); if (jobLogId != null && jobLogId > 0) { - logger.debug(">>>>>>>>>>> job monitor heat success, JobLogId:{}", jobLogId); XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId); if (log!=null) { if (ReturnT.SUCCESS_CODE==log.getTriggerCode() && log.getHandleCode()==0) { - // running - try { - TimeUnit.SECONDS.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } + // job running, wait + again monitor + TimeUnit.SECONDS.sleep(10); + JobFailMonitorHelper.monitor(jobLogId); + logger.info(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId); } if (ReturnT.SUCCESS_CODE==log.getTriggerCode() && ReturnT.SUCCESS_CODE==log.getHandleCode()) { - // pass + // job success, pass + logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId); } - if (ReturnT.FAIL_CODE == log.getTriggerCode()|| ReturnT.FAIL_CODE==log.getHandleCode()) { - XxlJobInfo info = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(log.getJobId()); - if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) { - Set emailSet = new HashSet(Arrays.asList(info.getAlarmEmail().split(","))); - for (String email: emailSet) { - String title = "《调度监控报警》(任务调度中心XXL-JOB)"; - XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(Integer.valueOf(info.getJobGroup())); - String content = MessageFormat.format("任务调度失败, 执行器名称:{0}, 任务描述:{1}.", group!=null?group.getTitle():"null", info.getJobDesc()); - MailUtil.sendMail(email, title, content, false, null); - } - } + if (ReturnT.FAIL_CODE == log.getTriggerCode()|| ReturnT.FAIL_CODE==log.getHandleCode()) { + // job fail, + sendMonitorEmail(log); + logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId); } } } @@ -75,15 +66,54 @@ public class JobFailMonitorHelper { logger.error("job monitor error:{}", e); } } + + // monitor all clear + List jobLogIdList = new ArrayList(); + int drainToNum = getInstance().queue.drainTo(jobLogIdList); + if (jobLogIdList!=null && jobLogIdList.size()>0) { + for (Integer jobLogId: jobLogIdList) { + XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId); + if (ReturnT.FAIL_CODE == log.getTriggerCode()|| ReturnT.FAIL_CODE==log.getHandleCode()) { + // job fail, + sendMonitorEmail(log); + logger.info(">>>>>>>>>>> job monitor last, job fail, JobLogId:{}", jobLogId); + } + } + } + } }); monitorThread.setDaemon(true); monitorThread.start(); } + /** + * send monitor email + * @param jobLog + */ + private void sendMonitorEmail(XxlJobLog jobLog){ + XxlJobInfo info = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobLog.getJobId()); + if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) { + + Set emailSet = new HashSet(Arrays.asList(info.getAlarmEmail().split(","))); + for (String email: emailSet) { + String title = "《调度监控报警》(任务调度中心XXL-JOB)"; + XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(Integer.valueOf(info.getJobGroup())); + String content = MessageFormat.format("任务调度失败, 执行器名称:{0}, 任务描述:{1}.", group!=null?group.getTitle():"null", info.getJobDesc()); + MailUtil.sendMail(email, title, content, false, null); + } + } + } + public void toStop(){ toStop = true; - //monitorThread.interrupt(); + // interrupt and wait + monitorThread.interrupt(); + try { + monitorThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } } // producer diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java index 00613738..94f40a0e 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java @@ -91,7 +91,13 @@ public class JobRegistryMonitorHelper { public void toStop(){ toStop = true; - //registryThread.interrupt(); + // interrupt and wait + registryThread.interrupt(); + try { + registryThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java index 77176e42..d8770adf 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java @@ -4,6 +4,7 @@ import com.xxl.job.admin.controller.JobApiController; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; +import com.xxl.job.admin.core.trigger.XxlJobTrigger; import com.xxl.job.admin.dao.XxlJobInfoDao; import com.xxl.job.admin.dao.XxlJobLogDao; import com.xxl.job.admin.dao.XxlJobRegistryDao; @@ -124,4 +125,11 @@ public class AdminBizImpl implements AdminBiz { return ReturnT.SUCCESS; } + @Override + public ReturnT triggerJob(int jobId) { + // TODO (thread queue trigger) + + return ReturnT.SUCCESS; + } + } diff --git a/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminBizTest.java b/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminBizTest.java index dba585cf..b65cfb53 100644 --- a/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminBizTest.java +++ b/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminBizTest.java @@ -14,26 +14,37 @@ import org.junit.Test; */ public class AdminBizTest { + // admin-client + private static String addressUrl = "http://127.0.0.1:8080/xxl-job-admin".concat(AdminBiz.MAPPING); + private static String accessToken = null; + @Test public void registryTest() throws Exception { - - // admin-client - String addressUrl = "http://127.0.0.1:8080/xxl-job-admin".concat(AdminBiz.MAPPING); - String accessToken = null; AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject(); // test executor registry RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999"); ReturnT returnT = adminBiz.registry(registryParam); Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE); + } - + @Test + public void registryRemove() throws Exception { + AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject(); // test executor registry remove - registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999"); - returnT = adminBiz.registryRemove(registryParam); + RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999"); + ReturnT returnT = adminBiz.registryRemove(registryParam); Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE); + } + + @Test + public void triggerJob() throws Exception { + AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject(); + int jobId = 1; + ReturnT returnT = adminBiz.triggerJob(1); + Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE); } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java index 8c403519..5bbb62f2 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java @@ -37,4 +37,13 @@ public interface AdminBiz { */ public ReturnT registryRemove(RegistryParam registryParam); + + /** + * trigger job for once + * + * @param jobId + * @return + */ + public ReturnT triggerJob(int jobId); + }