diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index 54eed9eb..ea404743 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -1477,6 +1477,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 11、升级xxl-rpc至较新版本,修复代理服务初始化时远程服务不可用导致长连冗余创建的问题; - 12、首页调度报表的日期排序在TIDB下乱序问题修复; - 13、调度中心与执行器双向通讯超时时间调整为3s; +- 14、调度组件销毁流程优化,先停止调度线程,然后等待时间轮内存量任务处理完成,最终销毁时间轮线程; ### 6.26 版本 v2.1.1 Release Notes[规划中] diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index e759c931..b9f54446 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -27,7 +27,8 @@ public class JobScheduleHelper { private Thread scheduleThread; private Thread ringThread; - private volatile boolean toStop = false; + private volatile boolean scheduleThreadToStop = false; + private volatile boolean ringThreadToStop = false; private volatile static Map> ringData = new ConcurrentHashMap<>(); public void start(){ @@ -40,13 +41,13 @@ public class JobScheduleHelper { try { TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { - if (!toStop) { + if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>> init xxl-job admin scheduler success."); - while (!toStop) { + while (!scheduleThreadToStop) { // 扫描任务 long start = System.currentTimeMillis(); @@ -127,7 +128,7 @@ public class JobScheduleHelper { conn.commit(); } catch (Exception e) { - if (!toStop) { + if (!scheduleThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); } } finally { @@ -152,7 +153,7 @@ public class JobScheduleHelper { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000); } } catch (InterruptedException e) { - if (!toStop) { + if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } @@ -175,13 +176,13 @@ public class JobScheduleHelper { try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { - if (!toStop) { + if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } int lastSecond = -1; - while (!toStop) { + while (!ringThreadToStop) { try { // second data @@ -216,7 +217,7 @@ public class JobScheduleHelper { ringItemData.clear(); } } catch (Exception e) { - if (!toStop) { + if (!ringThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); } } @@ -225,7 +226,7 @@ public class JobScheduleHelper { try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000); } catch (InterruptedException e) { - if (!toStop) { + if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } @@ -239,23 +240,61 @@ public class JobScheduleHelper { } public void toStop(){ - toStop = true; - // interrupt and wait - scheduleThread.interrupt(); + // 1、stop schedule + scheduleThreadToStop = true; try { - scheduleThread.join(); + TimeUnit.SECONDS.sleep(1); // wait } catch (InterruptedException e) { logger.error(e.getMessage(), e); } + if (scheduleThread.getState() != Thread.State.TERMINATED){ + // interrupt and wait + scheduleThread.interrupt(); + try { + scheduleThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } - // interrupt and wait - ringThread.interrupt(); + // if has ring data + boolean hasRingData = false; + if (!ringData.isEmpty()) { + for (int second : ringData.keySet()) { + List tmpData = ringData.get(second); + if (tmpData!=null && tmpData.size()>0) { + hasRingData = true; + break; + } + } + } + if (hasRingData) { + try { + TimeUnit.SECONDS.sleep(8); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + // stop ring (wait job-in-memory stop) + ringThreadToStop = true; try { - ringThread.join(); + TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } + if (ringThread.getState() != Thread.State.TERMINATED){ + // interrupt and wait + ringThread.interrupt(); + try { + ringThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop"); } }