添加分片并发多线程的模式执行

pull/39/head
sikadai 3 years ago
parent 3604b35483
commit 00535b6524

@ -27,6 +27,7 @@ public class XxlJobInfo {
private String executorRouteStrategy; // 执行器路由策略
private String executorHandler; // 执行器任务Handler名称
private String executorParam; // 执行器,任务参数
private String shardingParam; // 执行器,分片参数
private String executorBlockStrategy; // 阻塞处理策略
private int executorTimeout; // 任务执行超时时间,单位秒
private int executorFailRetryCount; // 失败重试次数
@ -42,7 +43,6 @@ public class XxlJobInfo {
private long triggerLastTime; // 上次调度时间
private long triggerNextTime; // 下次调度时间
public int getId() {
return id;
}
@ -147,6 +147,14 @@ public class XxlJobInfo {
this.executorParam = executorParam;
}
public String getShardingParam() {
return shardingParam;
}
public void setShardingParam(String shardingParam) {
this.shardingParam = shardingParam;
}
public String getExecutorBlockStrategy() {
return executorBlockStrategy;
}

@ -17,7 +17,7 @@ public enum ExecutorRouteStrategyEnum {
LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), new ExecutorRouteSharding());
ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
this.title = title;

@ -0,0 +1,23 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import java.util.List;
/**
* <p>
* --
* </p>
*
* @author daiqi
* @since 2022/5/8 18:23
*/
public class ExecutorRouteSharding extends ExecutorRouteBusyover {
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
return super.route(triggerParam, addressList);
}
}

