GLUE模式任务实例更新逻辑优化,原根据超时时间更新改为根据版本号更新,源码变动版本号加一;

pull/1/head
xueli.xue 8 years ago
parent f2d2422848
commit 3ccf3ad5ab

@ -60,6 +60,7 @@ public class RemoteHttpJobBean extends QuartzJobBean {
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam()); triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true); triggerParam.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true);
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setLogId(jobLog.getId()); triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setLogAddress(findCallbackAddressList()); // callback address list triggerParam.setLogAddress(findCallbackAddressList()); // callback address list

@ -9,6 +9,8 @@ import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.impl.GlueJobHandler; import com.xxl.job.core.handler.impl.GlueJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.thread.JobThread; import com.xxl.job.core.thread.JobThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date; import java.util.Date;
@ -16,6 +18,7 @@ import java.util.Date;
* Created by xuxueli on 17/3/1. * Created by xuxueli on 17/3/1.
*/ */
public class ExecutorBizImpl implements ExecutorBiz { public class ExecutorBizImpl implements ExecutorBiz {
private static Logger logger = LoggerFactory.getLogger(ExecutorBizImpl.class);
@Override @Override
public ReturnT<String> beat() { public ReturnT<String> beat() {
@ -55,25 +58,26 @@ public class ExecutorBizImpl implements ExecutorBiz {
if (!triggerParam.isGlueSwitch()) { if (!triggerParam.isGlueSwitch()) {
// bean model // bean model
// valid handler instance // valid handler
IJobHandler jobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); IJobHandler jobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
if (jobHandler==null) { if (jobHandler==null) {
return new ReturnT(ReturnT.FAIL_CODE, "job handler for JobId=[" + triggerParam.getJobId() + "] not found."); return new ReturnT(ReturnT.FAIL_CODE, "job handler for JobId=[" + triggerParam.getJobId() + "] not found.");
} }
if (jobThread == null) { // valid exists job threadchange handler, need kill old thread
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler); if (jobThread != null && jobThread.getHandler() != jobHandler) {
} else {
// job handler update, kill old job thread
if (jobThread.getHandler() != jobHandler) {
// kill old job thread // kill old job thread
jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程");
jobThread.interrupt(); jobThread.interrupt();
XxlJobExecutor.removeJobThread(triggerParam.getJobId());
jobThread = null;
}
// new thread, with new job handler // make thread: new or exists invalid
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler); jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler);
} }
}
} else { } else {
// glue model // glue model
@ -82,19 +86,29 @@ public class ExecutorBizImpl implements ExecutorBiz {
return new ReturnT(ReturnT.FAIL_CODE, "glueLoader for JobId=[" + triggerParam.getJobId() + "] not found."); return new ReturnT(ReturnT.FAIL_CODE, "glueLoader for JobId=[" + triggerParam.getJobId() + "] not found.");
} }
if (jobThread == null) { // valid exists job threadchange handler or glue timeout, need kill old thread
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), new GlueJobHandler(triggerParam.getJobId())); if (jobThread != null &&
} else { !(jobThread.getHandler() instanceof GlueJobHandler
// job handler update, kill old job thread && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
if (!(jobThread.getHandler() instanceof GlueJobHandler)) { // change glue model or glue timeout, kill old job thread
// kill old job thread
jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程");
jobThread.interrupt(); jobThread.interrupt();
XxlJobExecutor.removeJobThread(triggerParam.getJobId());
jobThread = null;
}
// new thread, with new job handler // make thread: new or exists invalid
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), new GlueJobHandler(triggerParam.getJobId())); if (jobThread == null) {
IJobHandler jobHandler = null;
try {
jobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getJobId());
} catch (Exception e) {
logger.error("", e);
return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
} }
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), new GlueJobHandler(jobHandler, triggerParam.getGlueUpdatetime()));
} }
} }
// push data to queue // push data to queue

