From 18162e75c2873ba55887078c9ad9fc1caad2c8a6 Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Thu, 20 Jun 2019 19:24:24 +0800 Subject: [PATCH] update document --- .../admin/core/thread/JobScheduleHelper.java | 87 ++++++++++++------- .../admin/service/impl/XxlJobServiceImpl.java | 5 +- 2 files changed, 59 insertions(+), 33 deletions(-) diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index 01758ebd..dbcde37c 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -25,6 +25,8 @@ public class JobScheduleHelper { return instance; } + public static final long PRE_READ_MS = 5000; // pre read + private Thread scheduleThread; private Thread ringThread; private volatile boolean scheduleThreadToStop = false; @@ -47,11 +49,11 @@ public class JobScheduleHelper { } logger.info(">>>>>>>>> init xxl-job admin scheduler success."); + Connection conn = null; while (!scheduleThreadToStop) { // 扫描任务 long start = System.currentTimeMillis(); - Connection conn = null; PreparedStatement preparedStatement = null; try { if (conn==null || conn.isClosed()) { @@ -65,16 +67,16 @@ public class JobScheduleHelper { // tx start // 1、预读5s内调度任务 - long maxNextTime = System.currentTimeMillis() + 5000; - List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(maxNextTime); + List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(System.currentTimeMillis() + PRE_READ_MS); if (scheduleList!=null && scheduleList.size()>0) { // 2、推送时间轮 for (XxlJobInfo jobInfo: scheduleList) { // 时间轮刻度计算 - if (System.currentTimeMillis() > jobInfo.getTriggerNextTime() + 5000) { + if (System.currentTimeMillis() > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 过期超5s:本地忽略,当前时间开始计算下次触发时间 + // fresh next jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); jobInfo.setTriggerNextTime( new CronExpression(jobInfo.getJobCron()) @@ -82,44 +84,55 @@ public class JobScheduleHelper { .getTime() ); - // pass - continue; - } else if (System.currentTimeMillis() > jobInfo.getTriggerNextTime()) { - // 过期5s内 :立即触发一次,当前时间开始计算下次触发时间 + // 过期5s内 :立即触发一次,当前时间开始计算下次触发时间;一旦过期,预读一次; - jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); - jobInfo.setTriggerNextTime( - new CronExpression(jobInfo.getJobCron()) - .getNextValidTimeAfter(new Date()) - .getTime() - ); + CronExpression cronExpression = new CronExpression(jobInfo.getJobCron()); + long nextTime = cronExpression.getNextValidTimeAfter(new Date()).getTime(); - // do trigger + // 1、trigger JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null); + // 2、fresh next + jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); + jobInfo.setTriggerNextTime(nextTime); + + // 3、check pre read + if (jobInfo.getTriggerNextTime() - System.currentTimeMillis() < PRE_READ_MS) { + + // 1、make ring second + int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); + + // 2、push time ring + pushTimeRing(ringSecond, jobInfo.getId()); + + // 3、fresh next + jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); + jobInfo.setTriggerNextTime( + new CronExpression(jobInfo.getJobCron()) + .getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime())) + .getTime() + ); + + } + logger.debug(">>>>>>>>>>> xxl-job, push trigger : jobId = " + jobInfo.getId() ); } else { // 未过期:正常触发,递增计算下次触发时间 + // 1、make ring second int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); + // 2、push time ring + pushTimeRing(ringSecond, jobInfo.getId()); + + // 3、fresh next jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); jobInfo.setTriggerNextTime( new CronExpression(jobInfo.getJobCron()) .getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime())) .getTime() ); - - // push async ring - List ringItemData = ringData.get(ringSecond); - if (ringItemData == null) { - ringItemData = new ArrayList(); - ringData.put(ringSecond, ringItemData); - } - ringItemData.add(jobInfo.getId()); - - logger.debug(">>>>>>>>>>> xxl-job, push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) ); } } @@ -139,12 +152,6 @@ public class JobScheduleHelper { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); } } finally { - if (conn != null) { - try { - conn.close(); - } catch (SQLException e) { - } - } if (null != preparedStatement) { try { preparedStatement.close(); @@ -166,6 +173,12 @@ public class JobScheduleHelper { } } + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + } + } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop"); } }); @@ -235,6 +248,18 @@ public class JobScheduleHelper { ringThread.start(); } + private void pushTimeRing(int ringSecond, int jobId){ + // push async ring + List ringItemData = ringData.get(ringSecond); + if (ringItemData == null) { + ringItemData = new ArrayList(); + ringData.put(ringSecond, ringItemData); + } + ringItemData.add(jobId); + + logger.debug(">>>>>>>>>>> xxl-job, push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) ); + } + public void toStop(){ // 1、stop schedule diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 847d0740..8e067742 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -4,6 +4,7 @@ import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.cron.CronExpression; import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; +import com.xxl.job.admin.core.thread.JobScheduleHelper; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.dao.XxlJobGroupDao; import com.xxl.job.admin.dao.XxlJobInfoDao; @@ -193,7 +194,7 @@ public class XxlJobServiceImpl implements XxlJobService { long nextTriggerTime = exists_jobInfo.getTriggerNextTime(); if (exists_jobInfo.getTriggerStatus() == 1 && !jobInfo.getJobCron().equals(exists_jobInfo.getJobCron()) ) { try { - nextTriggerTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(System.currentTimeMillis() + 5000)).getTime(); + nextTriggerTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(System.currentTimeMillis() + JobScheduleHelper.PRE_READ_MS)).getTime(); } catch (ParseException e) { logger.error(e.getMessage(), e); return new ReturnT(ReturnT.FAIL_CODE, I18nUtil.getString("jobinfo_field_cron_unvalid")+" | "+ e.getMessage()); @@ -239,7 +240,7 @@ public class XxlJobServiceImpl implements XxlJobService { // next trigger time (5s后生效,避开预读周期) long nextTriggerTime = 0; try { - nextTriggerTime = new CronExpression(xxlJobInfo.getJobCron()).getNextValidTimeAfter(new Date(System.currentTimeMillis() + 5000)).getTime(); + nextTriggerTime = new CronExpression(xxlJobInfo.getJobCron()).getNextValidTimeAfter(new Date(System.currentTimeMillis() + JobScheduleHelper.PRE_READ_MS)).getTime(); } catch (ParseException e) { logger.error(e.getMessage(), e); return new ReturnT(ReturnT.FAIL_CODE, I18nUtil.getString("jobinfo_field_cron_unvalid")+" | "+ e.getMessage());