From 09d8952448372bd613a326914167fa8a5dfa8101 Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Sun, 30 Jul 2017 13:34:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=99=A8=E5=8F=82=E6=95=B0?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E9=80=BB=E8=BE=91=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../xxl/job/core/executor/XxlJobExecutor.java | 124 +++++++++--------- .../com/xxl/job/core/glue/GlueFactory.java | 10 +- .../core/handler/impl/ScriptJobHandler.java | 7 +- .../xxl/job/core/log/XxlJobFileAppender.java | 10 +- .../rpc/netcom/jetty/server/JettyServer.java | 18 ++- .../com/xxl/job/core/util/ScriptUtil.java | 4 +- 6 files changed, 95 insertions(+), 78 deletions(-) diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index ca5e1bf1..dde5f14e 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -5,19 +5,15 @@ import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.impl.ExecutorBizImpl; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.JobHander; +import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.rpc.netcom.NetComClientProxy; import com.xxl.job.core.rpc.netcom.NetComServerFactory; -import com.xxl.job.core.thread.ExecutorRegistryThread; import com.xxl.job.core.thread.JobThread; -import com.xxl.job.core.thread.TriggerCallbackThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; -import org.springframework.context.ApplicationEvent; -import org.springframework.context.ApplicationListener; -import org.springframework.context.event.ContextClosedEvent; import java.util.ArrayList; import java.util.List; @@ -27,15 +23,16 @@ import java.util.concurrent.ConcurrentHashMap; /** * Created by xuxueli on 2016/3/2 21:14. */ -public class XxlJobExecutor implements ApplicationContextAware, ApplicationListener { +public class XxlJobExecutor implements ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); + // ---------------------------------- param ------------------------------------ private String ip; private int port = 9999; private String appName; private String adminAddresses; private String accessToken; - public static String logPath = "/data/applogs/xxl-job/jobhandler/"; + private String logPath; public void setIp(String ip) { this.ip = ip; @@ -56,6 +53,48 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe this.logPath = logPath; } + + // ---------------------------------- applicationContext ------------------------------------ + private static ApplicationContext applicationContext; + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + public static ApplicationContext getApplicationContext() { + return applicationContext; + } + + + // ---------------------------------- start + stop ------------------------------------ + public void start() throws Exception { + // init admin-client + initAdminBizList(adminAddresses, accessToken); + + // init executor-jobHandlerRepository + initJobHandlerRepository(applicationContext); + + // init logpath + if (logPath!=null && logPath.trim().length()>0) { + XxlJobFileAppender.logPath = logPath; + } + + // init executor-server + initExecutorServer(); + } + public void destroy(){ + // destory JobThreadRepository + if (JobThreadRepository.size() > 0) { + for (Map.Entry item: JobThreadRepository.entrySet()) { + removeJobThread(item.getKey(), "Web容器销毁终止"); + } + JobThreadRepository.clear(); + } + + // destory executor-server + stopExecutorServer(); + } + + // ---------------------------------- admin-client ------------------------------------ private static List adminBizList; private static void initAdminBizList(String adminAddresses, String accessToken) throws Exception { @@ -76,49 +115,29 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe return adminBizList; } - // ---------------------------------- job server ------------------------------------ - private NetComServerFactory serverFactory = new NetComServerFactory(); - public void start() throws Exception { - // init admin-client - initAdminBizList(adminAddresses, accessToken); - // executor start + // ---------------------------------- executor-server ------------------------------------ + private NetComServerFactory serverFactory = new NetComServerFactory(); + private void initExecutorServer() throws Exception { NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); // rpc-service, base on jetty NetComServerFactory.setAccessToken(accessToken); - serverFactory.start(port, ip, appName); - - - // trigger callback thread start - TriggerCallbackThread.getInstance().start(); + serverFactory.start(port, ip, appName); // jetty + registry } - public void destroy(){ - // 1、executor registry thread stop - ExecutorRegistryThread.getInstance().toStop(); - - // 2、executor stop - serverFactory.destroy(); - - // 3、job thread repository destory - if (JobThreadRepository.size() > 0) { - for (Map.Entry item: JobThreadRepository.entrySet()) { - JobThread jobThread = item.getValue(); - jobThread.toStop("Web容器销毁终止"); - jobThread.interrupt(); - - } - JobThreadRepository.clear(); - } - - // 4、trigger callback thread stop - TriggerCallbackThread.getInstance().toStop(); + private void stopExecutorServer() { + serverFactory.destroy(); // jetty + registry + callback } - // ---------------------------------- init job handler ------------------------------------ - public static ApplicationContext applicationContext; - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - XxlJobExecutor.applicationContext = applicationContext; + // ---------------------------------- job handler repository ------------------------------------ + private static ConcurrentHashMap jobHandlerRepository = new ConcurrentHashMap(); + public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){ + logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); + return jobHandlerRepository.put(name, jobHandler); + } + public static IJobHandler loadJobHandler(String name){ + return jobHandlerRepository.get(name); + } + private static void initJobHandlerRepository(ApplicationContext applicationContext){ // init job handler action Map serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHander.class); @@ -134,27 +153,10 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe } } } - } - - // ---------------------------------- destory job executor ------------------------------------ - @Override - public void onApplicationEvent(ApplicationEvent applicationEvent) { - if(applicationEvent instanceof ContextClosedEvent){ - // TODO - } } - // ---------------------------------- job handler repository - private static ConcurrentHashMap jobHandlerRepository = new ConcurrentHashMap(); - public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){ - logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); - return jobHandlerRepository.put(name, jobHandler); - } - public static IJobHandler loadJobHandler(String name){ - return jobHandlerRepository.get(name); - } - // ---------------------------------- job thread repository + // ---------------------------------- job thread repository ------------------------------------ private static ConcurrentHashMap JobThreadRepository = new ConcurrentHashMap(); public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ JobThread newJobThread = new JobThread(jobId, handler); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java b/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java index e16ccf0a..ba2ba3f1 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java @@ -52,21 +52,21 @@ public class GlueFactory { try { Resource resource = AnnotationUtils.getAnnotation(field, Resource.class); if (resource.name()!=null && resource.name().length()>0){ - fieldBean = XxlJobExecutor.applicationContext.getBean(resource.name()); + fieldBean = XxlJobExecutor.getApplicationContext().getBean(resource.name()); } else { - fieldBean = XxlJobExecutor.applicationContext.getBean(field.getName()); + fieldBean = XxlJobExecutor.getApplicationContext().getBean(field.getName()); } } catch (Exception e) { } if (fieldBean==null ) { - fieldBean = XxlJobExecutor.applicationContext.getBean(field.getType()); + fieldBean = XxlJobExecutor.getApplicationContext().getBean(field.getType()); } } else if (AnnotationUtils.getAnnotation(field, Autowired.class) != null) { Qualifier qualifier = AnnotationUtils.getAnnotation(field, Qualifier.class); if (qualifier!=null && qualifier.value()!=null && qualifier.value().length()>0) { - fieldBean = XxlJobExecutor.applicationContext.getBean(qualifier.value()); + fieldBean = XxlJobExecutor.getApplicationContext().getBean(qualifier.value()); } else { - fieldBean = XxlJobExecutor.applicationContext.getBean(field.getType()); + fieldBean = XxlJobExecutor.getApplicationContext().getBean(field.getType()); } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/ScriptJobHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/ScriptJobHandler.java index 362e3c05..60dc82a8 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/ScriptJobHandler.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/ScriptJobHandler.java @@ -1,7 +1,6 @@ package com.xxl.job.core.handler.impl; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.glue.GlueTypeEnum; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; @@ -37,17 +36,17 @@ public class ScriptJobHandler extends IJobHandler { String scriptFileName = null; if (GlueTypeEnum.GLUE_SHELL == glueType) { cmd = "bash"; - scriptFileName = XxlJobExecutor.logPath.concat("gluesource/").concat(String.valueOf(jobId)).concat("_").concat(String.valueOf(glueUpdatetime)).concat(".sh"); + scriptFileName = XxlJobFileAppender.logPath.concat("gluesource/").concat(String.valueOf(jobId)).concat("_").concat(String.valueOf(glueUpdatetime)).concat(".sh"); } else if (GlueTypeEnum.GLUE_PYTHON == glueType) { cmd = "python"; - scriptFileName = XxlJobExecutor.logPath.concat("gluesource/").concat(String.valueOf(jobId)).concat("_").concat(String.valueOf(glueUpdatetime)).concat(".py"); + scriptFileName = XxlJobFileAppender.logPath.concat("gluesource/").concat(String.valueOf(jobId)).concat("_").concat(String.valueOf(glueUpdatetime)).concat(".py"); } // make script file ScriptUtil.markScriptFile(scriptFileName, gluesource); // log file - String logFileName = XxlJobExecutor.logPath.concat(XxlJobFileAppender.contextHolder.get()); + String logFileName = XxlJobFileAppender.logPath.concat(XxlJobFileAppender.contextHolder.get()); // invoke XxlJobLogger.log("----------- script file:"+ scriptFileName +" -----------"); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java index de429c38..bd210256 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java @@ -1,7 +1,6 @@ package com.xxl.job.core.log; import com.xxl.job.core.biz.model.LogResult; -import com.xxl.job.core.executor.XxlJobExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,7 +19,8 @@ public class XxlJobFileAppender { //public static ThreadLocal contextHolder = new ThreadLocal(); public static final InheritableThreadLocal contextHolder = new InheritableThreadLocal(); public static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); - + public static String logPath = "/data/applogs/xxl-job/jobhandler/"; + /** * log filename: yyyy-MM-dd/9999.log * @@ -31,7 +31,7 @@ public class XxlJobFileAppender { public static String makeLogFileName(Date triggerDate, int logId) { // filePath/ - File filePathDir = new File(XxlJobExecutor.logPath); + File filePathDir = new File(logPath); if (!filePathDir.exists()) { filePathDir.mkdirs(); } @@ -66,7 +66,7 @@ public class XxlJobFileAppender { if (logFileName==null || logFileName.trim().length()==0) { return; } - File logFile = new File(XxlJobExecutor.logPath, logFileName); + File logFile = new File(logPath, logFileName); if (!logFile.exists()) { try { @@ -111,7 +111,7 @@ public class XxlJobFileAppender { if (logFileName==null || logFileName.trim().length()==0) { return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true); } - File logFile = new File(XxlJobExecutor.logPath, logFileName); + File logFile = new File(logPath, logFileName); if (!logFile.exists()) { return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java index 74a4c955..0b9621bc 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java @@ -1,6 +1,7 @@ package com.xxl.job.core.rpc.netcom.jetty.server; import com.xxl.job.core.thread.ExecutorRegistryThread; +import com.xxl.job.core.thread.TriggerCallbackThread; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; @@ -38,10 +39,16 @@ public class JettyServer { server.setHandler(handlerc); try { - // Start the server + // Start server server.start(); logger.info(">>>>>>>>>>>> xxl-job jetty server start success at port:{}.", port); + + // Start Registry-Server ExecutorRegistryThread.getInstance().start(port, ip, appName); + + // Start Callback-Server + TriggerCallbackThread.getInstance().start(); + server.join(); // block until thread stopped logger.info(">>>>>>>>>>> xxl-rpc server join success, netcon={}, port={}", JettyServer.class.getName(), port); } catch (Exception e) { @@ -56,6 +63,8 @@ public class JettyServer { } public void destroy() { + + // destroy server if (server != null) { try { server.stop(); @@ -67,6 +76,13 @@ public class JettyServer { if (thread.isAlive()) { thread.interrupt(); } + + // destroy Registry-Server + ExecutorRegistryThread.getInstance().toStop(); + + // destroy Callback-Server + TriggerCallbackThread.getInstance().toStop(); + logger.info(">>>>>>>>>>> xxl-rpc server destroy success, netcon={}", JettyServer.class.getName()); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/ScriptUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/ScriptUtil.java index 9d352587..95c199d2 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/ScriptUtil.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/ScriptUtil.java @@ -1,6 +1,6 @@ package com.xxl.job.core.util; -import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.log.XxlJobFileAppender; import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.DefaultExecutor; import org.apache.commons.exec.PumpStreamHandler; @@ -28,7 +28,7 @@ public class ScriptUtil { */ public static void markScriptFile(String scriptFileName, String content) throws IOException { // filePath/ - File filePathDir = new File(XxlJobExecutor.logPath); + File filePathDir = new File(XxlJobFileAppender.logPath); if (!filePathDir.exists()) { filePathDir.mkdirs(); }