From 0c03f6348c166491f1c42df5b6963ee8bbb1d1f7 Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Tue, 25 Jul 2017 21:56:07 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B9=BF=E6=92=AD=E5=88=86=E7=89=87=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 3 +- .../core/route/ExecutorRouteStrategyEnum.java | 3 +- .../job/admin/core/route/ExecutorRouter.java | 3 +- .../job/admin/core/trigger/XxlJobTrigger.java | 84 +++++++++++++++++-- .../job/core/biz/impl/ExecutorBizImpl.java | 9 +- .../xxl/job/core/biz/model/TriggerParam.java | 46 +++++++--- .../xxl/job/core/executor/XxlJobExecutor.java | 3 + .../xxl/job/core/log/XxlJobFileAppender.java | 4 +- .../com/xxl/job/core/thread/JobThread.java | 2 + .../com/xxl/job/core/util/ShardingUtil.java | 46 ++++++++++ .../service/jobhandler/DemoJobHandler.java | 10 +-- .../jobhandler/ShardingJobHandler.java | 39 +++++++++ .../jobhandler/ShardingJobHandler.java | 39 +++++++++ 13 files changed, 260 insertions(+), 31 deletions(-) create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/ShardingUtil.java create mode 100644 xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/ShardingJobHandler.java create mode 100644 xxl-job-executor-springboot-example/src/main/java/com/xxl/job/executor/service/jobhandler/ShardingJobHandler.java diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index 097367cd..ddde89ee 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -306,7 +306,7 @@ XXL-JOB是一个轻量级分布式任务调度框架,其核心设计目标是 #### 步骤一:执行器项目中,开发JobHandler: - 1、 新建一个继承com.xxl.job.core.handler.IJobHandler的Java类; - - 2、 该类被Spring容器扫描为Bean实例,如加“@Service注解”; + - 2、 该类被Spring容器扫描为Bean实例,如加“@Component”注解; - 3、 添加 “@JobHander(value="自定义jobhandler名称")”注解,注解的value值为自定义的JobHandler名称,该名称对应的是调度中心新建任务的JobHandler属性的值。 (可参考xxl-job-executor-example项目中的DemoJobHandler,见下图) @@ -889,6 +889,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 #### 6.18 版本 V1.8.1 特性[快照版本] - 1、任务分片:一个任务被拆分成N个独立的任务单元,然后由分布式部署的执行器分别执行某一个或几个分片单元; +- 2、执行器JobHandler禁止命名冲突; #### TODO LIST - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java index 0778cdc2..626af4e7 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java @@ -15,7 +15,8 @@ public enum ExecutorRouteStrategyEnum { LEAST_FREQUENTLY_USED("最不经常使用", new ExecutorRouteLFU()), LEAST_RECENTLY_USED("最近最久未使用", new ExecutorRouteLRU()), FAILOVER("故障转移", new ExecutorRouteFailover()), - BUSYOVER("忙碌转移", new ExecutorRouteBusyover()); + BUSYOVER("忙碌转移", new ExecutorRouteBusyover()), + BROADCAST("广播", null); ExecutorRouteStrategyEnum(String title, ExecutorRouter router) { this.title = title; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouter.java index 9d70625d..d762de3e 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouter.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouter.java @@ -28,7 +28,7 @@ public abstract class ExecutorRouter { * run executor * @param triggerParam * @param address - * @return + * @return ReturnT.content: final address */ public static ReturnT runExecutor(TriggerParam triggerParam, String address){ ReturnT runResult = null; @@ -46,6 +46,7 @@ public abstract class ExecutorRouter { runResultSB.append("
msg:").append(runResult.getMsg()); runResult.setMsg(runResultSB.toString()); + runResult.setContent(address); return runResult; } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java index f7cbf3a6..aeacddfa 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java @@ -5,6 +5,7 @@ import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; +import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; import com.xxl.job.admin.core.thread.JobFailMonitorHelper; import com.xxl.job.core.biz.model.ReturnT; @@ -40,6 +41,79 @@ public class XxlJobTrigger { ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy ArrayList addressList = (ArrayList) group.getRegistryList(); + // broadcast + if (ExecutorRouteStrategyEnum.BROADCAST == executorRouteStrategyEnum && CollectionUtils.isNotEmpty(addressList)) { + for (int i = 0; i < addressList.size(); i++) { + String address = addressList.get(i); + + // 1、save log-id + XxlJobLog jobLog = new XxlJobLog(); + jobLog.setJobGroup(jobInfo.getJobGroup()); + jobLog.setJobId(jobInfo.getId()); + XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog); + logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); + + // 2、prepare trigger-info + //jobLog.setExecutorAddress(executorAddress); + jobLog.setGlueType(jobInfo.getGlueType()); + jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); + jobLog.setExecutorParam(jobInfo.getExecutorParam()); + jobLog.setTriggerTime(new Date()); + + ReturnT triggerResult = new ReturnT(null); + StringBuffer triggerMsgSb = new StringBuffer(); + triggerMsgSb.append("注册方式:").append( (group.getAddressType() == 0)?"自动注册":"手动录入" ); + triggerMsgSb.append("
阻塞处理策略:").append(blockStrategy.getTitle()); + triggerMsgSb.append("
失败处理策略:").append(failStrategy.getTitle()); + triggerMsgSb.append("
地址列表:").append(group.getRegistryList()); + triggerMsgSb.append("
路由策略:").append(executorRouteStrategyEnum.getTitle()).append("("+i+"/"+addressList.size()+")"); // update01 + + // 3、trigger-valid + if (triggerResult.getCode()==ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) { + triggerResult.setCode(ReturnT.FAIL_CODE); + triggerMsgSb.append("
----------------------
").append("调度失败:").append("执行器地址为空"); + } + + if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) { + // 4.1、trigger-param + TriggerParam triggerParam = new TriggerParam(); + triggerParam.setJobId(jobInfo.getId()); + triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); + triggerParam.setExecutorParams(jobInfo.getExecutorParam()); + triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); + triggerParam.setLogId(jobLog.getId()); + triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); + triggerParam.setGlueType(jobInfo.getGlueType()); + triggerParam.setGlueSource(jobInfo.getGlueSource()); + triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); + triggerParam.setBroadcastIndex(i); + triggerParam.setBroadcastTotal(addressList.size()); // update02 + + // 4.2、trigger-run (route run / trigger remote executor) + triggerResult = ExecutorRouter.runExecutor(triggerParam, address); // update03 + triggerMsgSb.append("

