From 79f9317ffe86567b127f9d80bcca2dc3d6006786 Mon Sep 17 00:00:00 2001 From: "xueli.xue" Date: Thu, 2 Mar 2017 17:52:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BA=95=E5=B1=82=E7=BA=BF=E7=A8=8B=E6=A8=A1?= =?UTF-8?q?=E5=9E=8B=E7=BB=9F=E4=B8=80=EF=BC=9Bdestory=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/schedule/XxlJobDynamicScheduler.java | 4 +- .../admin/core/thread/JobMonitorHelper.java | 4 +- .../admin/core/thread/JobRegistryHelper.java | 2 +- .../xxl/job/core/executor/XxlJobExecutor.java | 27 +++++++++- .../rpc/netcom/jetty/server/JettyServer.java | 49 +++---------------- .../core/thread/ExecutorRegistryThread.java | 45 +++++++++++++++++ .../core/thread/TriggerCallbackThread.java | 29 ++++++++--- 7 files changed, 105 insertions(+), 55 deletions(-) diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java index 782fab8c..03550981 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java @@ -78,10 +78,10 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In // destroy public void destroy(){ // admin registry stop - JobRegistryHelper.getInstance().stop(); + JobRegistryHelper.getInstance().toStop(); // admin monitor stop - JobMonitorHelper.getInstance().stop(); + JobMonitorHelper.getInstance().toStop(); serverFactory.destroy(); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java index b03858f2..6c2959b3 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java @@ -36,7 +36,7 @@ public class JobMonitorHelper { @Override public void run() { - while (true) { + while (!toStop) { try { logger.debug(">>>>>>>>>>> job monitor beat ... "); Integer jobLogId = JobMonitorHelper.instance.queue.take(); @@ -81,7 +81,7 @@ public class JobMonitorHelper { monitorThread.start(); } - public void stop(){ + public void toStop(){ toStop = true; //monitorThread.interrupt(); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java index a0d4b500..46b667c8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -70,7 +70,7 @@ public class JobRegistryHelper { registryThread.start(); } - public void stop(){ + public void toStop(){ toStop = true; //registryThread.interrupt(); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index dc386e75..7b626665 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -6,7 +6,9 @@ import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.JobHander; import com.xxl.job.core.registry.RegistHelper; import com.xxl.job.core.rpc.netcom.NetComServerFactory; +import com.xxl.job.core.thread.ExecutorRegistryThread; import com.xxl.job.core.thread.JobThread; +import com.xxl.job.core.thread.TriggerCallbackThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -46,11 +48,33 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe // ---------------------------------- job server ------------------------------------ private NetComServerFactory serverFactory = new NetComServerFactory(); public void start() throws Exception { + // executor start NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); serverFactory.start(port, ip, appName, registHelper); + + // trigger callback thread start + TriggerCallbackThread.getInstance().start(); } public void destroy(){ + // executor stop serverFactory.destroy(); + + // job thread repository destory + if (JobThreadRepository.size() > 0) { + for (Map.Entry item: JobThreadRepository.entrySet()) { + JobThread jobThread = item.getValue(); + jobThread.toStop("Web容器销毁终止"); + jobThread.interrupt(); + + } + JobThreadRepository.clear(); + } + + // trigger callback thread stop + TriggerCallbackThread.getInstance().toStop(); + + // executor registry thread stop + ExecutorRegistryThread.getInstance().toStop(); } // ---------------------------------- init job handler ------------------------------------ @@ -99,7 +123,8 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe return jobThread; } public static JobThread loadJobThread(String jobKey){ - return JobThreadRepository.get(jobKey); + JobThread jobThread = JobThreadRepository.get(jobKey); + return jobThread; } public static void removeJobThread(String jobKey){ JobThreadRepository.remove(jobKey); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java index 25e1ddee..09a82336 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java @@ -1,7 +1,7 @@ package com.xxl.job.core.rpc.netcom.jetty.server; import com.xxl.job.core.registry.RegistHelper; -import com.xxl.job.core.util.IpUtil; +import com.xxl.job.core.thread.ExecutorRegistryThread; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; @@ -11,8 +11,6 @@ import org.eclipse.jetty.util.thread.ExecutorThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.TimeUnit; - /** * rpc jetty server * @author xuxueli 2015-11-19 22:29:03 @@ -21,9 +19,9 @@ public class JettyServer { private static final Logger logger = LoggerFactory.getLogger(JettyServer.class); private Server server; - + private Thread thread; public void start(final int port, final String ip, final String appName, final RegistHelper registHelper) throws Exception { - Thread thread = new Thread(new Runnable() { + thread = new Thread(new Runnable() { @Override public void run() { server = new Server(); @@ -43,7 +41,7 @@ public class JettyServer { try { server.start(); logger.info(">>>>>>>>>>>> xxl-job jetty server start success at port:{}.", port); - executorRegistryBeat(port, ip, appName, registHelper); + ExecutorRegistryThread.getInstance().start(port, ip, appName, registHelper); server.join(); // block until thread stopped logger.info(">>>>>>>>>>> xxl-rpc server start success, netcon={}, port={}", JettyServer.class.getName(), port); } catch (Exception e) { @@ -65,43 +63,10 @@ public class JettyServer { logger.error("", e); } } - logger.info(">>>>>>>>>>> xxl-rpc server destroy success, netcon={}", JettyServer.class.getName()); - } - - /** - * registry beat - * @param port - * @param ip - * @param appName - * @param registHelper - */ - private void executorRegistryBeat(final int port, final String ip, final String appName, final RegistHelper registHelper){ - if (registHelper==null && appName==null || appName.trim().length()==0) { - return; + if (thread.isAlive()) { + thread.interrupt(); } - Thread registryThread = new Thread(new Runnable() { - @Override - public void run() { - while (true) { - try { - // generate addredd = ip:port - String address = null; - if (ip != null && ip.trim().length()>0) { - address = ip.trim().concat(":").concat(String.valueOf(port)); - } else { - address = IpUtil.getIpPort(port); - } - - registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address); - TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - }); - registryThread.setDaemon(true); - registryThread.start(); + logger.info(">>>>>>>>>>> xxl-rpc server destroy success, netcon={}", JettyServer.class.getName()); } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java index be77d35c..485163ef 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java @@ -1,8 +1,53 @@ package com.xxl.job.core.thread; +import com.xxl.job.core.registry.RegistHelper; +import com.xxl.job.core.util.IpUtil; + +import java.util.concurrent.TimeUnit; + /** * Created by xuxueli on 17/3/2. */ public class ExecutorRegistryThread extends Thread { + private static ExecutorRegistryThread instance = new ExecutorRegistryThread(); + public static ExecutorRegistryThread getInstance(){ + return instance; + } + + private Thread registryThread; + private boolean toStop = false; + public void start(final int port, final String ip, final String appName, final RegistHelper registHelper){ + if (registHelper==null && appName==null || appName.trim().length()==0) { + return; + } + registryThread = new Thread(new Runnable() { + @Override + public void run() { + while (!toStop) { + try { + // generate addredd = ip:port + String address = null; + if (ip != null && ip.trim().length()>0) { + address = ip.trim().concat(":").concat(String.valueOf(port)); + } else { + address = IpUtil.getIpPort(port); + } + + registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address); + TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + }); + registryThread.setDaemon(true); + registryThread.start(); + } + + public void toStop() { + toStop = true; + } + } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 2c3e29b7..3d30d2bf 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -15,14 +15,23 @@ import java.util.concurrent.LinkedBlockingQueue; public class TriggerCallbackThread { private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); - private static LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue(); - static { - new Thread(new Runnable() { + private static TriggerCallbackThread instance = new TriggerCallbackThread(); + public static TriggerCallbackThread getInstance(){ + return instance; + } + + private LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue(); + + private Thread triggerCallbackThread; + private boolean toStop = false; + public void start() { + triggerCallbackThread = new Thread(new Runnable() { + @Override public void run() { - while(true){ + while(!toStop){ try { - HandleCallbackParam callback = callBackQueue.take(); + HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { for (String address : callback.getLogAddress()) { try { @@ -44,10 +53,16 @@ public class TriggerCallbackThread { } } } - }).start(); + }); + triggerCallbackThread.setDaemon(true); + triggerCallbackThread.start(); + } + public void toStop(){ + toStop = true; } + public static void pushCallBack(HandleCallbackParam callback){ - callBackQueue.add(callback); + getInstance().callBackQueue.add(callback); logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); }