From 3ccf3ad5ab2c7fea4b7738251a6183a4747acd11 Mon Sep 17 00:00:00 2001 From: "xueli.xue" Date: Mon, 13 Mar 2017 15:05:58 +0800 Subject: [PATCH] =?UTF-8?q?GLUE=E6=A8=A1=E5=BC=8F=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E5=AE=9E=E4=BE=8B=E6=9B=B4=E6=96=B0=E9=80=BB=E8=BE=91=E4=BC=98?= =?UTF-8?q?=E5=8C=96=EF=BC=8C=E5=8E=9F=E6=A0=B9=E6=8D=AE=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E6=9B=B4=E6=96=B0=E6=94=B9=E4=B8=BA=E6=A0=B9?= =?UTF-8?q?=E6=8D=AE=E7=89=88=E6=9C=AC=E5=8F=B7=E6=9B=B4=E6=96=B0=EF=BC=8C?= =?UTF-8?q?=E6=BA=90=E7=A0=81=E5=8F=98=E5=8A=A8=E7=89=88=E6=9C=AC=E5=8F=B7?= =?UTF-8?q?=E5=8A=A0=E4=B8=80=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../admin/core/jobbean/RemoteHttpJobBean.java | 1 + .../job/core/biz/impl/ExecutorBizImpl.java | 56 +++++++++------ .../xxl/job/core/biz/model/TriggerParam.java | 9 +++ .../com/xxl/job/core/glue/GlueFactory.java | 50 ++----------- .../com/xxl/job/core/glue/cache/ICache.java | 17 ----- .../xxl/job/core/glue/cache/LocalCache.java | 71 ------------------- .../job/core/handler/impl/GlueJobHandler.java | 20 ++++-- .../service/jobhandler/DemoJobHandler.java | 2 +- .../resources/applicationcontext-xxl-job.xml | 2 - .../resources/xxl-job-executor.properties | 5 +- 10 files changed, 65 insertions(+), 168 deletions(-) delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/glue/cache/ICache.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/glue/cache/LocalCache.java diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java index 5ed57e74..b98af8a2 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java @@ -60,6 +60,7 @@ public class RemoteHttpJobBean extends QuartzJobBean { triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorParams(jobInfo.getExecutorParam()); triggerParam.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true); + triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); triggerParam.setLogId(jobLog.getId()); triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); triggerParam.setLogAddress(findCallbackAddressList()); // callback address list diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index 43978d39..4d2f3b02 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -9,6 +9,8 @@ import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.impl.GlueJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.thread.JobThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Date; @@ -16,6 +18,7 @@ import java.util.Date; * Created by xuxueli on 17/3/1. */ public class ExecutorBizImpl implements ExecutorBiz { + private static Logger logger = LoggerFactory.getLogger(ExecutorBizImpl.class); @Override public ReturnT beat() { @@ -55,25 +58,26 @@ public class ExecutorBizImpl implements ExecutorBiz { if (!triggerParam.isGlueSwitch()) { // bean model - // valid handler instance + // valid handler IJobHandler jobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); if (jobHandler==null) { return new ReturnT(ReturnT.FAIL_CODE, "job handler for JobId=[" + triggerParam.getJobId() + "] not found."); } + // valid exists job thread:change handler, need kill old thread + if (jobThread != null && jobThread.getHandler() != jobHandler) { + // kill old job thread + jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); + jobThread.interrupt(); + XxlJobExecutor.removeJobThread(triggerParam.getJobId()); + jobThread = null; + } + + // make thread: new or exists invalid if (jobThread == null) { jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler); - } else { - // job handler update, kill old job thread - if (jobThread.getHandler() != jobHandler) { - // kill old job thread - jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); - jobThread.interrupt(); - - // new thread, with new job handler - jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler); - } } + } else { // glue model @@ -82,19 +86,29 @@ public class ExecutorBizImpl implements ExecutorBiz { return new ReturnT(ReturnT.FAIL_CODE, "glueLoader for JobId=[" + triggerParam.getJobId() + "] not found."); } + // valid exists job thread:change handler or glue timeout, need kill old thread + if (jobThread != null && + !(jobThread.getHandler() instanceof GlueJobHandler + && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { + // change glue model or glue timeout, kill old job thread + jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); + jobThread.interrupt(); + XxlJobExecutor.removeJobThread(triggerParam.getJobId()); + jobThread = null; + } + + // make thread: new or exists invalid if (jobThread == null) { - jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), new GlueJobHandler(triggerParam.getJobId())); - } else { - // job handler update, kill old job thread - if (!(jobThread.getHandler() instanceof GlueJobHandler)) { - // kill old job thread - jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); - jobThread.interrupt(); - - // new thread, with new job handler - jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), new GlueJobHandler(triggerParam.getJobId())); + IJobHandler jobHandler = null; + try { + jobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getJobId()); + } catch (Exception e) { + logger.error("", e); + return new ReturnT(ReturnT.FAIL_CODE, e.getMessage()); } + jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), new GlueJobHandler(jobHandler, triggerParam.getGlueUpdatetime())); } + } // push data to queue diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java index 0cc17715..bf72ae73 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java @@ -15,6 +15,7 @@ public class TriggerParam implements Serializable{ private String executorParams; private boolean glueSwitch; + private long glueUpdatetime; private int logId; private long logDateTim; @@ -53,6 +54,14 @@ public class TriggerParam implements Serializable{ this.glueSwitch = glueSwitch; } + public long getGlueUpdatetime() { + return glueUpdatetime; + } + + public void setGlueUpdatetime(long glueUpdatetime) { + this.glueUpdatetime = glueUpdatetime; + } + public int getLogId() { return logId; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java b/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java index 1ebbe304..04da6ce4 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/glue/GlueFactory.java @@ -1,6 +1,5 @@ package com.xxl.job.core.glue; -import com.xxl.job.core.glue.cache.LocalCache; import com.xxl.job.core.glue.loader.GlueLoader; import com.xxl.job.core.handler.IJobHandler; import groovy.lang.GroovyClassLoader; @@ -29,14 +28,6 @@ public class GlueFactory implements ApplicationContextAware { */ private GroovyClassLoader groovyClassLoader = new GroovyClassLoader(); - /** - * glue cache timeout / second - */ - private long cacheTimeout = 5000; - public void setCacheTimeout(long cacheTimeout) { - this.cacheTimeout = cacheTimeout; - } - /** * code source loader */ @@ -51,6 +42,9 @@ public class GlueFactory implements ApplicationContextAware { // ----------------------------- spring support ----------------------------- private static ApplicationContext applicationContext; private static GlueFactory glueFactory; + public static GlueFactory getInstance(){ + return glueFactory; + } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { @@ -134,41 +128,5 @@ public class GlueFactory implements ApplicationContextAware { } throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null"); } - - // // load instance, singleton - private static String generateInstanceCacheKey(int jobId){ - return String.valueOf(jobId).concat("_instance"); - } - public IJobHandler loadInstance(int jobId) throws Exception{ - if (jobId==0) { - return null; - } - String cacheInstanceKey = generateInstanceCacheKey(jobId); - Object cacheInstance = LocalCache.getInstance().get(cacheInstanceKey); - if (cacheInstance!=null) { - if (!(cacheInstance instanceof IJobHandler)) { - throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadInstance error, " - + "cannot convert from cacheClass["+ cacheInstance.getClass() +"] to IJobHandler"); - } - return (IJobHandler) cacheInstance; - } - Object instance = loadNewInstance(jobId); - if (instance!=null) { - if (!(instance instanceof IJobHandler)) { - throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadInstance error, " - + "cannot convert from instance["+ instance.getClass() +"] to IJobHandler"); - } - - LocalCache.getInstance().set(cacheInstanceKey, instance, cacheTimeout); - logger.info(">>>>>>>>>>>> xxl-glue, fresh instance, cacheInstanceKey:{}", cacheInstanceKey); - return (IJobHandler) instance; - } - throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadInstance error, instance is null"); - } - - // ----------------------------- util ----------------------------- - public static void glue(int jobId, String... params) throws Exception{ - GlueFactory.glueFactory.loadInstance(jobId).execute(params); - } - + } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/glue/cache/ICache.java b/xxl-job-core/src/main/java/com/xxl/job/core/glue/cache/ICache.java deleted file mode 100644 index 963a67a5..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/glue/cache/ICache.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.xxl.job.core.glue.cache; - -/** - * chche interface - * @author xuxueli 2016-1-8 15:57:27 - */ -public interface ICache { - - public boolean set(String key, Object value); - - public boolean set(String key, Object value, long timeout); - - public Object get(String key); - - public boolean remove(String key); - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/glue/cache/LocalCache.java b/xxl-job-core/src/main/java/com/xxl/job/core/glue/cache/LocalCache.java deleted file mode 100644 index 85dc249a..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/glue/cache/LocalCache.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.xxl.job.core.glue.cache; - -import java.util.concurrent.ConcurrentHashMap; - -/** - * local interface - * @author Administrator - */ -public class LocalCache implements ICache{ - - private static final LocalCache instance = new LocalCache(); - public static LocalCache getInstance(){ - return instance; - } - - private static final ConcurrentHashMap cacheMap = new ConcurrentHashMap(); - private static final long CACHE_TIMEOUT = 5000; - - private static String makeTimKey(String key){ - return key.concat("_tim"); - } - private static String makeDataKey(String key){ - return key.concat("_data"); - } - - @Override - public boolean set(String key, Object value) { - cacheMap.put(makeTimKey(key), System.currentTimeMillis() + CACHE_TIMEOUT); - cacheMap.put(makeDataKey(key), value); - return true; - } - - @Override - public boolean set(String key, Object value, long timeout) { - cacheMap.put(makeTimKey(key), System.currentTimeMillis() + timeout); - cacheMap.put(makeDataKey(key), value); - return true; - } - - @Override - public Object get(String key) { - Object tim = cacheMap.get(makeTimKey(key)); - if (tim != null && System.currentTimeMillis() < Long.parseLong(tim.toString())) { - return cacheMap.get(makeDataKey(key)); - } - return null; - } - - @Override - public boolean remove(String key) { - cacheMap.remove(makeTimKey(key)); - cacheMap.remove(makeDataKey(key)); - return true; - } - - public static void main(String[] args) { - String key = "key01"; - System.out.println(LocalCache.getInstance().get(key)); - - LocalCache.getInstance().set(key, "v1"); - System.out.println(LocalCache.getInstance().get(key)); - - LocalCache.getInstance().set(key, "v2"); - System.out.println(LocalCache.getInstance().get(key)); - - LocalCache.getInstance().remove(key); - System.out.println(LocalCache.getInstance().get(key)); - - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/GlueJobHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/GlueJobHandler.java index d7a84a94..8ea891e5 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/GlueJobHandler.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/GlueJobHandler.java @@ -1,22 +1,30 @@ package com.xxl.job.core.handler.impl; -import com.xxl.job.core.glue.GlueFactory; import com.xxl.job.core.handler.IJobHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * glue job handler * @author xuxueli 2016-5-19 21:05:45 */ public class GlueJobHandler extends IJobHandler { - - private int jobId; - public GlueJobHandler(int jobId) { - this.jobId = jobId; + private static Logger logger = LoggerFactory.getLogger(GlueJobHandler.class); + + private long glueUpdatetime; + private IJobHandler jobHandler; + public GlueJobHandler(IJobHandler jobHandler, long glueUpdatetime) { + this.jobHandler = jobHandler; + this.glueUpdatetime = glueUpdatetime; + } + public long getGlueUpdatetime() { + return glueUpdatetime; } @Override public void execute(String... params) throws Exception { - GlueFactory.glue(jobId, params); + logger.info("----------- glue.version:{} -----------", glueUpdatetime); + jobHandler.execute(params); } } diff --git a/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/DemoJobHandler.java b/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/DemoJobHandler.java index 98f9d881..8d1924b4 100644 --- a/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/DemoJobHandler.java +++ b/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/DemoJobHandler.java @@ -28,7 +28,7 @@ public class DemoJobHandler extends IJobHandler { public void execute(String... params) throws Exception { logger.info("XXL-JOB, Hello World."); - for (int i = 0; i < 2; i++) { + for (int i = 0; i < 5; i++) { logger.info("beat at:{}", i); TimeUnit.SECONDS.sleep(2); } diff --git a/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml b/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml index 3c2beee0..113de5fe 100644 --- a/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml +++ b/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml @@ -42,8 +42,6 @@ - - diff --git a/xxl-job-executor-example/src/main/resources/xxl-job-executor.properties b/xxl-job-executor-example/src/main/resources/xxl-job-executor.properties index 35060139..d0b1d666 100644 --- a/xxl-job-executor-example/src/main/resources/xxl-job-executor.properties +++ b/xxl-job-executor-example/src/main/resources/xxl-job-executor.properties @@ -7,7 +7,4 @@ xxl.job.db.password=root_pwd ### xxl-job executor address xxl.job.executor.appname=xxl-job-executor-example xxl.job.executor.ip= -xxl.job.executor.port=9999 - -### xxl-job glue cache time/ms -xxl.job.glue.cache.time=10000 +xxl.job.executor.port=9999 \ No newline at end of file