>>>>>>>>>>>触发调度<<<<<<<<<<<
").append(triggerResult.getMsg()); + + // 4.3、trigger (fail retry) + if (triggerResult.getCode()!=ReturnT.SUCCESS_CODE && failStrategy == ExecutorFailStrategyEnum.FAIL_RETRY) { + triggerResult = ExecutorRouter.runExecutor(triggerParam, address); // update04 + triggerMsgSb.append("

>>>>>>>>>>>失败重试<<<<<<<<<<<
").append(triggerResult.getMsg()); + } + } + + // 5、save trigger-info + jobLog.setExecutorAddress(triggerResult.getContent()); + jobLog.setTriggerCode(triggerResult.getCode()); + jobLog.setTriggerMsg(triggerMsgSb.toString()); + XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog); + + // 6、monitor triger + JobFailMonitorHelper.monitor(jobLog.getId()); + logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); + + } + return; + } + // 1、save log-id XxlJobLog jobLog = new XxlJobLog(); jobLog.setJobGroup(jobInfo.getJobGroup()); @@ -67,10 +141,6 @@ public class XxlJobTrigger { triggerResult.setCode(ReturnT.FAIL_CODE); triggerMsgSb.append("
----------------------
").append("调度失败:").append("执行器地址为空"); } - if (triggerResult.getCode() == ReturnT.SUCCESS_CODE && executorRouteStrategyEnum == null) { - triggerResult.setCode(ReturnT.FAIL_CODE); - triggerMsgSb.append("
----------------------
").append("调度失败:").append("执行器路由策略为空"); - } if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) { // 4.1、trigger-param @@ -79,11 +149,13 @@ public class XxlJobTrigger { triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorParams(jobInfo.getExecutorParam()); triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); + triggerParam.setLogId(jobLog.getId()); + triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); triggerParam.setGlueType(jobInfo.getGlueType()); triggerParam.setGlueSource(jobInfo.getGlueSource()); triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); - triggerParam.setLogId(jobLog.getId()); - triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); + triggerParam.setBroadcastIndex(0); + triggerParam.setBroadcastTotal(1); // 4.2、trigger-run (route run / trigger remote executor) triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index 57547676..c7189e43 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -76,10 +76,13 @@ public class ExecutorBizImpl implements ExecutorBiz { // valid:jobHandler + jobThread if (GlueTypeEnum.BEAN==GlueTypeEnum.match(triggerParam.getGlueType())) { + // new jobhandler + IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); + // valid old jobThread - if (jobThread != null && jobHandler!=null && jobThread.getHandler() != jobHandler) { + if (jobThread!=null && jobHandler != newJobHandler) { // change handler, need kill old thread - removeOldReason = "更新JobHandler或更换任务模式,终止旧任务线程"; + removeOldReason = "更换JobHandler或更换任务模式,终止旧任务线程"; jobThread = null; jobHandler = null; @@ -87,7 +90,7 @@ public class ExecutorBizImpl implements ExecutorBiz { // valid handler if (jobHandler == null) { - jobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); + jobHandler = newJobHandler; if (jobHandler == null) { return new ReturnT(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found."); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java index 7a408ffd..448f03bd 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java @@ -14,12 +14,15 @@ public class TriggerParam implements Serializable{ private String executorParams; private String executorBlockStrategy; + private int logId; + private long logDateTim; + private String glueType; private String glueSource; private long glueUpdatetime; - private int logId; - private long logDateTim; + private int broadcastIndex; + private int broadcastTotal; public int getJobId() { return jobId; @@ -53,6 +56,22 @@ public class TriggerParam implements Serializable{ this.executorBlockStrategy = executorBlockStrategy; } + public int getLogId() { + return logId; + } + + public void setLogId(int logId) { + this.logId = logId; + } + + public long getLogDateTim() { + return logDateTim; + } + + public void setLogDateTim(long logDateTim) { + this.logDateTim = logDateTim; + } + public String getGlueType() { return glueType; } @@ -77,20 +96,20 @@ public class TriggerParam implements Serializable{ this.glueUpdatetime = glueUpdatetime; } - public int getLogId() { - return logId; + public int getBroadcastIndex() { + return broadcastIndex; } - public void setLogId(int logId) { - this.logId = logId; + public void setBroadcastIndex(int broadcastIndex) { + this.broadcastIndex = broadcastIndex; } - public long getLogDateTim() { - return logDateTim; + public int getBroadcastTotal() { + return broadcastTotal; } - public void setLogDateTim(long logDateTim) { - this.logDateTim = logDateTim; + public void setBroadcastTotal(int broadcastTotal) { + this.broadcastTotal = broadcastTotal; } @Override @@ -100,11 +119,14 @@ public class TriggerParam implements Serializable{ ", executorHandler='" + executorHandler + '\'' + ", executorParams='" + executorParams + '\'' + ", executorBlockStrategy='" + executorBlockStrategy + '\'' + + ", logId=" + logId + + ", logDateTim=" + logDateTim + ", glueType='" + glueType + '\'' + ", glueSource='" + glueSource + '\'' + ", glueUpdatetime=" + glueUpdatetime + - ", logId=" + logId + - ", logDateTim=" + logDateTim + + ", broadcastIndex=" + broadcastIndex + + ", broadcastTotal=" + broadcastTotal + '}'; } + } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index cb5e993a..f399ed67 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -98,6 +98,9 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe if (serviceBean instanceof IJobHandler){ String name = serviceBean.getClass().getAnnotation(JobHander.class).value(); IJobHandler handler = (IJobHandler) serviceBean; + if (loadJobHandler(name) != null) { + throw new RuntimeException("xxl-job jobhandler naming conflicts."); + } registJobHandler(name, handler); } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java index 1315be06..de429c38 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java @@ -18,8 +18,8 @@ public class XxlJobFileAppender { // for JobThread (support log for child thread of job handler) //public static ThreadLocal contextHolder = new ThreadLocal(); - public static InheritableThreadLocal contextHolder = new InheritableThreadLocal(); - public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + public static final InheritableThreadLocal contextHolder = new InheritableThreadLocal(); + public static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); /** * log filename: yyyy-MM-dd/9999.log diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index fb4d7b76..37c840b1 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -7,6 +7,7 @@ import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobLogger; +import com.xxl.job.core.util.ShardingUtil; import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,6 +113,7 @@ public class JobThread extends Thread{ String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); XxlJobFileAppender.contextHolder.set(logFileName); + ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); XxlJobLogger.log("
----------- xxl-job job execute start -----------
----------- Params:" + Arrays.toString(handlerParams)); executeResult = handler.execute(handlerParams); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/ShardingUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/ShardingUtil.java new file mode 100644 index 00000000..cfd35b93 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/ShardingUtil.java @@ -0,0 +1,46 @@ +package com.xxl.job.core.util; + +/** + * sharding vo + * @author xuxueli 2017-07-25 21:26:38 + */ +public class ShardingUtil { + + private static InheritableThreadLocal contextHolder = new InheritableThreadLocal(); + + public static class ShardingVO { + + private int index; // sharding index + private int total; // sharding total + + public ShardingVO(int index, int total) { + this.index = index; + this.total = total; + } + + public int getIndex() { + return index; + } + + public void setIndex(int index) { + this.index = index; + } + + public int getTotal() { + return total; + } + + public void setTotal(int total) { + this.total = total; + } + } + + public static void setShardingVo(ShardingVO shardingVo){ + contextHolder.set(shardingVo); + } + + public static ShardingVO getShardingVo(){ + return contextHolder.get(); + } + +} diff --git a/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/DemoJobHandler.java b/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/DemoJobHandler.java index 4fe6c8be..f9a2f605 100644 --- a/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/DemoJobHandler.java +++ b/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/DemoJobHandler.java @@ -4,7 +4,7 @@ import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.JobHander; import com.xxl.job.core.log.XxlJobLogger; -import org.springframework.stereotype.Service; +import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @@ -13,15 +13,15 @@ import java.util.concurrent.TimeUnit; * 任务Handler的一个Demo(Bean模式) * * 开发步骤: - * 1、继承 “IJobHandler” ; - * 2、装配到Spring,例如加 “@Service” 注解; - * 3、加 “@JobHander” 注解,注解value值为新增任务生成的JobKey的值;多个JobKey用逗号分割; + * 1、新建一个继承com.xxl.job.core.handler.IJobHandler的Java类; + * 2、该类被Spring容器扫描为Bean实例,如加“@Component”注解; + * 3、添加 “@JobHander(value="自定义jobhandler名称")”注解,注解的value值为自定义的JobHandler名称,该名称对应的是调度中心新建任务的JobHandler属性的值。 * 4、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志; * * @author xuxueli 2015-12-19 19:43:36 */ @JobHander(value="demoJobHandler") -@Service +@Component public class DemoJobHandler extends IJobHandler { @Override diff --git a/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/ShardingJobHandler.java b/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/ShardingJobHandler.java new file mode 100644 index 00000000..8f2b15e4 --- /dev/null +++ b/xxl-job-executor-example/src/main/java/com/xxl/job/executor/service/jobhandler/ShardingJobHandler.java @@ -0,0 +1,39 @@ +package com.xxl.job.executor.service.jobhandler; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.handler.annotation.JobHander; +import com.xxl.job.core.log.XxlJobLogger; +import com.xxl.job.core.util.ShardingUtil; +import org.springframework.stereotype.Service; + + +/** + * 广播分片任务 + * + * @author xuxueli 2017-07-25 20:56:50 + */ +@JobHander(value="shardingJobHandler") +@Service +public class ShardingJobHandler extends IJobHandler { + + @Override + public ReturnT execute(String... params) throws Exception { + + // 分片参数 + ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); + XxlJobLogger.log("分片参数:当前分片序号 = {0}, 总分片数 = {1}", shardingVO.getIndex(), shardingVO.getTotal()); + + // 业务逻辑 + for (int i = 0; i < shardingVO.getTotal(); i++) { + if (i == shardingVO.getIndex()) { + XxlJobLogger.log("第 {0} 片, 命中分片开始处理", i); + } else { + XxlJobLogger.log("第 {0} 片, 忽略", i); + } + } + + return ReturnT.SUCCESS; + } + +} diff --git a/xxl-job-executor-springboot-example/src/main/java/com/xxl/job/executor/service/jobhandler/ShardingJobHandler.java b/xxl-job-executor-springboot-example/src/main/java/com/xxl/job/executor/service/jobhandler/ShardingJobHandler.java new file mode 100644 index 00000000..91166eb2 --- /dev/null +++ b/xxl-job-executor-springboot-example/src/main/java/com/xxl/job/executor/service/jobhandler/ShardingJobHandler.java @@ -0,0 +1,39 @@ +package com.xxl.job.executor.service.jobhandler; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.handler.annotation.JobHander; +import com.xxl.job.core.log.XxlJobLogger; +import com.xxl.job.core.util.ShardingUtil; +import org.springframework.stereotype.Service; + + +/** + * 广播分片任务 + * + * @author xuxueli 2017-07-25 20:56:50 + */ +@JobHander(value="shardingJobHandler") +@Service +public class ShardingJobHandler extends IJobHandler { + + @Override + public ReturnT execute(String... params) throws Exception { + + // 分片参数 + ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); + XxlJobLogger.log("分片参数:当前分片序号 = {0}, 总分片数 = {1}", shardingVO.getIndex(), shardingVO.getTotal()); + + // 业务逻辑 + for (int i = 0; i < shardingVO.getTotal(); i++) { + if (i == shardingVO.getIndex()) { + XxlJobLogger.log("第 {0} 片, 命中分片开始处理", i); + } else { + XxlJobLogger.log("第 {0} 片, 忽略", i); + } + } + + return ReturnT.SUCCESS; + } + +}