From 3138d48afa7a6857cb3a94345e309d0b518c9a5b Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Thu, 29 Jun 2017 20:58:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E7=BA=BF=E7=A8=8B=E8=BD=AE?= =?UTF-8?q?=E7=A9=BA30=E6=AC=A1=E5=90=8E=E8=87=AA=E5=8A=A8=E9=94=80?= =?UTF-8?q?=E6=AF=81=EF=BC=8C=E9=99=8D=E4=BD=8E=E4=BD=8E=E9=A2=91=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=9A=84=E6=97=A0=E6=95=88=E7=BA=BF=E7=A8=8B=E6=B6=88?= =?UTF-8?q?=E8=80=97=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 4 ++-- .../core/thread/JobRegistryMonitorHelper.java | 4 +++- .../xxl/job/core/executor/XxlJobExecutor.java | 2 +- .../com/xxl/job/core/thread/JobThread.java | 23 ++++++++++++------- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index c77e47c8..e5d9debc 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -864,6 +864,8 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 3、XxlJobLogger的日志多参数支持; - 4、路由策略新增 "忙碌转移" 模式:按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度; - 5、路由策略代码重构; +- 6、执行器重复注册问题修复; +- 7、任务线程轮空30次后自动销毁,降低低频任务的无效线程消耗。 #### TODO LIST - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限; @@ -874,8 +876,6 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 6、任务依赖,流程图,子任务+会签任务,各节点日志; - 7、调度任务优先级; - 8、移除quartz依赖,重写调度模块:新增或恢复任务时将下次执行记录插入delayqueue,调度中心集群竞争分布式锁,成功节点批量加载到期delayqueue数据,批量执行。 -- 9、任务线程轮空30次后自动销毁,降低低频任务的无效线程消耗。 -- 10、注册界面,出现重复地址问题; ## 七、其他 diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java index c609edb9..881df981 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java @@ -46,7 +46,9 @@ public class JobRegistryMonitorHelper { if (registryList == null) { registryList = new ArrayList(); } - registryList.add(item.getRegistryValue()); + if (!registryList.contains(item.getRegistryValue())) { + registryList.add(item.getRegistryValue()); + } temp.put(groupKey, registryList); } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index 222f59e7..cb5e993a 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -125,7 +125,7 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe // ---------------------------------- job thread repository private static ConcurrentHashMap JobThreadRepository = new ConcurrentHashMap(); public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ - JobThread newJobThread = new JobThread(handler); + JobThread newJobThread = new JobThread(jobId, handler); newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index 7cffa93f..106b7209 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -3,6 +3,7 @@ package com.xxl.job.core.thread; import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; +import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobLogger; @@ -23,7 +24,8 @@ import java.util.concurrent.TimeUnit; */ public class JobThread extends Thread{ private static Logger logger = LoggerFactory.getLogger(JobThread.class); - + + private int jobId; private IJobHandler handler; private LinkedBlockingQueue triggerQueue; private ConcurrentHashSet triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID @@ -32,12 +34,14 @@ public class JobThread extends Thread{ private String stopReason; private boolean running = false; // if running job + private int idleTimes = 0; // idel times - public JobThread(IJobHandler handler) { + public JobThread(int jobId, IJobHandler handler) { + this.jobId = jobId; this.handler = handler; - triggerQueue = new LinkedBlockingQueue(); - triggerLogIdSet = new ConcurrentHashSet(); + this.triggerQueue = new LinkedBlockingQueue(); + this.triggerLogIdSet = new ConcurrentHashSet(); } public IJobHandler getHandler() { return handler; @@ -88,11 +92,13 @@ public class JobThread extends Thread{ public void run() { while(!toStop){ running = false; + idleTimes++; try { // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); if (triggerParam!=null) { running = true; + idleTimes = 0; triggerLogIdSet.remove(triggerParam.getLogId()); // parse param @@ -126,9 +132,6 @@ public class JobThread extends Thread{ XxlJobLogger.log("
----------- JobThread Exception:" + errorMsg + "
----------- xxl-job job execute end(error) -----------"); } - - - // callback handler info if (!toStop) { @@ -139,8 +142,12 @@ public class JobThread extends Thread{ ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]"); TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); } + } else { + if (idleTimes > 3) { + XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); + } } - } catch (Exception e) { + } catch (Throwable e) { if (toStop) { XxlJobLogger.log("
----------- xxl-job toStop, stopReason:" + stopReason); }