执行器参数配置逻辑重构

v1.8.2
xuxueli 7 years ago
parent 1bc5cc76be
commit 09d8952448

@ -5,19 +5,15 @@ import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.impl.ExecutorBizImpl; import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHander; import com.xxl.job.core.handler.annotation.JobHander;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.rpc.netcom.NetComClientProxy; import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import com.xxl.job.core.rpc.netcom.NetComServerFactory; import com.xxl.job.core.rpc.netcom.NetComServerFactory;
import com.xxl.job.core.thread.ExecutorRegistryThread;
import com.xxl.job.core.thread.JobThread; import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.thread.TriggerCallbackThread;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -27,15 +23,16 @@ import java.util.concurrent.ConcurrentHashMap;
/** /**
* Created by xuxueli on 2016/3/2 21:14. * Created by xuxueli on 2016/3/2 21:14.
*/ */
public class XxlJobExecutor implements ApplicationContextAware, ApplicationListener { public class XxlJobExecutor implements ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);
// ---------------------------------- param ------------------------------------
private String ip; private String ip;
private int port = 9999; private int port = 9999;
private String appName; private String appName;
private String adminAddresses; private String adminAddresses;
private String accessToken; private String accessToken;
public static String logPath = "/data/applogs/xxl-job/jobhandler/"; private String logPath;
public void setIp(String ip) { public void setIp(String ip) {
this.ip = ip; this.ip = ip;
@ -56,6 +53,48 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
this.logPath = logPath; this.logPath = logPath;
} }
// ---------------------------------- applicationContext ------------------------------------
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
// ---------------------------------- start + stop ------------------------------------
public void start() throws Exception {
// init admin-client
initAdminBizList(adminAddresses, accessToken);
// init executor-jobHandlerRepository
initJobHandlerRepository(applicationContext);
// init logpath
if (logPath!=null && logPath.trim().length()>0) {
XxlJobFileAppender.logPath = logPath;
}
// init executor-server
initExecutorServer();
}
public void destroy(){
// destory JobThreadRepository
if (JobThreadRepository.size() > 0) {
for (Map.Entry<Integer, JobThread> item: JobThreadRepository.entrySet()) {
removeJobThread(item.getKey(), "Web容器销毁终止");
}
JobThreadRepository.clear();
}
// destory executor-server
stopExecutorServer();
}
// ---------------------------------- admin-client ------------------------------------ // ---------------------------------- admin-client ------------------------------------
private static List<AdminBiz> adminBizList; private static List<AdminBiz> adminBizList;
private static void initAdminBizList(String adminAddresses, String accessToken) throws Exception { private static void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
@ -76,49 +115,29 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
return adminBizList; return adminBizList;
} }
// ---------------------------------- job server ------------------------------------
private NetComServerFactory serverFactory = new NetComServerFactory();
public void start() throws Exception {
// init admin-client
initAdminBizList(adminAddresses, accessToken);
// executor start // ---------------------------------- executor-server ------------------------------------
private NetComServerFactory serverFactory = new NetComServerFactory();
private void initExecutorServer() throws Exception {
NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); // rpc-service, base on jetty NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); // rpc-service, base on jetty
NetComServerFactory.setAccessToken(accessToken); NetComServerFactory.setAccessToken(accessToken);
serverFactory.start(port, ip, appName); serverFactory.start(port, ip, appName); // jetty + registry
}
private void stopExecutorServer() {
// trigger callback thread start serverFactory.destroy(); // jetty + registry + callback
TriggerCallbackThread.getInstance().start();
} }
public void destroy(){
// 1、executor registry thread stop
ExecutorRegistryThread.getInstance().toStop();
// 2、executor stop
serverFactory.destroy();
// 3、job thread repository destory
if (JobThreadRepository.size() > 0) {
for (Map.Entry<Integer, JobThread> item: JobThreadRepository.entrySet()) {
JobThread jobThread = item.getValue();
jobThread.toStop("Web容器销毁终止");
jobThread.interrupt();
// ---------------------------------- job handler repository ------------------------------------
private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
} }
JobThreadRepository.clear(); public static IJobHandler loadJobHandler(String name){
} return jobHandlerRepository.get(name);
// 4、trigger callback thread stop
TriggerCallbackThread.getInstance().toStop();
} }
private static void initJobHandlerRepository(ApplicationContext applicationContext){
// ---------------------------------- init job handler ------------------------------------
public static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
XxlJobExecutor.applicationContext = applicationContext;
// init job handler action // init job handler action
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHander.class); Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHander.class);
@ -136,25 +155,8 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
} }
} }
// ---------------------------------- destory job executor ------------------------------------
@Override
public void onApplicationEvent(ApplicationEvent applicationEvent) {
if(applicationEvent instanceof ContextClosedEvent){
// TODO
}
}
// ---------------------------------- job handler repository
private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}
// ---------------------------------- job thread repository // ---------------------------------- job thread repository ------------------------------------
private static ConcurrentHashMap<Integer, JobThread> JobThreadRepository = new ConcurrentHashMap<Integer, JobThread>(); private static ConcurrentHashMap<Integer, JobThread> JobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
JobThread newJobThread = new JobThread(jobId, handler); JobThread newJobThread = new JobThread(jobId, handler);

