From 5c1f59323e8cb3159f464ae7456d8da5d67cae11 Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Wed, 22 May 2019 16:29:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=B0=83=E5=BA=A6=E9=94=99?= =?UTF-8?q?=E8=BF=87=E8=A7=A6=E5=8F=91=E6=97=B6=E9=97=B4=E6=97=B6=E7=9A=84?= =?UTF-8?q?=E5=A4=84=E7=90=86=E7=AD=96=E7=95=A5=EF=BC=9A=20-=20=E5=8F=AF?= =?UTF-8?q?=E8=83=BD=E5=8E=9F=E5=9B=A0=EF=BC=9A=E6=9C=8D=E5=8A=A1=E9=87=8D?= =?UTF-8?q?=E5=90=AF=EF=BC=9B=E8=B0=83=E5=BA=A6=E7=BA=BF=E7=A8=8B=E8=A2=AB?= =?UTF-8?q?=E9=98=BB=E5=A1=9E=EF=BC=8C=E7=BA=BF=E7=A8=8B=E8=A2=AB=E8=80=97?= =?UTF-8?q?=E5=B0=BD=EF=BC=9B=E4=B8=8A=E6=AC=A1=E8=B0=83=E5=BA=A6=E6=8C=81?= =?UTF-8?q?=E7=BB=AD=E9=98=BB=E5=A1=9E=EF=BC=8C=E4=B8=8B=E6=AC=A1=E8=B0=83?= =?UTF-8?q?=E5=BA=A6=E8=A2=AB=E9=94=99=E8=BF=87=EF=BC=9B=20-=20=E5=A4=84?= =?UTF-8?q?=E7=90=86=E7=AD=96=E7=95=A5=EF=BC=9A=20=20=20=20=20-=20?= =?UTF-8?q?=E8=BF=87=E6=9C=9F=E8=B6=8510s=EF=BC=9A=E6=9C=AC=E5=9C=B0?= =?UTF-8?q?=E5=BF=BD=E7=95=A5=EF=BC=8C=E5=BD=93=E5=89=8D=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E5=BC=80=E5=A7=8B=E8=AE=A1=E7=AE=97=E4=B8=8B=E6=AC=A1=E8=A7=A6?= =?UTF-8?q?=E5=8F=91=E6=97=B6=E9=97=B4=20=20=20=20=20-=20=E8=BF=87?= =?UTF-8?q?=E6=9C=9F=E8=B6=85=E8=BF=875s=EF=BC=9A=E8=BF=87=E6=9C=9F10s?= =?UTF-8?q?=E5=86=85=EF=BC=9A=E7=AB=8B=E5=8D=B3=E8=A7=A6=E5=8F=91=E4=B8=80?= =?UTF-8?q?=E6=AC=A1=EF=BC=8C=E5=BD=93=E5=89=8D=E6=97=B6=E9=97=B4=E5=BC=80?= =?UTF-8?q?=E5=A7=8B=E8=AE=A1=E7=AE=97=E4=B8=8B=E6=AC=A1=E8=A7=A6=E5=8F=91?= =?UTF-8?q?=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 6 +- .../admin/core/thread/JobScheduleHelper.java | 87 +++++++++++-------- 2 files changed, 52 insertions(+), 41 deletions(-) diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index 35da6ac8..8901c742 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -836,11 +836,11 @@ XXL-JOB调度模块默认采用并行机制,在多线程调度的情况下, XXL-JOB的每个调度任务虽然在调度模块是并行调度执行的,但是任务调度传递到任务模块的“执行器”确实串行执行的,同时支持任务终止。 #### 5.4.6 过期处理策略 -任务调度错过了触发时间: +任务调度错过触发时间时的处理策略: - 可能原因:服务重启;调度线程被阻塞,线程被耗尽;上次调度持续阻塞,下次调度被错过; - 处理策略: - - 过期5s内:立即触发一次,并计算下次触发时间; - - 过期超过5s:忽略过期触发,计算下次触发时间; + - 过期超10s:本地忽略,当前时间开始计算下次触发时间 + - 过期超过5s:过期10s内:立即触发一次,当前时间开始计算下次触发时间 #### 5.4.7 日志回调服务 diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index cded7d5a..26f9452a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -1,8 +1,8 @@ package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; -import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.cron.CronExpression; +import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.trigger.TriggerTypeEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +58,7 @@ public class JobScheduleHelper { // tx start - // 1、查询JOB:"下次调度30s内" ( ...... -5 ... now ...... +30 ...... ) + // 1、查询JOB:"下次调度30s内" long maxNextTime = System.currentTimeMillis() + 30000; long nowTime = System.currentTimeMillis(); List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(maxNextTime); @@ -66,35 +66,53 @@ public class JobScheduleHelper { // 2、推送时间轮 for (XxlJobInfo jobInfo: scheduleList) { - // 过期策略:过期=更新下次触发时间为当前、过期是否5s内=立即出发,否则忽略; - if (jobInfo.getTriggerNextTime() < nowTime) { - jobInfo.setTriggerNextTime(nowTime); - if (jobInfo.getTriggerNextTime() < nowTime-10000) { - continue; - } + // 时间轮刻度计算 + int ringSecond = -1; + if (jobInfo.getTriggerNextTime() < nowTime - 10000) { // 过期超10s:本地忽略,当前时间开始计算下次触发时间 + ringSecond = -1; + + jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); + jobInfo.setTriggerNextTime( + new CronExpression(jobInfo.getJobCron()) + .getNextValidTimeAfter(new Date()) + .getTime() + ); + } else if (jobInfo.getTriggerNextTime() < nowTime) { // 过期10s内:立即触发一次,当前时间开始计算下次触发时间 + ringSecond = (int)((nowTime/1000)%60); + + jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); + jobInfo.setTriggerNextTime( + new CronExpression(jobInfo.getJobCron()) + .getNextValidTimeAfter(new Date()) + .getTime() + ); + } else { // 未过期:正常触发,递增计算下次触发时间 + ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); + + jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); + jobInfo.setTriggerNextTime( + new CronExpression(jobInfo.getJobCron()) + .getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime())) + .getTime() + ); + } + if (ringSecond == -1) { + continue; } // push async ring - int second = (int)((jobInfo.getTriggerNextTime()/1000)%60); - List ringItemData = ringData.get(second); + List ringItemData = ringData.get(ringSecond); if (ringItemData == null) { ringItemData = new ArrayList(); - ringData.put(second, ringItemData); + ringData.put(ringSecond, ringItemData); } ringItemData.add(jobInfo.getId()); - logger.info(">>>>>>>>>>> xxl-job, push time-ring : " + second + " = " + Arrays.asList(ringItemData) ); + logger.info(">>>>>>>>>>> xxl-job, push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) ); } // 3、更新trigger信息 for (XxlJobInfo jobInfo: scheduleList) { - // update - jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); - jobInfo.setTriggerNextTime( - new CronExpression(jobInfo.getJobCron()) - .getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime())) - .getTime() - ); XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); } @@ -141,28 +159,22 @@ public class JobScheduleHelper { List ringItemData = new ArrayList<>(); int nowSecond = (int)((System.currentTimeMillis()/1000)%60); // 避免处理耗时太长,跨过刻度; if (lastSecond == -1) { - if (ringData.containsKey(nowSecond)) { - List tmpData = ringData.remove(nowSecond); - if (tmpData != null) { - ringItemData.addAll(tmpData); - } + lastSecond = (nowSecond+59)%60; + } + for (int i = 1; i <=60; i++) { + int secondItem = (lastSecond+i)%60; + + List tmpData = ringData.remove(secondItem); + if (tmpData != null) { + ringItemData.addAll(tmpData); } - lastSecond = nowSecond; - } else { - for (int i = 1; i <=60; i++) { - int secondItem = (lastSecond+i)%60; - - List tmpData = ringData.remove(secondItem); - if (tmpData != null) { - ringItemData.addAll(tmpData); - } - if (secondItem == nowSecond) { - break; - } + if (secondItem == nowSecond) { + break; } - lastSecond = nowSecond; } + lastSecond = nowSecond; + logger.info(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) ); if (ringItemData!=null && ringItemData.size()>0) { @@ -171,7 +183,6 @@ public class JobScheduleHelper { // do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null); } - // clear ringItemData.clear(); }