@ -13,10 +13,12 @@ import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.util.IpUtil;
import com.xxl.job.core.util.ThrowableUtil;
import io.netty.util.internal.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.List;
/**
* xxl-job trigger
@ -79,8 +81,9 @@ public class XxlJobTrigger {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
int shardingTotal = getSharingTotal(jobInfo.getShardingParam());
for (int i = 0; i < shardingTotal; i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, shardingTotal);
}
} else {
if (shardingParam == null) {
@ -91,6 +94,35 @@ public class XxlJobTrigger {
}
private static int getSharingTotal(String executorShardingParam) {
if (StringUtil.isNullOrEmpty(executorShardingParam)) {
return 1;
}
// 获取分片的数量
return getShardingArr(executorShardingParam).length;
}
public static String getShardingParam(String shardingParams, int index) {
String[] shardingArr = getShardingArr(shardingParams);
String[] shardingParamArr = shardingArr[index].split("=");
if (shardingParamArr.length >= 2) {
return shardingParamArr[1];
}
return shardingParamArr[0];
}
private static String [] getShardingArr(String shardingParams) {
if (StringUtil.isNullOrEmpty(shardingParams)) {
return new String[0];
}
return shardingParams.split("/");
}
public static class ShardingInfo {
private int total;
private List<String> shardingParams;
}
private static boolean isNumeric(String str){
try {
int result = Integer.valueOf(str);
@ -113,7 +145,7 @@ public class XxlJobTrigger {
// param
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?getShardingParam(jobInfo.getShardingParam(), index):null;
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
@ -128,6 +160,7 @@ public class XxlJobTrigger {
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setShardingParam(shardingParam);
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
@ -142,17 +175,9 @@ public class XxlJobTrigger {
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));

@ -274,6 +274,7 @@ public class XxlJobServiceImpl implements XxlJobService {
exists_jobInfo.setExecutorRouteStrategy(jobInfo.getExecutorRouteStrategy());
exists_jobInfo.setExecutorHandler(jobInfo.getExecutorHandler());
exists_jobInfo.setExecutorParam(jobInfo.getExecutorParam());
exists_jobInfo.setShardingParam(jobInfo.getShardingParam());
exists_jobInfo.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
exists_jobInfo.setExecutorTimeout(jobInfo.getExecutorTimeout());
exists_jobInfo.setExecutorFailRetryCount(jobInfo.getExecutorFailRetryCount());

@ -25,7 +25,7 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
### xxl-job, datasource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
### datasource-pool

@ -117,6 +117,7 @@ jobinfo_field_jobdesc=Job description
jobinfo_field_timeout=Job timeout period
jobinfo_field_gluetype=GLUE Type
jobinfo_field_executorparam=Param
jobinfo_field_shardingparam=Sharding Param
jobinfo_field_author=Author
jobinfo_field_alarmemail=Alarm email
jobinfo_field_alarmemail_placeholder=Please enter alarm mail, if there are more than one comma separated
@ -221,6 +222,7 @@ jobgroup_empty=There is no valid executor. Please contact the administrator
## job conf
jobconf_block_SERIAL_EXECUTION=Serial execution
jobconf_block_CONCURRENT_EXECUTION=Concurrent execution
jobconf_block_DISCARD_LATER=Discard Later
jobconf_block_COVER_EARLY=Cover Early
jobconf_route_first=First

@ -116,6 +116,7 @@ jobinfo_field_jobgroup=执行器
jobinfo_field_jobdesc=任务描述
jobinfo_field_gluetype=运行模式
jobinfo_field_executorparam=任务参数
jobinfo_field_shardingparam=任务分片参数
jobinfo_field_author=负责人
jobinfo_field_timeout=任务超时时间
jobinfo_field_alarmemail=报警邮件
@ -221,6 +222,7 @@ jobgroup_empty=不存在有效执行器,请联系管理员
## job conf
jobconf_block_SERIAL_EXECUTION=单机串行
jobconf_block_CONCURRENT_EXECUTION=单机并行
jobconf_block_DISCARD_LATER=丢弃后续调度
jobconf_block_COVER_EARLY=覆盖之前调度
jobconf_route_first=第一个

@ -116,6 +116,7 @@ jobinfo_field_jobgroup=執行器
jobinfo_field_jobdesc=任務描述
jobinfo_field_gluetype=運行模式
jobinfo_field_executorparam=任務參數
jobinfo_field_shardingparam=分片參數
jobinfo_field_author=負責人
jobinfo_field_timeout=任務超時秒數
jobinfo_field_alarmemail=告警郵件
@ -221,6 +222,7 @@ jobgroup_empty=不存在有效執行器,請聯絡系統管理員
## job conf
jobconf_block_SERIAL_EXECUTION=單機串行
jobconf_block_CONCURRENT_EXECUTION=单机并行
jobconf_block_DISCARD_LATER=丢棄后續調度
jobconf_block_COVER_EARLY=覆蓋之前調度
jobconf_route_first=第一個

@ -22,6 +22,7 @@
<result column="executor_route_strategy" property="executorRouteStrategy" />
<result column="executor_handler" property="executorHandler" />
<result column="executor_param" property="executorParam" />
<result column="sharding_param" property="shardingParam" />
<result column="executor_block_strategy" property="executorBlockStrategy" />
<result column="executor_timeout" property="executorTimeout" />
<result column="executor_fail_retry_count" property="executorFailRetryCount" />
@ -52,6 +53,7 @@
t.executor_route_strategy,
t.executor_handler,
t.executor_param,
t.sharding_param,
t.executor_block_strategy,
t.executor_timeout,
t.executor_fail_retry_count,
@ -125,6 +127,7 @@
executor_route_strategy,
executor_handler,
executor_param,
sharding_param,
executor_block_strategy,
executor_timeout,
executor_fail_retry_count,
@ -149,6 +152,7 @@
#{executorRouteStrategy},
#{executorHandler},
#{executorParam},
#{shardingParam},
#{executorBlockStrategy},
#{executorTimeout},
#{executorFailRetryCount},
@ -187,6 +191,7 @@
executor_route_strategy = #{executorRouteStrategy},
executor_handler = #{executorHandler},
executor_param = #{executorParam},
sharding_param = #{shardingParam},
executor_block_strategy = #{executorBlockStrategy},
executor_timeout = ${executorTimeout},
executor_fail_retry_count = ${executorFailRetryCount},

@ -74,6 +74,7 @@ $(function() {
}
},
{ "data": 'executorParam', "visible" : false},
{ "data": 'shardingParam', "visible" : false},
{
"data": 'addTime',
"visible" : false,
@ -266,6 +267,7 @@ $(function() {
$("#jobTriggerModal .form input[name='id']").val( row.id );
$("#jobTriggerModal .form textarea[name='executorParam']").val( row.executorParam );
$("#jobTriggerModal .form textarea[name='shardingParam']").val( row.shardingParam );
$('#jobTriggerModal').modal({backdrop: false, keyboard: false}).modal('show');
});
@ -276,6 +278,7 @@ $(function() {
data : {
"id" : $("#jobTriggerModal .form input[name='id']").val(),
"executorParam" : $("#jobTriggerModal .textarea[name='executorParam']").val(),
"shardingParam" : $("#jobTriggerModal .textarea[name='shardingParam']").val(),
"addressList" : $("#jobTriggerModal .textarea[name='addressList']").val()
},
dataType : "json",
@ -561,6 +564,7 @@ $(function() {
$('#updateModal .form select[name=glueType] option[value='+ row.glueType +']').prop('selected', true);
$("#updateModal .form input[name='executorHandler']").val( row.executorHandler );
$("#updateModal .form textarea[name='executorParam']").val( row.executorParam );
$("#updateModal .form textarea[name='shardingParam']").val( row.shardingParam );
// 》init glueType
$("#updateModal .form select[name=glueType]").change();
@ -716,6 +720,7 @@ $(function() {
$('#addModal .form select[name=glueType] option[value='+ row.glueType +']').prop('selected', true);
$("#addModal .form input[name='executorHandler']").val( row.executorHandler );
$("#addModal .form textarea[name='executorParam']").val( row.executorParam );
$("#addModal .form textarea[name='shardingParam']").val( row.shardingParam );
// 》init glueType
$("#addModal .form select[name=glueType]").change();

@ -83,6 +83,7 @@
<th name="scheduleType" >${I18n.schedule_type}</th>
<th name="glueType" >${I18n.jobinfo_field_gluetype}</th>
<th name="executorParam" >${I18n.jobinfo_field_executorparam}</th>
<th name="shardingParam" >${I18n.jobinfo_field_shardingparam}</th>
<th name="addTime" >addTime</th>
<th name="updateTime" >updateTime</th>
<th name="author" >${I18n.jobinfo_field_author}</th>
@ -188,6 +189,13 @@
</div>
</div>
<div class="form-group">
<label for="firstname" class="col-sm-2 control-label">${I18n.jobinfo_field_shardingparam}<font color="black">*</font></label>
<div class="col-sm-10">
<textarea class="textarea form-control" name="shardingParam" placeholder="${I18n.system_please_input}${I18n.jobinfo_field_shardingparam}" maxlength="512" style="height: 63px; line-height: 1.2;"></textarea>
</div>
</div>
<br>
<p style="margin: 0 0 10px;text-align: left;border-bottom: 1px solid #e5e5e5;color: gray;">${I18n.jobinfo_conf_advanced}</p> <#-- -->
@ -265,6 +273,7 @@ echo "${I18n.jobinfo_script_location}$0"
echo "${I18n.jobinfo_field_executorparam}$1"
echo "${I18n.jobinfo_shard_index} = $2"
echo "${I18n.jobinfo_shard_total} = $3"
echo "${I18n.jobinfo_field_shardingparam}$4"
<#--echo "参数数量:$#"
for param in $*
do
@ -287,6 +296,7 @@ print "${I18n.jobinfo_script_location}", sys.argv[0]
print "${I18n.jobinfo_field_executorparam}", sys.argv[1]
print "${I18n.jobinfo_shard_index}", sys.argv[2]
print "${I18n.jobinfo_shard_total}", sys.argv[3]
print "${I18n.jobinfo_field_shardingparam}", sys.argv[4]
<#--for i in range(1, len(sys.argv)):
time.sleep(1)
print "参数", i, sys.argv[i]-->
@ -309,6 +319,7 @@ logging.info("脚本文件:" + sys.argv[0])
echo "${I18n.jobinfo_field_executorparam}$argv[1] \n";
echo "${I18n.jobinfo_shard_index} = $argv[2] \n";
echo "${I18n.jobinfo_shard_total} = $argv[3] \n";
echo "${I18n.jobinfo_field_shardingparam} = $argv[4] \n";
echo "Good bye! \n";
exit(0);
@ -433,6 +444,13 @@ exit 0
</div>
</div>
<div class="form-group">
<label for="firstname" class="col-sm-2 control-label">${I18n.jobinfo_field_shardingparam}<font color="black">*</font></label>
<div class="col-sm-10">
<textarea class="textarea form-control" name="shardingParam" placeholder="${I18n.system_please_input}${I18n.jobinfo_field_shardingparam}" maxlength="512" style="height: 63px; line-height: 1.2;"></textarea>
</div>
</div>
<br>
<p style="margin: 0 0 10px;text-align: left;border-bottom: 1px solid #e5e5e5;color: gray;">${I18n.jobinfo_conf_advanced}</p> <#-- -->

@ -12,6 +12,7 @@ public class TriggerParam implements Serializable{
private String executorHandler;
private String executorParams;
private String shardingParam;
private String executorBlockStrategy;
private int executorTimeout;
@ -46,6 +47,14 @@ public class TriggerParam implements Serializable{
return executorParams;
}
public String getShardingParam() {
return shardingParam;
}
public void setShardingParam(String shardingParam) {
this.shardingParam = shardingParam;
}
public void setExecutorParams(String executorParams) {
this.executorParams = executorParams;
}

@ -23,6 +23,10 @@ public class XxlJobContext {
* job param
*/
private final String jobParam;
/**
* job param
*/
private final String jobShardingParam;
// ---------------------- for log ----------------------
@ -64,6 +68,7 @@ public class XxlJobContext {
public XxlJobContext(long jobId, String jobParam, String jobLogFileName, int shardIndex, int shardTotal) {
this.jobId = jobId;
this.jobParam = jobParam;
this.jobShardingParam = null;
this.jobLogFileName = jobLogFileName;
this.shardIndex = shardIndex;
this.shardTotal = shardTotal;
@ -71,6 +76,16 @@ public class XxlJobContext {
this.handleCode = HANDLE_CODE_SUCCESS; // default success
}
public XxlJobContext(long jobId, String jobParam, String jobShardingParam, String jobLogFileName, int shardIndex, int shardTotal) {
this.jobId = jobId;
this.jobParam = jobParam;
this.jobShardingParam = jobShardingParam;
this.jobLogFileName = jobLogFileName;
this.shardIndex = shardIndex;
this.shardTotal = shardTotal;
this.handleCode = HANDLE_CODE_SUCCESS; // default success
}
public long getJobId() {
return jobId;
}
@ -79,6 +94,10 @@ public class XxlJobContext {
return jobParam;
}
public String getJobShardingParam() {
return jobShardingParam;
}
public String getJobLogFileName() {
return jobLogFileName;
}

@ -48,6 +48,20 @@ public class XxlJobHelper {
return xxlJobContext.getJobParam();
}
/**
* current JobParam
*
* @return
*/
public static String getJobShardingParam() {
XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
if (xxlJobContext == null) {
return null;
}
return xxlJobContext.getJobShardingParam();
}
// ---------------------- for log ----------------------
/**

@ -6,7 +6,7 @@ package com.xxl.job.core.enums;
public enum ExecutorBlockStrategyEnum {
SERIAL_EXECUTION("Serial execution"),
/*CONCURRENT_EXECUTION("并行"),*/
CONCURRENT_EXECUTION("Concurrent execution"),
DISCARD_LATER("Discard Later"),
COVER_EARLY("Cover Early");

@ -5,6 +5,7 @@ import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
@ -103,6 +104,7 @@ public class JobThread extends Thread{
logger.error(e.getMessage(), e);
}
ExecutorService executor2 = Executors.newFixedThreadPool(5);
// execute
while(!toStop){
running = false;
@ -112,7 +114,7 @@ public class JobThread extends Thread{
try {
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) {
if (triggerParam!=null) {
running = true;
idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId());
@ -122,6 +124,7 @@ public class JobThread extends Thread{
XxlJobContext xxlJobContext = new XxlJobContext(
triggerParam.getJobId(),
triggerParam.getExecutorParams(),
triggerParam.getShardingParam(),
logFileName,
triggerParam.getBroadcastIndex(),
triggerParam.getBroadcastTotal());
@ -130,7 +133,7 @@ public class JobThread extends Thread{
XxlJobContext.setXxlJobContext(xxlJobContext);
// execute
XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam()+ "Sharding Param:" + xxlJobContext.getJobShardingParam());
if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout
@ -161,6 +164,19 @@ public class JobThread extends Thread{
} finally {
futureThread.interrupt();
}
} else if (triggerParam.getExecutorBlockStrategy().equals(ExecutorBlockStrategyEnum.CONCURRENT_EXECUTION.name())){
executor2.execute(new Runnable() {
@Override
public void run() {
try {
// init job context
XxlJobContext.setXxlJobContext(xxlJobContext);
handler.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
});
} else {
// just execute
handler.execute();
@ -225,6 +241,7 @@ public class JobThread extends Thread{
}
}
}
executor2.shutdown();
// callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0){

@ -191,6 +191,7 @@ public class TriggerCallbackThread {
XxlJobContext.setXxlJobContext(new XxlJobContext(
-1,
null,
null,
logFileName,
-1,
-1));

@ -1,5 +1,6 @@
package com.xxl.job.executor.service.jobhandler;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
@ -37,9 +38,13 @@ public class SampleXxlJob {
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobContext jobContext = XxlJobContext.getXxlJobContext();
logger.info("执行分片:{} 分片总数:{},执行参数为:{},执行分片参数为:{}",
jobContext.getShardIndex(), jobContext.getShardTotal(),
jobContext.getJobParam(), jobContext.getJobShardingParam());
for (int i = 0; i < 1; i++) {
XxlJobHelper.log("beat at:" + i);
logger.info("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
// default success

Loading…
Cancel
Save