@ -15,6 +15,7 @@ public class TriggerParam implements Serializable{
private String executorParams; private String executorParams;
private boolean glueSwitch; private boolean glueSwitch;
private long glueUpdatetime;
private int logId; private int logId;
private long logDateTim; private long logDateTim;
@ -53,6 +54,14 @@ public class TriggerParam implements Serializable{
this.glueSwitch = glueSwitch; this.glueSwitch = glueSwitch;
} }
public long getGlueUpdatetime() {
return glueUpdatetime;
}
public void setGlueUpdatetime(long glueUpdatetime) {
this.glueUpdatetime = glueUpdatetime;
}
public int getLogId() { public int getLogId() {
return logId; return logId;
} }

@ -1,6 +1,5 @@
package com.xxl.job.core.glue; package com.xxl.job.core.glue;
import com.xxl.job.core.glue.cache.LocalCache;
import com.xxl.job.core.glue.loader.GlueLoader; import com.xxl.job.core.glue.loader.GlueLoader;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import groovy.lang.GroovyClassLoader; import groovy.lang.GroovyClassLoader;
@ -29,14 +28,6 @@ public class GlueFactory implements ApplicationContextAware {
*/ */
private GroovyClassLoader groovyClassLoader = new GroovyClassLoader(); private GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
/**
* glue cache timeout / second
*/
private long cacheTimeout = 5000;
public void setCacheTimeout(long cacheTimeout) {
this.cacheTimeout = cacheTimeout;
}
/** /**
* code source loader * code source loader
*/ */
@ -51,6 +42,9 @@ public class GlueFactory implements ApplicationContextAware {
// ----------------------------- spring support ----------------------------- // ----------------------------- spring support -----------------------------
private static ApplicationContext applicationContext; private static ApplicationContext applicationContext;
private static GlueFactory glueFactory; private static GlueFactory glueFactory;
public static GlueFactory getInstance(){
return glueFactory;
}
@Override @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
@ -135,40 +129,4 @@ public class GlueFactory implements ApplicationContextAware {
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null"); throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null");
} }
// // load instance, singleton
private static String generateInstanceCacheKey(int jobId){
return String.valueOf(jobId).concat("_instance");
}
public IJobHandler loadInstance(int jobId) throws Exception{
if (jobId==0) {
return null;
}
String cacheInstanceKey = generateInstanceCacheKey(jobId);
Object cacheInstance = LocalCache.getInstance().get(cacheInstanceKey);
if (cacheInstance!=null) {
if (!(cacheInstance instanceof IJobHandler)) {
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadInstance error, "
+ "cannot convert from cacheClass["+ cacheInstance.getClass() +"] to IJobHandler");
}
return (IJobHandler) cacheInstance;
}
Object instance = loadNewInstance(jobId);
if (instance!=null) {
if (!(instance instanceof IJobHandler)) {
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadInstance error, "
+ "cannot convert from instance["+ instance.getClass() +"] to IJobHandler");
}
LocalCache.getInstance().set(cacheInstanceKey, instance, cacheTimeout);
logger.info(">>>>>>>>>>>> xxl-glue, fresh instance, cacheInstanceKey:{}", cacheInstanceKey);
return (IJobHandler) instance;
}
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadInstance error, instance is null");
}
// ----------------------------- util -----------------------------
public static void glue(int jobId, String... params) throws Exception{
GlueFactory.glueFactory.loadInstance(jobId).execute(params);
}
} }

@ -1,17 +0,0 @@
package com.xxl.job.core.glue.cache;
/**
* chche interface
* @author xuxueli 2016-1-8 15:57:27
*/
public interface ICache {
public boolean set(String key, Object value);
public boolean set(String key, Object value, long timeout);
public Object get(String key);
public boolean remove(String key);
}