@ -52,21 +52,21 @@ public class GlueFactory {
try { try {
Resource resource = AnnotationUtils.getAnnotation(field, Resource.class); Resource resource = AnnotationUtils.getAnnotation(field, Resource.class);
if (resource.name()!=null && resource.name().length()>0){ if (resource.name()!=null && resource.name().length()>0){
fieldBean = XxlJobExecutor.applicationContext.getBean(resource.name()); fieldBean = XxlJobExecutor.getApplicationContext().getBean(resource.name());
} else { } else {
fieldBean = XxlJobExecutor.applicationContext.getBean(field.getName()); fieldBean = XxlJobExecutor.getApplicationContext().getBean(field.getName());
} }
} catch (Exception e) { } catch (Exception e) {
} }
if (fieldBean==null ) { if (fieldBean==null ) {
fieldBean = XxlJobExecutor.applicationContext.getBean(field.getType()); fieldBean = XxlJobExecutor.getApplicationContext().getBean(field.getType());
} }
} else if (AnnotationUtils.getAnnotation(field, Autowired.class) != null) { } else if (AnnotationUtils.getAnnotation(field, Autowired.class) != null) {
Qualifier qualifier = AnnotationUtils.getAnnotation(field, Qualifier.class); Qualifier qualifier = AnnotationUtils.getAnnotation(field, Qualifier.class);
if (qualifier!=null && qualifier.value()!=null && qualifier.value().length()>0) { if (qualifier!=null && qualifier.value()!=null && qualifier.value().length()>0) {
fieldBean = XxlJobExecutor.applicationContext.getBean(qualifier.value()); fieldBean = XxlJobExecutor.getApplicationContext().getBean(qualifier.value());
} else { } else {
fieldBean = XxlJobExecutor.applicationContext.getBean(field.getType()); fieldBean = XxlJobExecutor.getApplicationContext().getBean(field.getType());
} }
} }

