From 00535b652466df96df947710024744fc49ed1c9d Mon Sep 17 00:00:00 2001 From: sikadai Date: Sun, 8 May 2022 22:13:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=88=86=E7=89=87=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E5=A4=9A=E7=BA=BF=E7=A8=8B=E7=9A=84=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../xxl/job/admin/core/model/XxlJobInfo.java | 10 +++- .../core/route/ExecutorRouteStrategyEnum.java | 2 +- .../route/strategy/ExecutorRouteSharding.java | 23 ++++++++ .../job/admin/core/trigger/XxlJobTrigger.java | 53 ++++++++++++++----- .../admin/service/impl/XxlJobServiceImpl.java | 1 + .../src/main/resources/application.properties | 2 +- .../main/resources/i18n/message_en.properties | 2 + .../resources/i18n/message_zh_CN.properties | 2 + .../resources/i18n/message_zh_TC.properties | 2 + .../mybatis-mapper/XxlJobInfoMapper.xml | 5 ++ .../resources/static/js/jobinfo.index.1.js | 5 ++ .../templates/jobinfo/jobinfo.index.ftl | 18 +++++++ .../xxl/job/core/biz/model/TriggerParam.java | 9 ++++ .../xxl/job/core/context/XxlJobContext.java | 19 +++++++ .../xxl/job/core/context/XxlJobHelper.java | 14 +++++ .../core/enums/ExecutorBlockStrategyEnum.java | 2 +- .../com/xxl/job/core/thread/JobThread.java | 21 +++++++- .../core/thread/TriggerCallbackThread.java | 1 + .../service/jobhandler/SampleXxlJob.java | 9 +++- 19 files changed, 178 insertions(+), 22 deletions(-) create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteSharding.java diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java index e47b6dc6..8be56ad0 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java @@ -27,6 +27,7 @@ public class XxlJobInfo { private String executorRouteStrategy; // 执行器路由策略 private String executorHandler; // 执行器,任务Handler名称 private String executorParam; // 执行器,任务参数 + private String shardingParam; // 执行器,分片参数 private String executorBlockStrategy; // 阻塞处理策略 private int executorTimeout; // 任务执行超时时间,单位秒 private int executorFailRetryCount; // 失败重试次数 @@ -42,7 +43,6 @@ public class XxlJobInfo { private long triggerLastTime; // 上次调度时间 private long triggerNextTime; // 下次调度时间 - public int getId() { return id; } @@ -147,6 +147,14 @@ public class XxlJobInfo { this.executorParam = executorParam; } + public String getShardingParam() { + return shardingParam; + } + + public void setShardingParam(String shardingParam) { + this.shardingParam = shardingParam; + } + public String getExecutorBlockStrategy() { return executorBlockStrategy; } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java index 7fff93a9..f101d65c 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java @@ -17,7 +17,7 @@ public enum ExecutorRouteStrategyEnum { LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()), FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()), BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()), - SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null); + SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), new ExecutorRouteSharding()); ExecutorRouteStrategyEnum(String title, ExecutorRouter router) { this.title = title; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteSharding.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteSharding.java new file mode 100644 index 00000000..6becbfc6 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteSharding.java @@ -0,0 +1,23 @@ +package com.xxl.job.admin.core.route.strategy; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; + +import java.util.List; + +/** + *

+ * 分片规则的-路由器-直接采取的是随机路由算法 + *

+ * + * @author daiqi + * @since 2022/5/8 18:23 + */ +public class ExecutorRouteSharding extends ExecutorRouteBusyover { + + @Override + public ReturnT route(TriggerParam triggerParam, List addressList) { + return super.route(triggerParam, addressList); + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java index 748befc6..65aedd66 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java @@ -13,10 +13,12 @@ import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; import com.xxl.job.core.util.IpUtil; import com.xxl.job.core.util.ThrowableUtil; +import io.netty.util.internal.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; +import java.util.List; /** * xxl-job trigger @@ -79,8 +81,9 @@ public class XxlJobTrigger { if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { - for (int i = 0; i < group.getRegistryList().size(); i++) { - processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); + int shardingTotal = getSharingTotal(jobInfo.getShardingParam()); + for (int i = 0; i < shardingTotal; i++) { + processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, shardingTotal); } } else { if (shardingParam == null) { @@ -91,6 +94,35 @@ public class XxlJobTrigger { } + private static int getSharingTotal(String executorShardingParam) { + if (StringUtil.isNullOrEmpty(executorShardingParam)) { + return 1; + } + // 获取分片的数量 + return getShardingArr(executorShardingParam).length; + } + + public static String getShardingParam(String shardingParams, int index) { + String[] shardingArr = getShardingArr(shardingParams); + String[] shardingParamArr = shardingArr[index].split("="); + if (shardingParamArr.length >= 2) { + return shardingParamArr[1]; + } + return shardingParamArr[0]; + } + + private static String [] getShardingArr(String shardingParams) { + if (StringUtil.isNullOrEmpty(shardingParams)) { + return new String[0]; + } + return shardingParams.split("/"); + } + + public static class ShardingInfo { + private int total; + private List shardingParams; + } + private static boolean isNumeric(String str){ try { int result = Integer.valueOf(str); @@ -113,7 +145,7 @@ public class XxlJobTrigger { // param ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy - String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null; + String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?getShardingParam(jobInfo.getShardingParam(), index):null; // 1、save log-id XxlJobLog jobLog = new XxlJobLog(); @@ -128,6 +160,7 @@ public class XxlJobTrigger { triggerParam.setJobId(jobInfo.getId()); triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorParams(jobInfo.getExecutorParam()); + triggerParam.setShardingParam(shardingParam); triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout()); triggerParam.setLogId(jobLog.getId()); @@ -142,17 +175,9 @@ public class XxlJobTrigger { String address = null; ReturnT routeAddressResult = null; if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) { - if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { - if (index < group.getRegistryList().size()) { - address = group.getRegistryList().get(index); - } else { - address = group.getRegistryList().get(0); - } - } else { - routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); - if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { - address = routeAddressResult.getContent(); - } + routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); + if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { + address = routeAddressResult.getContent(); } } else { routeAddressResult = new ReturnT(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty")); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 530ee41c..f7d1fb0a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -274,6 +274,7 @@ public class XxlJobServiceImpl implements XxlJobService { exists_jobInfo.setExecutorRouteStrategy(jobInfo.getExecutorRouteStrategy()); exists_jobInfo.setExecutorHandler(jobInfo.getExecutorHandler()); exists_jobInfo.setExecutorParam(jobInfo.getExecutorParam()); + exists_jobInfo.setShardingParam(jobInfo.getShardingParam()); exists_jobInfo.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); exists_jobInfo.setExecutorTimeout(jobInfo.getExecutorTimeout()); exists_jobInfo.setExecutorFailRetryCount(jobInfo.getExecutorFailRetryCount()); diff --git a/xxl-job-admin/src/main/resources/application.properties b/xxl-job-admin/src/main/resources/application.properties index afe93b42..1789e6c8 100644 --- a/xxl-job-admin/src/main/resources/application.properties +++ b/xxl-job-admin/src/main/resources/application.properties @@ -25,7 +25,7 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml ### xxl-job, datasource spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username=root -spring.datasource.password=root_pwd +spring.datasource.password=123456 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver ### datasource-pool diff --git a/xxl-job-admin/src/main/resources/i18n/message_en.properties b/xxl-job-admin/src/main/resources/i18n/message_en.properties index 7d67636d..1408355a 100644 --- a/xxl-job-admin/src/main/resources/i18n/message_en.properties +++ b/xxl-job-admin/src/main/resources/i18n/message_en.properties @@ -117,6 +117,7 @@ jobinfo_field_jobdesc=Job description jobinfo_field_timeout=Job timeout period jobinfo_field_gluetype=GLUE Type jobinfo_field_executorparam=Param +jobinfo_field_shardingparam=Sharding Param jobinfo_field_author=Author jobinfo_field_alarmemail=Alarm email jobinfo_field_alarmemail_placeholder=Please enter alarm mail, if there are more than one comma separated @@ -221,6 +222,7 @@ jobgroup_empty=There is no valid executor. Please contact the administrator ## job conf jobconf_block_SERIAL_EXECUTION=Serial execution +jobconf_block_CONCURRENT_EXECUTION=Concurrent execution jobconf_block_DISCARD_LATER=Discard Later jobconf_block_COVER_EARLY=Cover Early jobconf_route_first=First diff --git a/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties b/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties index 4897a238..e8edbdcc 100644 --- a/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties +++ b/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties @@ -116,6 +116,7 @@ jobinfo_field_jobgroup=执行器 jobinfo_field_jobdesc=任务描述 jobinfo_field_gluetype=运行模式 jobinfo_field_executorparam=任务参数 +jobinfo_field_shardingparam=任务分片参数 jobinfo_field_author=负责人 jobinfo_field_timeout=任务超时时间 jobinfo_field_alarmemail=报警邮件 @@ -221,6 +222,7 @@ jobgroup_empty=不存在有效执行器,请联系管理员 ## job conf jobconf_block_SERIAL_EXECUTION=单机串行 +jobconf_block_CONCURRENT_EXECUTION=单机并行 jobconf_block_DISCARD_LATER=丢弃后续调度 jobconf_block_COVER_EARLY=覆盖之前调度 jobconf_route_first=第一个 diff --git a/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties b/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties index 57d28e4f..f98eb649 100755 --- a/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties +++ b/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties @@ -116,6 +116,7 @@ jobinfo_field_jobgroup=執行器 jobinfo_field_jobdesc=任務描述 jobinfo_field_gluetype=運行模式 jobinfo_field_executorparam=任務參數 +jobinfo_field_shardingparam=分片參數 jobinfo_field_author=負責人 jobinfo_field_timeout=任務超時秒數 jobinfo_field_alarmemail=告警郵件 @@ -221,6 +222,7 @@ jobgroup_empty=不存在有效執行器,請聯絡系統管理員 ## job conf jobconf_block_SERIAL_EXECUTION=單機串行 +jobconf_block_CONCURRENT_EXECUTION=单机并行 jobconf_block_DISCARD_LATER=丢棄后續調度 jobconf_block_COVER_EARLY=覆蓋之前調度 jobconf_route_first=第一個 diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml index 7b3c3a3e..6968d250 100644 --- a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml @@ -22,6 +22,7 @@ + @@ -52,6 +53,7 @@ t.executor_route_strategy, t.executor_handler, t.executor_param, + t.sharding_param, t.executor_block_strategy, t.executor_timeout, t.executor_fail_retry_count, @@ -125,6 +127,7 @@ executor_route_strategy, executor_handler, executor_param, + sharding_param, executor_block_strategy, executor_timeout, executor_fail_retry_count, @@ -149,6 +152,7 @@ #{executorRouteStrategy}, #{executorHandler}, #{executorParam}, + #{shardingParam}, #{executorBlockStrategy}, #{executorTimeout}, #{executorFailRetryCount}, @@ -187,6 +191,7 @@ executor_route_strategy = #{executorRouteStrategy}, executor_handler = #{executorHandler}, executor_param = #{executorParam}, + sharding_param = #{shardingParam}, executor_block_strategy = #{executorBlockStrategy}, executor_timeout = ${executorTimeout}, executor_fail_retry_count = ${executorFailRetryCount}, diff --git a/xxl-job-admin/src/main/resources/static/js/jobinfo.index.1.js b/xxl-job-admin/src/main/resources/static/js/jobinfo.index.1.js index b479e972..8a472ec6 100644 --- a/xxl-job-admin/src/main/resources/static/js/jobinfo.index.1.js +++ b/xxl-job-admin/src/main/resources/static/js/jobinfo.index.1.js @@ -74,6 +74,7 @@ $(function() { } }, { "data": 'executorParam', "visible" : false}, + { "data": 'shardingParam', "visible" : false}, { "data": 'addTime', "visible" : false, @@ -266,6 +267,7 @@ $(function() { $("#jobTriggerModal .form input[name='id']").val( row.id ); $("#jobTriggerModal .form textarea[name='executorParam']").val( row.executorParam ); + $("#jobTriggerModal .form textarea[name='shardingParam']").val( row.shardingParam ); $('#jobTriggerModal').modal({backdrop: false, keyboard: false}).modal('show'); }); @@ -276,6 +278,7 @@ $(function() { data : { "id" : $("#jobTriggerModal .form input[name='id']").val(), "executorParam" : $("#jobTriggerModal .textarea[name='executorParam']").val(), + "shardingParam" : $("#jobTriggerModal .textarea[name='shardingParam']").val(), "addressList" : $("#jobTriggerModal .textarea[name='addressList']").val() }, dataType : "json", @@ -561,6 +564,7 @@ $(function() { $('#updateModal .form select[name=glueType] option[value='+ row.glueType +']').prop('selected', true); $("#updateModal .form input[name='executorHandler']").val( row.executorHandler ); $("#updateModal .form textarea[name='executorParam']").val( row.executorParam ); + $("#updateModal .form textarea[name='shardingParam']").val( row.shardingParam ); // 》init glueType $("#updateModal .form select[name=glueType]").change(); @@ -716,6 +720,7 @@ $(function() { $('#addModal .form select[name=glueType] option[value='+ row.glueType +']').prop('selected', true); $("#addModal .form input[name='executorHandler']").val( row.executorHandler ); $("#addModal .form textarea[name='executorParam']").val( row.executorParam ); + $("#addModal .form textarea[name='shardingParam']").val( row.shardingParam ); // 》init glueType $("#addModal .form select[name=glueType]").change(); diff --git a/xxl-job-admin/src/main/resources/templates/jobinfo/jobinfo.index.ftl b/xxl-job-admin/src/main/resources/templates/jobinfo/jobinfo.index.ftl index 3a5d7d8a..4b715352 100644 --- a/xxl-job-admin/src/main/resources/templates/jobinfo/jobinfo.index.ftl +++ b/xxl-job-admin/src/main/resources/templates/jobinfo/jobinfo.index.ftl @@ -83,6 +83,7 @@ ${I18n.schedule_type} ${I18n.jobinfo_field_gluetype} ${I18n.jobinfo_field_executorparam} + ${I18n.jobinfo_field_shardingparam} addTime updateTime ${I18n.jobinfo_field_author} @@ -188,6 +189,13 @@ +
+ +
+ +
+
+

${I18n.jobinfo_conf_advanced}

<#-- 高级配置 --> @@ -265,6 +273,7 @@ echo "${I18n.jobinfo_script_location}:$0" echo "${I18n.jobinfo_field_executorparam}:$1" echo "${I18n.jobinfo_shard_index} = $2" echo "${I18n.jobinfo_shard_total} = $3" +echo "${I18n.jobinfo_field_shardingparam}:$4" <#--echo "参数数量:$#" for param in $* do @@ -287,6 +296,7 @@ print "${I18n.jobinfo_script_location}:", sys.argv[0] print "${I18n.jobinfo_field_executorparam}:", sys.argv[1] print "${I18n.jobinfo_shard_index}:", sys.argv[2] print "${I18n.jobinfo_shard_total}:", sys.argv[3] +print "${I18n.jobinfo_field_shardingparam}:", sys.argv[4] <#--for i in range(1, len(sys.argv)): time.sleep(1) print "参数", i, sys.argv[i]--> @@ -309,6 +319,7 @@ logging.info("脚本文件:" + sys.argv[0]) echo "${I18n.jobinfo_field_executorparam}:$argv[1] \n"; echo "${I18n.jobinfo_shard_index} = $argv[2] \n"; echo "${I18n.jobinfo_shard_total} = $argv[3] \n"; + echo "${I18n.jobinfo_field_shardingparam} = $argv[4] \n"; echo "Good bye! \n"; exit(0); @@ -433,6 +444,13 @@ exit 0 +
+ +
+ +
+
+

${I18n.jobinfo_conf_advanced}

<#-- 高级配置 --> diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java index 4f56368a..51d4bd5b 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java @@ -12,6 +12,7 @@ public class TriggerParam implements Serializable{ private String executorHandler; private String executorParams; + private String shardingParam; private String executorBlockStrategy; private int executorTimeout; @@ -46,6 +47,14 @@ public class TriggerParam implements Serializable{ return executorParams; } + public String getShardingParam() { + return shardingParam; + } + + public void setShardingParam(String shardingParam) { + this.shardingParam = shardingParam; + } + public void setExecutorParams(String executorParams) { this.executorParams = executorParams; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobContext.java b/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobContext.java index 7e350129..117e9612 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobContext.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobContext.java @@ -23,6 +23,10 @@ public class XxlJobContext { * job param */ private final String jobParam; + /** + * job param + */ + private final String jobShardingParam; // ---------------------- for log ---------------------- @@ -64,6 +68,7 @@ public class XxlJobContext { public XxlJobContext(long jobId, String jobParam, String jobLogFileName, int shardIndex, int shardTotal) { this.jobId = jobId; this.jobParam = jobParam; + this.jobShardingParam = null; this.jobLogFileName = jobLogFileName; this.shardIndex = shardIndex; this.shardTotal = shardTotal; @@ -71,6 +76,16 @@ public class XxlJobContext { this.handleCode = HANDLE_CODE_SUCCESS; // default success } + public XxlJobContext(long jobId, String jobParam, String jobShardingParam, String jobLogFileName, int shardIndex, int shardTotal) { + this.jobId = jobId; + this.jobParam = jobParam; + this.jobShardingParam = jobShardingParam; + this.jobLogFileName = jobLogFileName; + this.shardIndex = shardIndex; + this.shardTotal = shardTotal; + this.handleCode = HANDLE_CODE_SUCCESS; // default success + } + public long getJobId() { return jobId; } @@ -79,6 +94,10 @@ public class XxlJobContext { return jobParam; } + public String getJobShardingParam() { + return jobShardingParam; + } + public String getJobLogFileName() { return jobLogFileName; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobHelper.java b/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobHelper.java index eb20c181..e14ad16d 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobHelper.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobHelper.java @@ -48,6 +48,20 @@ public class XxlJobHelper { return xxlJobContext.getJobParam(); } + /** + * current JobParam + * + * @return + */ + public static String getJobShardingParam() { + XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext(); + if (xxlJobContext == null) { + return null; + } + + return xxlJobContext.getJobShardingParam(); + } + // ---------------------- for log ---------------------- /** diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.java b/xxl-job-core/src/main/java/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.java index a9dc1bea..ca134c2c 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.java @@ -6,7 +6,7 @@ package com.xxl.job.core.enums; public enum ExecutorBlockStrategyEnum { SERIAL_EXECUTION("Serial execution"), - /*CONCURRENT_EXECUTION("并行"),*/ + CONCURRENT_EXECUTION("Concurrent execution"), DISCARD_LATER("Discard Later"), COVER_EARLY("Cover Early"); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index cf07a55a..29d8772d 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -5,6 +5,7 @@ import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.context.XxlJobContext; import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; @@ -103,6 +104,7 @@ public class JobThread extends Thread{ logger.error(e.getMessage(), e); } + ExecutorService executor2 = Executors.newFixedThreadPool(5); // execute while(!toStop){ running = false; @@ -112,7 +114,7 @@ public class JobThread extends Thread{ try { // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); - if (triggerParam!=null) { + if (triggerParam!=null) { running = true; idleTimes = 0; triggerLogIdSet.remove(triggerParam.getLogId()); @@ -122,6 +124,7 @@ public class JobThread extends Thread{ XxlJobContext xxlJobContext = new XxlJobContext( triggerParam.getJobId(), triggerParam.getExecutorParams(), + triggerParam.getShardingParam(), logFileName, triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()); @@ -130,7 +133,7 @@ public class JobThread extends Thread{ XxlJobContext.setXxlJobContext(xxlJobContext); // execute - XxlJobHelper.log("
----------- xxl-job job execute start -----------
----------- Param:" + xxlJobContext.getJobParam()); + XxlJobHelper.log("
----------- xxl-job job execute start -----------
----------- Param:" + xxlJobContext.getJobParam()+ "Sharding Param:" + xxlJobContext.getJobShardingParam()); if (triggerParam.getExecutorTimeout() > 0) { // limit timeout @@ -161,6 +164,19 @@ public class JobThread extends Thread{ } finally { futureThread.interrupt(); } + } else if (triggerParam.getExecutorBlockStrategy().equals(ExecutorBlockStrategyEnum.CONCURRENT_EXECUTION.name())){ + executor2.execute(new Runnable() { + @Override + public void run() { + try { + // init job context + XxlJobContext.setXxlJobContext(xxlJobContext); + handler.execute(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); } else { // just execute handler.execute(); @@ -225,6 +241,7 @@ public class JobThread extends Thread{ } } } + executor2.shutdown(); // callback trigger request in queue while(triggerQueue !=null && triggerQueue.size()>0){ diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 40acac00..a7c5b55c 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -191,6 +191,7 @@ public class TriggerCallbackThread { XxlJobContext.setXxlJobContext(new XxlJobContext( -1, null, + null, logFileName, -1, -1)); diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java index 759d6625..1d7781b7 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java @@ -1,5 +1,6 @@ package com.xxl.job.executor.service.jobhandler; +import com.xxl.job.core.context.XxlJobContext; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import org.slf4j.Logger; @@ -37,9 +38,13 @@ public class SampleXxlJob { @XxlJob("demoJobHandler") public void demoJobHandler() throws Exception { XxlJobHelper.log("XXL-JOB, Hello World."); - - for (int i = 0; i < 5; i++) { + XxlJobContext jobContext = XxlJobContext.getXxlJobContext(); + logger.info("执行分片:{}, 分片总数:{},执行参数为:{},执行分片参数为:{}", + jobContext.getShardIndex(), jobContext.getShardTotal(), + jobContext.getJobParam(), jobContext.getJobShardingParam()); + for (int i = 0; i < 1; i++) { XxlJobHelper.log("beat at:" + i); + logger.info("beat at:" + i); TimeUnit.SECONDS.sleep(2); } // default success