From 54fce811ebd1d1d4dd179db933cc4f18ce80c787 Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Sun, 23 Sep 2018 02:51:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=86=E7=89=87=E4=BB=BB=E5=8A=A1=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=E9=87=8D=E8=AF=95=E4=BC=98=E5=8C=96=EF=BC=8C=E4=BB=85?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E5=BD=93=E5=89=8D=E5=A4=B1=E8=B4=A5=E7=9A=84?= =?UTF-8?q?=E5=88=86=E7=89=87=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 3 +- doc/db/tables_xxl_job.sql | 1 + .../admin/controller/JobInfoController.java | 2 +- .../admin/core/jobbean/RemoteHttpJobBean.java | 2 +- .../xxl/job/admin/core/model/XxlJobLog.java | 9 +++++ .../core/thread/JobFailMonitorHelper.java | 5 +-- .../core/thread/JobTriggerPoolHelper.java | 8 ++-- .../job/admin/core/trigger/XxlJobTrigger.java | 38 ++++++++++++------- .../job/admin/service/impl/AdminBizImpl.java | 2 +- .../mybatis-mapper/XxlJobLogMapper.xml | 3 ++ 10 files changed, 48 insertions(+), 25 deletions(-) diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index a53159ed..506a3816 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -1282,7 +1282,8 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 32、底层RPC序列化协议调整为hessian2; - 33、修复表字段 “t.order”与数据库关键字冲突查询失败的问题, - 34、调度中心提供API服务,支持通过API服务对任务进行查询、新增、更新、启停等操作; -- 35、【迭代中】分片任务失败重试优化,仅重试当前失败的分片; +- 35、分片任务失败重试优化,仅重试当前失败的分片; +- 36、【迭代中】任务参数数据框调整,手动触发时支持动态输入参数; ### TODO LIST diff --git a/doc/db/tables_xxl_job.sql b/doc/db/tables_xxl_job.sql index a07a1b8f..fc3d7cb0 100644 --- a/doc/db/tables_xxl_job.sql +++ b/doc/db/tables_xxl_job.sql @@ -179,6 +179,7 @@ CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_LOG` ( `executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址', `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler', `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数', + `executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2', `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数', `trigger_time` datetime DEFAULT NULL COMMENT '调度-时间', `trigger_code` int(11) NOT NULL COMMENT '调度-结果', diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java index 0d3effa6..48bbd585 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java @@ -92,7 +92,7 @@ public class JobInfoController { @ResponseBody //@PermessionLimit(limit = false) public ReturnT triggerJob(int id) { - JobTriggerPoolHelper.trigger(id, -1, TriggerTypeEnum.MANUAL); + JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null); return ReturnT.SUCCESS; } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java index f2ccf74a..84c59e42 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java @@ -29,7 +29,7 @@ public class RemoteHttpJobBean extends QuartzJobBean { // trigger //XxlJobTrigger.trigger(jobId); - JobTriggerPoolHelper.trigger(jobId, -1, TriggerTypeEnum.CRON); + JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null); } } \ No newline at end of file diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobLog.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobLog.java index 33340e37..bdecc1c2 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobLog.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobLog.java @@ -18,6 +18,7 @@ public class XxlJobLog { private String executorAddress; private String executorHandler; private String executorParam; + private String executorShardingParam; private int executorFailRetryCount; // trigger info @@ -78,6 +79,14 @@ public class XxlJobLog { this.executorParam = executorParam; } + public String getExecutorShardingParam() { + return executorShardingParam; + } + + public void setExecutorShardingParam(String executorShardingParam) { + this.executorShardingParam = executorShardingParam; + } + public int getExecutorFailRetryCount() { return executorFailRetryCount; } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java index 6dc3f27f..849fd533 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java @@ -73,10 +73,7 @@ public class JobFailMonitorHelper { XxlJobInfo info = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(log.getJobId()); if (log.getExecutorFailRetryCount() > 0) { - - // TODO,分片任务失败重试优化,仅重试失败分片 - - JobTriggerPoolHelper.trigger(log.getJobId(), (log.getExecutorFailRetryCount()-1), TriggerTypeEnum.RETRY); + JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam()); String retryMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<<
"; log.setTriggerMsg(log.getTriggerMsg() + retryMsg); XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(log); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java index a90c6bc2..298ecaaa 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -29,11 +29,11 @@ public class JobTriggerPoolHelper { new ThreadPoolExecutor.CallerRunsPolicy()); - public void addTrigger(final int jobId, final int failRetryCount, final TriggerTypeEnum triggerType) { + public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam) { triggerPool.execute(new Runnable() { @Override public void run() { - XxlJobTrigger.trigger(jobId, failRetryCount, triggerType); + XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam); } }); } @@ -55,8 +55,8 @@ public class JobTriggerPoolHelper { * <0: use param from job info config * */ - public static void trigger(int jobId, int failRetryCount, TriggerTypeEnum triggerType) { - helper.addTrigger(jobId, failRetryCount, triggerType); + public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam) { + helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam); } public static void toStop() { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java index d90602bc..2478ad6a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java @@ -13,12 +13,11 @@ import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; import com.xxl.job.core.util.IpUtil; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Date; -import java.util.List; /** * xxl-job trigger @@ -36,27 +35,38 @@ public class XxlJobTrigger { * <0: use param from job info config * */ - public static void trigger(int jobId, int failRetryCount, TriggerTypeEnum triggerType) { + public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam) { // load data - XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // job info + XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); return; } int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount(); - XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info + XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // process trigger - if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && CollectionUtils.isNotEmpty(group.getRegistryList())) { - for (int i = 0; i < group.getRegistryList().size(); i++) { - processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i); + if (triggerType==TriggerTypeEnum.RETRY && executorShardingParam!=null) { + String[] shardingArr = executorShardingParam.split("/"); + if (shardingArr.length==2 && StringUtils.isNumeric(shardingArr[0]) && StringUtils.isNumeric(shardingArr[1])); { + processTrigger(group, jobInfo, finalFailRetryCount, triggerType, Integer.valueOf(shardingArr[0]), Integer.valueOf(shardingArr[1])); } } else { - processTrigger(group, jobInfo, finalFailRetryCount, triggerType, 0); + if (CollectionUtils.isNotEmpty(group.getRegistryList())) { + if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)) { + for (int i = 0; i < group.getRegistryList().size(); i++) { + processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); + } + } else { + processTrigger(group, jobInfo, finalFailRetryCount, triggerType, 0, 1); + } + } else { + processTrigger(group, jobInfo, finalFailRetryCount, triggerType, 0, 0); + } } } - private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index){ + private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){ // param ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy @@ -83,11 +93,12 @@ public class XxlJobTrigger { triggerParam.setGlueSource(jobInfo.getGlueSource()); triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); triggerParam.setBroadcastIndex(index); - triggerParam.setBroadcastTotal(group.getRegistryList()!=null?group.getRegistryList().size():0); + triggerParam.setBroadcastTotal(total); // 3、init address String address = null; ReturnT routeAddressResult = null; + String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum &&total>0)?String.valueOf(triggerParam.getBroadcastIndex()).concat("/").concat(String.valueOf(triggerParam.getBroadcastTotal())):null; if (CollectionUtils.isNotEmpty(group.getRegistryList())) { if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { address = group.getRegistryList().get(index); @@ -117,8 +128,8 @@ public class XxlJobTrigger { .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") ); triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList()); triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle()); - if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)) { - triggerMsgSb.append("("+index+"/"+(group.getRegistryList()!=null?group.getRegistryList().size():0)+")"); + if (shardingParam != null) { + triggerMsgSb.append("("+shardingParam+")"); } triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle()); triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout()); @@ -131,6 +142,7 @@ public class XxlJobTrigger { jobLog.setExecutorAddress(address); jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorParam(jobInfo.getExecutorParam()); + jobLog.setExecutorShardingParam(shardingParam); jobLog.setExecutorFailRetryCount(finalFailRetryCount); //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java index 553acaa3..73544de0 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java @@ -71,7 +71,7 @@ public class AdminBizImpl implements AdminBiz { int childJobId = (StringUtils.isNotBlank(childJobIds[i]) && StringUtils.isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; if (childJobId > 0) { - JobTriggerPoolHelper.trigger(childJobId, 0, TriggerTypeEnum.PARENT); + JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, 0, null); ReturnT triggerChildResult = ReturnT.SUCCESS; // add msg diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobLogMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobLogMapper.xml index 66c2c59c..d51cbf8d 100644 --- a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobLogMapper.xml +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobLogMapper.xml @@ -12,6 +12,7 @@ + @@ -31,6 +32,7 @@ t.executor_address, t.executor_handler, t.executor_param, + t.executor_sharding_param, t.executor_fail_retry_count, t.trigger_time, t.trigger_code, @@ -141,6 +143,7 @@ `executor_address`= #{executorAddress}, `executor_handler`=#{executorHandler}, `executor_param`= #{executorParam}, + `executor_sharding_param`= #{executorShardingParam}, `executor_fail_retry_count`= #{executorFailRetryCount} WHERE `id`= #{id}