@ -1,7 +1,6 @@
package com.xxl.job.core.handler.impl; package com.xxl.job.core.handler.impl;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.glue.GlueTypeEnum; import com.xxl.job.core.glue.GlueTypeEnum;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobFileAppender;
@ -37,17 +36,17 @@ public class ScriptJobHandler extends IJobHandler {
String scriptFileName = null; String scriptFileName = null;
if (GlueTypeEnum.GLUE_SHELL == glueType) { if (GlueTypeEnum.GLUE_SHELL == glueType) {
cmd = "bash"; cmd = "bash";
scriptFileName = XxlJobExecutor.logPath.concat("gluesource/").concat(String.valueOf(jobId)).concat("_").concat(String.valueOf(glueUpdatetime)).concat(".sh"); scriptFileName = XxlJobFileAppender.logPath.concat("gluesource/").concat(String.valueOf(jobId)).concat("_").concat(String.valueOf(glueUpdatetime)).concat(".sh");
} else if (GlueTypeEnum.GLUE_PYTHON == glueType) { } else if (GlueTypeEnum.GLUE_PYTHON == glueType) {
cmd = "python"; cmd = "python";
scriptFileName = XxlJobExecutor.logPath.concat("gluesource/").concat(String.valueOf(jobId)).concat("_").concat(String.valueOf(glueUpdatetime)).concat(".py"); scriptFileName = XxlJobFileAppender.logPath.concat("gluesource/").concat(String.valueOf(jobId)).concat("_").concat(String.valueOf(glueUpdatetime)).concat(".py");
} }
// make script file // make script file
ScriptUtil.markScriptFile(scriptFileName, gluesource); ScriptUtil.markScriptFile(scriptFileName, gluesource);
// log file // log file
String logFileName = XxlJobExecutor.logPath.concat(XxlJobFileAppender.contextHolder.get()); String logFileName = XxlJobFileAppender.logPath.concat(XxlJobFileAppender.contextHolder.get());
// invoke // invoke
XxlJobLogger.log("----------- script file:"+ scriptFileName +" -----------"); XxlJobLogger.log("----------- script file:"+ scriptFileName +" -----------");

@ -1,7 +1,6 @@
package com.xxl.job.core.log; package com.xxl.job.core.log;
import com.xxl.job.core.biz.model.LogResult; import com.xxl.job.core.biz.model.LogResult;
import com.xxl.job.core.executor.XxlJobExecutor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -20,6 +19,7 @@ public class XxlJobFileAppender {
//public static ThreadLocal<String> contextHolder = new ThreadLocal<String>(); //public static ThreadLocal<String> contextHolder = new ThreadLocal<String>();
public static final InheritableThreadLocal<String> contextHolder = new InheritableThreadLocal<String>(); public static final InheritableThreadLocal<String> contextHolder = new InheritableThreadLocal<String>();
public static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); public static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
public static String logPath = "/data/applogs/xxl-job/jobhandler/";
/** /**
* log filename: yyyy-MM-dd/9999.log * log filename: yyyy-MM-dd/9999.log
@ -31,7 +31,7 @@ public class XxlJobFileAppender {
public static String makeLogFileName(Date triggerDate, int logId) { public static String makeLogFileName(Date triggerDate, int logId) {
// filePath/ // filePath/
File filePathDir = new File(XxlJobExecutor.logPath); File filePathDir = new File(logPath);
if (!filePathDir.exists()) { if (!filePathDir.exists()) {
filePathDir.mkdirs(); filePathDir.mkdirs();
} }
@ -66,7 +66,7 @@ public class XxlJobFileAppender {
if (logFileName==null || logFileName.trim().length()==0) { if (logFileName==null || logFileName.trim().length()==0) {
return; return;
} }
File logFile = new File(XxlJobExecutor.logPath, logFileName); File logFile = new File(logPath, logFileName);
if (!logFile.exists()) { if (!logFile.exists()) {
try { try {
@ -111,7 +111,7 @@ public class XxlJobFileAppender {
if (logFileName==null || logFileName.trim().length()==0) { if (logFileName==null || logFileName.trim().length()==0) {
return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true); return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true);
} }
File logFile = new File(XxlJobExecutor.logPath, logFileName); File logFile = new File(logPath, logFileName);
if (!logFile.exists()) { if (!logFile.exists()) {
return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true); return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true);

@ -1,6 +1,7 @@
package com.xxl.job.core.rpc.netcom.jetty.server; package com.xxl.job.core.rpc.netcom.jetty.server;
import com.xxl.job.core.thread.ExecutorRegistryThread; import com.xxl.job.core.thread.ExecutorRegistryThread;
import com.xxl.job.core.thread.TriggerCallbackThread;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -38,10 +39,16 @@ public class JettyServer {
server.setHandler(handlerc); server.setHandler(handlerc);
try { try {
// Start the server // Start server
server.start(); server.start();
logger.info(">>>>>>>>>>>> xxl-job jetty server start success at port:{}.", port); logger.info(">>>>>>>>>>>> xxl-job jetty server start success at port:{}.", port);
// Start Registry-Server
ExecutorRegistryThread.getInstance().start(port, ip, appName); ExecutorRegistryThread.getInstance().start(port, ip, appName);
// Start Callback-Server
TriggerCallbackThread.getInstance().start();
server.join(); // block until thread stopped server.join(); // block until thread stopped
logger.info(">>>>>>>>>>> xxl-rpc server join success, netcon={}, port={}", JettyServer.class.getName(), port); logger.info(">>>>>>>>>>> xxl-rpc server join success, netcon={}, port={}", JettyServer.class.getName(), port);
} catch (Exception e) { } catch (Exception e) {
@ -56,6 +63,8 @@ public class JettyServer {
} }
public void destroy() { public void destroy() {
// destroy server
if (server != null) { if (server != null) {
try { try {
server.stop(); server.stop();
@ -67,6 +76,13 @@ public class JettyServer {
if (thread.isAlive()) { if (thread.isAlive()) {
thread.interrupt(); thread.interrupt();
} }
// destroy Registry-Server
ExecutorRegistryThread.getInstance().toStop();
// destroy Callback-Server
TriggerCallbackThread.getInstance().toStop();
logger.info(">>>>>>>>>>> xxl-rpc server destroy success, netcon={}", JettyServer.class.getName()); logger.info(">>>>>>>>>>> xxl-rpc server destroy success, netcon={}", JettyServer.class.getName());
} }

@ -1,6 +1,6 @@
package com.xxl.job.core.util; package com.xxl.job.core.util;
import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.log.XxlJobFileAppender;
import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor; import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.exec.PumpStreamHandler;
@ -28,7 +28,7 @@ public class ScriptUtil {
*/ */
public static void markScriptFile(String scriptFileName, String content) throws IOException { public static void markScriptFile(String scriptFileName, String content) throws IOException {
// filePath/ // filePath/
File filePathDir = new File(XxlJobExecutor.logPath); File filePathDir = new File(XxlJobFileAppender.logPath);
if (!filePathDir.exists()) { if (!filePathDir.exists()) {
filePathDir.mkdirs(); filePathDir.mkdirs();
} }

Loading…
Cancel
Save