diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java
index e47b6dc6..8be56ad0 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java
@@ -27,6 +27,7 @@ public class XxlJobInfo {
private String executorRouteStrategy; // 执行器路由策略
private String executorHandler; // 执行器,任务Handler名称
private String executorParam; // 执行器,任务参数
+ private String shardingParam; // 执行器,分片参数
private String executorBlockStrategy; // 阻塞处理策略
private int executorTimeout; // 任务执行超时时间,单位秒
private int executorFailRetryCount; // 失败重试次数
@@ -42,7 +43,6 @@ public class XxlJobInfo {
private long triggerLastTime; // 上次调度时间
private long triggerNextTime; // 下次调度时间
-
public int getId() {
return id;
}
@@ -147,6 +147,14 @@ public class XxlJobInfo {
this.executorParam = executorParam;
}
+ public String getShardingParam() {
+ return shardingParam;
+ }
+
+ public void setShardingParam(String shardingParam) {
+ this.shardingParam = shardingParam;
+ }
+
public String getExecutorBlockStrategy() {
return executorBlockStrategy;
}
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java
index 7fff93a9..f101d65c 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java
@@ -17,7 +17,7 @@ public enum ExecutorRouteStrategyEnum {
LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
- SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
+ SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), new ExecutorRouteSharding());
ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
this.title = title;
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteSharding.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteSharding.java
new file mode 100644
index 00000000..6becbfc6
--- /dev/null
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteSharding.java
@@ -0,0 +1,23 @@
+package com.xxl.job.admin.core.route.strategy;
+
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.biz.model.TriggerParam;
+
+import java.util.List;
+
+/**
+ *
+ * 分片规则的-路由器-直接采取的是随机路由算法
+ *
+ *
+ * @author daiqi
+ * @since 2022/5/8 18:23
+ */
+public class ExecutorRouteSharding extends ExecutorRouteBusyover {
+
+ @Override
+ public ReturnT route(TriggerParam triggerParam, List addressList) {
+ return super.route(triggerParam, addressList);
+ }
+
+}
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
index 748befc6..65aedd66 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
@@ -13,10 +13,12 @@ import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.util.IpUtil;
import com.xxl.job.core.util.ThrowableUtil;
+import io.netty.util.internal.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
+import java.util.List;
/**
* xxl-job trigger
@@ -79,8 +81,9 @@ public class XxlJobTrigger {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
- for (int i = 0; i < group.getRegistryList().size(); i++) {
- processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
+ int shardingTotal = getSharingTotal(jobInfo.getShardingParam());
+ for (int i = 0; i < shardingTotal; i++) {
+ processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, shardingTotal);
}
} else {
if (shardingParam == null) {
@@ -91,6 +94,35 @@ public class XxlJobTrigger {
}
+ private static int getSharingTotal(String executorShardingParam) {
+ if (StringUtil.isNullOrEmpty(executorShardingParam)) {
+ return 1;
+ }
+ // 获取分片的数量
+ return getShardingArr(executorShardingParam).length;
+ }
+
+ public static String getShardingParam(String shardingParams, int index) {
+ String[] shardingArr = getShardingArr(shardingParams);
+ String[] shardingParamArr = shardingArr[index].split("=");
+ if (shardingParamArr.length >= 2) {
+ return shardingParamArr[1];
+ }
+ return shardingParamArr[0];
+ }
+
+ private static String [] getShardingArr(String shardingParams) {
+ if (StringUtil.isNullOrEmpty(shardingParams)) {
+ return new String[0];
+ }
+ return shardingParams.split("/");
+ }
+
+ public static class ShardingInfo {
+ private int total;
+ private List shardingParams;
+ }
+
private static boolean isNumeric(String str){
try {
int result = Integer.valueOf(str);
@@ -113,7 +145,7 @@ public class XxlJobTrigger {
// param
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
- String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
+ String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?getShardingParam(jobInfo.getShardingParam(), index):null;
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
@@ -128,6 +160,7 @@ public class XxlJobTrigger {
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
+ triggerParam.setShardingParam(shardingParam);
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
@@ -142,17 +175,9 @@ public class XxlJobTrigger {
String address = null;
ReturnT routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
- if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
- if (index < group.getRegistryList().size()) {
- address = group.getRegistryList().get(index);
- } else {
- address = group.getRegistryList().get(0);
- }
- } else {
- routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
- if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
- address = routeAddressResult.getContent();
- }
+ routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
+ if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
+ address = routeAddressResult.getContent();
}
} else {
routeAddressResult = new ReturnT(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java
index 530ee41c..f7d1fb0a 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java
@@ -274,6 +274,7 @@ public class XxlJobServiceImpl implements XxlJobService {
exists_jobInfo.setExecutorRouteStrategy(jobInfo.getExecutorRouteStrategy());
exists_jobInfo.setExecutorHandler(jobInfo.getExecutorHandler());
exists_jobInfo.setExecutorParam(jobInfo.getExecutorParam());
+ exists_jobInfo.setShardingParam(jobInfo.getShardingParam());
exists_jobInfo.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
exists_jobInfo.setExecutorTimeout(jobInfo.getExecutorTimeout());
exists_jobInfo.setExecutorFailRetryCount(jobInfo.getExecutorFailRetryCount());
diff --git a/xxl-job-admin/src/main/resources/application.properties b/xxl-job-admin/src/main/resources/application.properties
index afe93b42..1789e6c8 100644
--- a/xxl-job-admin/src/main/resources/application.properties
+++ b/xxl-job-admin/src/main/resources/application.properties
@@ -25,7 +25,7 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
### xxl-job, datasource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
-spring.datasource.password=root_pwd
+spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
### datasource-pool
diff --git a/xxl-job-admin/src/main/resources/i18n/message_en.properties b/xxl-job-admin/src/main/resources/i18n/message_en.properties
index 7d67636d..1408355a 100644
--- a/xxl-job-admin/src/main/resources/i18n/message_en.properties
+++ b/xxl-job-admin/src/main/resources/i18n/message_en.properties
@@ -117,6 +117,7 @@ jobinfo_field_jobdesc=Job description
jobinfo_field_timeout=Job timeout period
jobinfo_field_gluetype=GLUE Type
jobinfo_field_executorparam=Param
+jobinfo_field_shardingparam=Sharding Param
jobinfo_field_author=Author
jobinfo_field_alarmemail=Alarm email
jobinfo_field_alarmemail_placeholder=Please enter alarm mail, if there are more than one comma separated
@@ -221,6 +222,7 @@ jobgroup_empty=There is no valid executor. Please contact the administrator
## job conf
jobconf_block_SERIAL_EXECUTION=Serial execution
+jobconf_block_CONCURRENT_EXECUTION=Concurrent execution
jobconf_block_DISCARD_LATER=Discard Later
jobconf_block_COVER_EARLY=Cover Early
jobconf_route_first=First
diff --git a/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties b/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties
index 4897a238..e8edbdcc 100644
--- a/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties
+++ b/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties
@@ -116,6 +116,7 @@ jobinfo_field_jobgroup=执行器
jobinfo_field_jobdesc=任务描述
jobinfo_field_gluetype=运行模式
jobinfo_field_executorparam=任务参数
+jobinfo_field_shardingparam=任务分片参数
jobinfo_field_author=负责人
jobinfo_field_timeout=任务超时时间
jobinfo_field_alarmemail=报警邮件
@@ -221,6 +222,7 @@ jobgroup_empty=不存在有效执行器,请联系管理员
## job conf
jobconf_block_SERIAL_EXECUTION=单机串行
+jobconf_block_CONCURRENT_EXECUTION=单机并行
jobconf_block_DISCARD_LATER=丢弃后续调度
jobconf_block_COVER_EARLY=覆盖之前调度
jobconf_route_first=第一个
diff --git a/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties b/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties
index 57d28e4f..f98eb649 100755
--- a/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties
+++ b/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties
@@ -116,6 +116,7 @@ jobinfo_field_jobgroup=執行器
jobinfo_field_jobdesc=任務描述
jobinfo_field_gluetype=運行模式
jobinfo_field_executorparam=任務參數
+jobinfo_field_shardingparam=分片參數
jobinfo_field_author=負責人
jobinfo_field_timeout=任務超時秒數
jobinfo_field_alarmemail=告警郵件
@@ -221,6 +222,7 @@ jobgroup_empty=不存在有效執行器,請聯絡系統管理員
## job conf
jobconf_block_SERIAL_EXECUTION=單機串行
+jobconf_block_CONCURRENT_EXECUTION=单机并行
jobconf_block_DISCARD_LATER=丢棄后續調度
jobconf_block_COVER_EARLY=覆蓋之前調度
jobconf_route_first=第一個
diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml
index 7b3c3a3e..6968d250 100644
--- a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml
+++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml
@@ -22,6 +22,7 @@
+
@@ -52,6 +53,7 @@
t.executor_route_strategy,
t.executor_handler,
t.executor_param,
+ t.sharding_param,
t.executor_block_strategy,
t.executor_timeout,
t.executor_fail_retry_count,
@@ -125,6 +127,7 @@
executor_route_strategy,
executor_handler,
executor_param,
+ sharding_param,
executor_block_strategy,
executor_timeout,
executor_fail_retry_count,
@@ -149,6 +152,7 @@
#{executorRouteStrategy},
#{executorHandler},
#{executorParam},
+ #{shardingParam},
#{executorBlockStrategy},
#{executorTimeout},
#{executorFailRetryCount},
@@ -187,6 +191,7 @@
executor_route_strategy = #{executorRouteStrategy},
executor_handler = #{executorHandler},
executor_param = #{executorParam},
+ sharding_param = #{shardingParam},
executor_block_strategy = #{executorBlockStrategy},
executor_timeout = ${executorTimeout},
executor_fail_retry_count = ${executorFailRetryCount},
diff --git a/xxl-job-admin/src/main/resources/static/js/jobinfo.index.1.js b/xxl-job-admin/src/main/resources/static/js/jobinfo.index.1.js
index b479e972..8a472ec6 100644
--- a/xxl-job-admin/src/main/resources/static/js/jobinfo.index.1.js
+++ b/xxl-job-admin/src/main/resources/static/js/jobinfo.index.1.js
@@ -74,6 +74,7 @@ $(function() {
}
},
{ "data": 'executorParam', "visible" : false},
+ { "data": 'shardingParam', "visible" : false},
{
"data": 'addTime',
"visible" : false,
@@ -266,6 +267,7 @@ $(function() {
$("#jobTriggerModal .form input[name='id']").val( row.id );
$("#jobTriggerModal .form textarea[name='executorParam']").val( row.executorParam );
+ $("#jobTriggerModal .form textarea[name='shardingParam']").val( row.shardingParam );
$('#jobTriggerModal').modal({backdrop: false, keyboard: false}).modal('show');
});
@@ -276,6 +278,7 @@ $(function() {
data : {
"id" : $("#jobTriggerModal .form input[name='id']").val(),
"executorParam" : $("#jobTriggerModal .textarea[name='executorParam']").val(),
+ "shardingParam" : $("#jobTriggerModal .textarea[name='shardingParam']").val(),
"addressList" : $("#jobTriggerModal .textarea[name='addressList']").val()
},
dataType : "json",
@@ -561,6 +564,7 @@ $(function() {
$('#updateModal .form select[name=glueType] option[value='+ row.glueType +']').prop('selected', true);
$("#updateModal .form input[name='executorHandler']").val( row.executorHandler );
$("#updateModal .form textarea[name='executorParam']").val( row.executorParam );
+ $("#updateModal .form textarea[name='shardingParam']").val( row.shardingParam );
// 》init glueType
$("#updateModal .form select[name=glueType]").change();
@@ -716,6 +720,7 @@ $(function() {
$('#addModal .form select[name=glueType] option[value='+ row.glueType +']').prop('selected', true);
$("#addModal .form input[name='executorHandler']").val( row.executorHandler );
$("#addModal .form textarea[name='executorParam']").val( row.executorParam );
+ $("#addModal .form textarea[name='shardingParam']").val( row.shardingParam );
// 》init glueType
$("#addModal .form select[name=glueType]").change();
diff --git a/xxl-job-admin/src/main/resources/templates/jobinfo/jobinfo.index.ftl b/xxl-job-admin/src/main/resources/templates/jobinfo/jobinfo.index.ftl
index 3a5d7d8a..4b715352 100644
--- a/xxl-job-admin/src/main/resources/templates/jobinfo/jobinfo.index.ftl
+++ b/xxl-job-admin/src/main/resources/templates/jobinfo/jobinfo.index.ftl
@@ -83,6 +83,7 @@
${I18n.schedule_type} |
${I18n.jobinfo_field_gluetype} |
${I18n.jobinfo_field_executorparam} |
+ ${I18n.jobinfo_field_shardingparam} |
addTime |
updateTime |
${I18n.jobinfo_field_author} |
@@ -188,6 +189,13 @@
+
+
${I18n.jobinfo_conf_advanced}
<#-- 高级配置 -->
@@ -265,6 +273,7 @@ echo "${I18n.jobinfo_script_location}:$0"
echo "${I18n.jobinfo_field_executorparam}:$1"
echo "${I18n.jobinfo_shard_index} = $2"
echo "${I18n.jobinfo_shard_total} = $3"
+echo "${I18n.jobinfo_field_shardingparam}:$4"
<#--echo "参数数量:$#"
for param in $*
do
@@ -287,6 +296,7 @@ print "${I18n.jobinfo_script_location}:", sys.argv[0]
print "${I18n.jobinfo_field_executorparam}:", sys.argv[1]
print "${I18n.jobinfo_shard_index}:", sys.argv[2]
print "${I18n.jobinfo_shard_total}:", sys.argv[3]
+print "${I18n.jobinfo_field_shardingparam}:", sys.argv[4]
<#--for i in range(1, len(sys.argv)):
time.sleep(1)
print "参数", i, sys.argv[i]-->
@@ -309,6 +319,7 @@ logging.info("脚本文件:" + sys.argv[0])
echo "${I18n.jobinfo_field_executorparam}:$argv[1] \n";
echo "${I18n.jobinfo_shard_index} = $argv[2] \n";
echo "${I18n.jobinfo_shard_total} = $argv[3] \n";
+ echo "${I18n.jobinfo_field_shardingparam} = $argv[4] \n";
echo "Good bye! \n";
exit(0);
@@ -433,6 +444,13 @@ exit 0
+
+
${I18n.jobinfo_conf_advanced}
<#-- 高级配置 -->
diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java
index 4f56368a..51d4bd5b 100644
--- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java
+++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java
@@ -12,6 +12,7 @@ public class TriggerParam implements Serializable{
private String executorHandler;
private String executorParams;
+ private String shardingParam;
private String executorBlockStrategy;
private int executorTimeout;
@@ -46,6 +47,14 @@ public class TriggerParam implements Serializable{
return executorParams;
}
+ public String getShardingParam() {
+ return shardingParam;
+ }
+
+ public void setShardingParam(String shardingParam) {
+ this.shardingParam = shardingParam;
+ }
+
public void setExecutorParams(String executorParams) {
this.executorParams = executorParams;
}
diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobContext.java b/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobContext.java
index 7e350129..117e9612 100644
--- a/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobContext.java
+++ b/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobContext.java
@@ -23,6 +23,10 @@ public class XxlJobContext {
* job param
*/
private final String jobParam;
+ /**
+ * job param
+ */
+ private final String jobShardingParam;
// ---------------------- for log ----------------------
@@ -64,6 +68,7 @@ public class XxlJobContext {
public XxlJobContext(long jobId, String jobParam, String jobLogFileName, int shardIndex, int shardTotal) {
this.jobId = jobId;
this.jobParam = jobParam;
+ this.jobShardingParam = null;
this.jobLogFileName = jobLogFileName;
this.shardIndex = shardIndex;
this.shardTotal = shardTotal;
@@ -71,6 +76,16 @@ public class XxlJobContext {
this.handleCode = HANDLE_CODE_SUCCESS; // default success
}
+ public XxlJobContext(long jobId, String jobParam, String jobShardingParam, String jobLogFileName, int shardIndex, int shardTotal) {
+ this.jobId = jobId;
+ this.jobParam = jobParam;
+ this.jobShardingParam = jobShardingParam;
+ this.jobLogFileName = jobLogFileName;
+ this.shardIndex = shardIndex;
+ this.shardTotal = shardTotal;
+ this.handleCode = HANDLE_CODE_SUCCESS; // default success
+ }
+
public long getJobId() {
return jobId;
}
@@ -79,6 +94,10 @@ public class XxlJobContext {
return jobParam;
}
+ public String getJobShardingParam() {
+ return jobShardingParam;
+ }
+
public String getJobLogFileName() {
return jobLogFileName;
}
diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobHelper.java b/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobHelper.java
index eb20c181..e14ad16d 100644
--- a/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobHelper.java
+++ b/xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobHelper.java
@@ -48,6 +48,20 @@ public class XxlJobHelper {
return xxlJobContext.getJobParam();
}
+ /**
+ * current JobParam
+ *
+ * @return
+ */
+ public static String getJobShardingParam() {
+ XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
+ if (xxlJobContext == null) {
+ return null;
+ }
+
+ return xxlJobContext.getJobShardingParam();
+ }
+
// ---------------------- for log ----------------------
/**
diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.java b/xxl-job-core/src/main/java/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.java
index a9dc1bea..ca134c2c 100644
--- a/xxl-job-core/src/main/java/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.java
+++ b/xxl-job-core/src/main/java/com/xxl/job/core/enums/ExecutorBlockStrategyEnum.java
@@ -6,7 +6,7 @@ package com.xxl.job.core.enums;
public enum ExecutorBlockStrategyEnum {
SERIAL_EXECUTION("Serial execution"),
- /*CONCURRENT_EXECUTION("并行"),*/
+ CONCURRENT_EXECUTION("Concurrent execution"),
DISCARD_LATER("Discard Later"),
COVER_EARLY("Cover Early");
diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java
index cf07a55a..29d8772d 100644
--- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java
+++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java
@@ -5,6 +5,7 @@ import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
@@ -103,6 +104,7 @@ public class JobThread extends Thread{
logger.error(e.getMessage(), e);
}
+ ExecutorService executor2 = Executors.newFixedThreadPool(5);
// execute
while(!toStop){
running = false;
@@ -112,7 +114,7 @@ public class JobThread extends Thread{
try {
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
- if (triggerParam!=null) {
+ if (triggerParam!=null) {
running = true;
idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId());
@@ -122,6 +124,7 @@ public class JobThread extends Thread{
XxlJobContext xxlJobContext = new XxlJobContext(
triggerParam.getJobId(),
triggerParam.getExecutorParams(),
+ triggerParam.getShardingParam(),
logFileName,
triggerParam.getBroadcastIndex(),
triggerParam.getBroadcastTotal());
@@ -130,7 +133,7 @@ public class JobThread extends Thread{
XxlJobContext.setXxlJobContext(xxlJobContext);
// execute
- XxlJobHelper.log("
----------- xxl-job job execute start -----------
----------- Param:" + xxlJobContext.getJobParam());
+ XxlJobHelper.log("
----------- xxl-job job execute start -----------
----------- Param:" + xxlJobContext.getJobParam()+ "Sharding Param:" + xxlJobContext.getJobShardingParam());
if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout
@@ -161,6 +164,19 @@ public class JobThread extends Thread{
} finally {
futureThread.interrupt();
}
+ } else if (triggerParam.getExecutorBlockStrategy().equals(ExecutorBlockStrategyEnum.CONCURRENT_EXECUTION.name())){
+ executor2.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // init job context
+ XxlJobContext.setXxlJobContext(xxlJobContext);
+ handler.execute();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
} else {
// just execute
handler.execute();
@@ -225,6 +241,7 @@ public class JobThread extends Thread{
}
}
}
+ executor2.shutdown();
// callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0){
diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java
index 40acac00..a7c5b55c 100644
--- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java
+++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java
@@ -191,6 +191,7 @@ public class TriggerCallbackThread {
XxlJobContext.setXxlJobContext(new XxlJobContext(
-1,
null,
+ null,
logFileName,
-1,
-1));
diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java
index 759d6625..1d7781b7 100644
--- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java
+++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java
@@ -1,5 +1,6 @@
package com.xxl.job.executor.service.jobhandler;
+import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
@@ -37,9 +38,13 @@ public class SampleXxlJob {
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
-
- for (int i = 0; i < 5; i++) {
+ XxlJobContext jobContext = XxlJobContext.getXxlJobContext();
+ logger.info("执行分片:{}, 分片总数:{},执行参数为:{},执行分片参数为:{}",
+ jobContext.getShardIndex(), jobContext.getShardTotal(),
+ jobContext.getJobParam(), jobContext.getJobShardingParam());
+ for (int i = 0; i < 1; i++) {
XxlJobHelper.log("beat at:" + i);
+ logger.info("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
// default success