@ -1,71 +0,0 @@
package com.xxl.job.core.glue.cache;
import java.util.concurrent.ConcurrentHashMap;
/**
* local interface
* @author Administrator
*/
public class LocalCache implements ICache{
private static final LocalCache instance = new LocalCache();
public static LocalCache getInstance(){
return instance;
}
private static final ConcurrentHashMap<String, Object> cacheMap = new ConcurrentHashMap<String, Object>();
private static final long CACHE_TIMEOUT = 5000;
private static String makeTimKey(String key){
return key.concat("_tim");
}
private static String makeDataKey(String key){
return key.concat("_data");
}
@Override
public boolean set(String key, Object value) {
cacheMap.put(makeTimKey(key), System.currentTimeMillis() + CACHE_TIMEOUT);
cacheMap.put(makeDataKey(key), value);
return true;
}
@Override
public boolean set(String key, Object value, long timeout) {
cacheMap.put(makeTimKey(key), System.currentTimeMillis() + timeout);
cacheMap.put(makeDataKey(key), value);
return true;
}
@Override
public Object get(String key) {
Object tim = cacheMap.get(makeTimKey(key));
if (tim != null && System.currentTimeMillis() < Long.parseLong(tim.toString())) {
return cacheMap.get(makeDataKey(key));
}
return null;
}
@Override
public boolean remove(String key) {
cacheMap.remove(makeTimKey(key));
cacheMap.remove(makeDataKey(key));
return true;
}
public static void main(String[] args) {
String key = "key01";
System.out.println(LocalCache.getInstance().get(key));
LocalCache.getInstance().set(key, "v1");
System.out.println(LocalCache.getInstance().get(key));
LocalCache.getInstance().set(key, "v2");
System.out.println(LocalCache.getInstance().get(key));
LocalCache.getInstance().remove(key);
System.out.println(LocalCache.getInstance().get(key));
}
}

@ -1,22 +1,30 @@
package com.xxl.job.core.handler.impl; package com.xxl.job.core.handler.impl;
import com.xxl.job.core.glue.GlueFactory;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* glue job handler * glue job handler
* @author xuxueli 2016-5-19 21:05:45 * @author xuxueli 2016-5-19 21:05:45
*/ */
public class GlueJobHandler extends IJobHandler { public class GlueJobHandler extends IJobHandler {
private static Logger logger = LoggerFactory.getLogger(GlueJobHandler.class);
private int jobId; private long glueUpdatetime;
public GlueJobHandler(int jobId) { private IJobHandler jobHandler;
this.jobId = jobId; public GlueJobHandler(IJobHandler jobHandler, long glueUpdatetime) {
this.jobHandler = jobHandler;
this.glueUpdatetime = glueUpdatetime;
}
public long getGlueUpdatetime() {
return glueUpdatetime;
} }
@Override @Override
public void execute(String... params) throws Exception { public void execute(String... params) throws Exception {
GlueFactory.glue(jobId, params); logger.info("----------- glue.version:{} -----------", glueUpdatetime);
jobHandler.execute(params);
} }
} }

@ -28,7 +28,7 @@ public class DemoJobHandler extends IJobHandler {
public void execute(String... params) throws Exception { public void execute(String... params) throws Exception {
logger.info("XXL-JOB, Hello World."); logger.info("XXL-JOB, Hello World.");
for (int i = 0; i < 2; i++) { for (int i = 0; i < 5; i++) {
logger.info("beat at:{}", i); logger.info("beat at:{}", i);
TimeUnit.SECONDS.sleep(2); TimeUnit.SECONDS.sleep(2);
} }

@ -42,8 +42,6 @@
<!-- 配置03、GlueFactory --> <!-- 配置03、GlueFactory -->
<bean id="glueFactory" class="com.xxl.job.core.glue.GlueFactory"> <bean id="glueFactory" class="com.xxl.job.core.glue.GlueFactory">
<!-- GLUE任务示例缓存失效时间, 单位/ms -->
<property name="cacheTimeout" value="${xxl.job.glue.cache.time}" />
<!-- GLUE源码加载器默认使用系统提供的 "DbGlueLoader", 推荐将其改为公共的RPC服务 --> <!-- GLUE源码加载器默认使用系统提供的 "DbGlueLoader", 推荐将其改为公共的RPC服务 -->
<property name="glueLoader" > <property name="glueLoader" >
<!-- DbGlueLoader, 依赖 "XXL-JOB公共数据源" --> <!-- DbGlueLoader, 依赖 "XXL-JOB公共数据源" -->

@ -8,6 +8,3 @@ xxl.job.db.password=root_pwd
xxl.job.executor.appname=xxl-job-executor-example xxl.job.executor.appname=xxl-job-executor-example
xxl.job.executor.ip= xxl.job.executor.ip=
xxl.job.executor.port=9999 xxl.job.executor.port=9999
### xxl-job glue cache time/ms
xxl.job.glue.cache.time=10000

Loading…
Cancel
Save