pull/56/head
liyong 4 years ago
parent 1a4c8e5e86
commit a61ba7f3a7

@ -46,6 +46,7 @@ public class ExecutorBizImpl implements ExecutorBiz {
@Override @Override
public ReturnT<String> run(TriggerParam triggerParam) { public ReturnT<String> run(TriggerParam triggerParam) {
// load oldjobHandler + jobThread // load oldjobHandler + jobThread
//优先获取缓存中的处理器 (可以自定义设置)
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null; String removeOldReason = null;
@ -54,7 +55,7 @@ public class ExecutorBizImpl implements ExecutorBiz {
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) { if (GlueTypeEnum.BEAN == glueTypeEnum) {
// new jobhandler // new jobhandler 通过解析对象时包装的map获取
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread // valid old jobThread
@ -120,13 +121,16 @@ public class ExecutorBizImpl implements ExecutorBiz {
// executor block strategy // executor block strategy
if (jobThread != null) { if (jobThread != null) {
//阻塞处理策略
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
//丢弃后续调用
// discard when running // discard when running
if (jobThread.isRunningOrHasQueue()) { if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
} }
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
//覆盖之前调用
// kill running jobThread // kill running jobThread
if (jobThread.isRunningOrHasQueue()) { if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); removeOldReason = "block strategy effect" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
@ -134,12 +138,14 @@ public class ExecutorBizImpl implements ExecutorBiz {
jobThread = null; jobThread = null;
} }
} else { } else {
//单机串行
// just queue trigger // just queue trigger
} }
} }
// replace thread (new or exists invalid) // replace thread (new or exists invalid)
if (jobThread == null) { if (jobThread == null) {
//开启一个执行业务的线程 如果是覆盖之前的调用 则会触发之前任务的 interrupt
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
} }

@ -74,7 +74,7 @@ public class XxlJobExecutor {
initAdminBizList(adminAddresses, accessToken); initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread // init JobLogFileCleanThread 开启一个线程清理日志
JobLogFileCleanThread.getInstance().start(logRetentionDays); JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread // init TriggerCallbackThread
@ -116,6 +116,13 @@ public class XxlJobExecutor {
// ---------------------- admin-client (rpc invoker) ---------------------- // ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList; private static List<AdminBiz> adminBizList;
/**
* xxl AdminBizClient adminBizList ,
* @param adminAddresses
* @param accessToken
* @throws Exception
*/
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) { if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) { for (String address: adminAddresses.trim().split(",")) {
@ -206,7 +213,7 @@ public class XxlJobExecutor {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" ."); "The correct method format like \" public ReturnT<String> execute(String param) \" .");
}*/ }*/
//TODO 设置访问权限 说明私有的方法也是支持的 demo待实践
executeMethod.setAccessible(true); executeMethod.setAccessible(true);
// init and destroy // init and destroy
@ -215,6 +222,7 @@ public class XxlJobExecutor {
if (xxlJob.init().trim().length() > 0) { if (xxlJob.init().trim().length() > 0) {
try { try {
//获取初始化方法
initMethod = clazz.getDeclaredMethod(xxlJob.init()); initMethod = clazz.getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true); initMethod.setAccessible(true);
} catch (NoSuchMethodException e) { } catch (NoSuchMethodException e) {
@ -231,6 +239,7 @@ public class XxlJobExecutor {
} }
// registry jobhandler // registry jobhandler
//注册到map中
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod)); registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
} }
@ -239,6 +248,7 @@ public class XxlJobExecutor {
// ---------------------- job thread repository ---------------------- // ---------------------- job thread repository ----------------------
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>(); private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
//TODO 处理业务
JobThread newJobThread = new JobThread(jobId, handler); JobThread newJobThread = new JobThread(jobId, handler);
newJobThread.start(); newJobThread.start();
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

@ -28,6 +28,10 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC
// start // start
/**
*
*/
@Override @Override
public void afterSingletonsInstantiated() { public void afterSingletonsInstantiated() {
@ -37,7 +41,7 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC
// init JobHandler Repository (for method) // init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext); initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory //TODO 初始化 glueFactory 使用场景待定 应该是执行脚本用的
GlueFactory.refreshInstance(1); GlueFactory.refreshInstance(1);
// super start // super start
@ -77,6 +81,10 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC
} }
}*/ }*/
/**
* XxlJob XxlJobExecutor.jobHandlerRepository(map)
* @param applicationContext
*/
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) { if (applicationContext == null) {
return; return;

@ -30,6 +30,11 @@ public class XxlJobFileAppender {
*/ */
private static String logBasePath = "/data/applogs/xxl-job/jobhandler"; private static String logBasePath = "/data/applogs/xxl-job/jobhandler";
private static String glueSrcPath = logBasePath.concat("/gluesource"); private static String glueSrcPath = logBasePath.concat("/gluesource");
/**
* glue
* @param logPath
*/
public static void initLogPath(String logPath){ public static void initLogPath(String logPath){
// init // init
if (logPath!=null && logPath.trim().length()>0) { if (logPath!=null && logPath.trim().length()>0) {

@ -34,7 +34,7 @@ public class ExecutorRegistryThread {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null."); logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
return; return;
} }
//开启线程往服务器注册 每隔30秒请求一次
registryThread = new Thread(new Runnable() { registryThread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -76,7 +76,7 @@ public class ExecutorRegistryThread {
} }
} }
// registry remove // registry remove 如果通过stop 中断了 注册过程 则会立刻调用 移除接口
try { try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {

@ -27,9 +27,14 @@ public class JobLogFileCleanThread {
private Thread localThread; private Thread localThread;
private volatile boolean toStop = false; private volatile boolean toStop = false;
/**
* sleep interrupt
* @param logRetentionDays
*/
public void start(final long logRetentionDays){ public void start(final long logRetentionDays){
// limit min value // limit min value 最小值三天一清理
if (logRetentionDays < 3 ) { if (logRetentionDays < 3 ) {
return; return;
} }
@ -54,7 +59,7 @@ public class JobLogFileCleanThread {
for (File childFile: childDirs) { for (File childFile: childDirs) {
// valid // valid 生成的日志文件时日期格式的文件夹
if (!childFile.isDirectory()) { if (!childFile.isDirectory()) {
continue; continue;
} }
@ -73,7 +78,7 @@ public class JobLogFileCleanThread {
if (logFileCreateDate == null) { if (logFileCreateDate == null) {
continue; continue;
} }
//基于日期校验是不是指定日期之前的文件夹 是则删除
if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) { if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
FileUtil.deleteRecursively(childFile); FileUtil.deleteRecursively(childFile);
} }

@ -98,8 +98,10 @@ public class JobThread extends Thread{
// init // init
try { try {
//调用 xxljob注解的 init方法
handler.init(); handler.init();
} catch (Throwable e) { } catch (Throwable e) {
//TODO 初始化方法报错 并不会影响正常流程执行 如果 初始化涉及业务相关 要注意
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }
@ -126,12 +128,13 @@ public class JobThread extends Thread{
triggerParam.getBroadcastIndex(), triggerParam.getBroadcastIndex(),
triggerParam.getBroadcastTotal()); triggerParam.getBroadcastTotal());
// init job context // init job context 包装执行的一些参数 通过 threadlocal 存取
XxlJobContext.setXxlJobContext(xxlJobContext); XxlJobContext.setXxlJobContext(xxlJobContext);
// execute // execute
XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam()); XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
//执行超时时间
if (triggerParam.getExecutorTimeout() > 0) { if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout // limit timeout
Thread futureThread = null; Thread futureThread = null;
@ -162,6 +165,7 @@ public class JobThread extends Thread{
futureThread.interrupt(); futureThread.interrupt();
} }
} else { } else {
//如果没有执行超时时间的限制 则 直接调用业务方法
// just execute // just execute
handler.execute(); handler.execute();
} }
@ -183,6 +187,7 @@ public class JobThread extends Thread{
); );
} else { } else {
//每3秒获取一次执行参数 超过30次 且 依然没有 就 删除对应的线程 删除线程会将top置为true 这个设计应该是为了一些 执行频繁的任务设计的 一直死循环跑任务
if (idleTimes > 30) { if (idleTimes > 30) {
if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
@ -227,6 +232,7 @@ public class JobThread extends Thread{
} }
// callback trigger request in queue // callback trigger request in queue
//TODO 待确认
while(triggerQueue !=null && triggerQueue.size()>0){ while(triggerQueue !=null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll(); TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) { if (triggerParam!=null) {

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
/** /**
* Created by xuxueli on 16/7/22. * Created by xuxueli on 16/7/22.
*/ */
public class TriggerCallbackThread { public class TriggerCallbackThread {
private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);
@ -43,6 +44,10 @@ public class TriggerCallbackThread {
/** /**
* callback thread * callback thread
*/ */
/** TODO codemsg callback
* xxl/api/callback
* log triggerRetryCallbackThread 线30()
*/
private Thread triggerCallbackThread; private Thread triggerCallbackThread;
private Thread triggerRetryCallbackThread; private Thread triggerRetryCallbackThread;
private volatile boolean toStop = false; private volatile boolean toStop = false;
@ -68,6 +73,7 @@ public class TriggerCallbackThread {
// callback list param // callback list param
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
//将队列中所有对象全部取出
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback); callbackParamList.add(callback);
@ -163,6 +169,7 @@ public class TriggerCallbackThread {
private void doCallback(List<HandleCallbackParam> callbackParamList){ private void doCallback(List<HandleCallbackParam> callbackParamList){
boolean callbackRet = false; boolean callbackRet = false;
// callback, will retry if error // callback, will retry if error
//getAdminBizList 所有xxl服务器地址
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try { try {
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList); ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);

@ -16,7 +16,7 @@ xxl.job.accessToken=
### xxl-job executor appname ### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address= xxl.job.executor.address=1.1.1.1
### xxl-job executor server-info ### xxl-job executor server-info
xxl.job.executor.ip= xxl.job.executor.ip=
xxl.job.executor.port=9999 xxl.job.executor.port=9999

Loading…
Cancel
Save