Pre Merge pull request !39 from sikadai/concurrent_mult_thread

pull/39/MERGE
sikadai 3 years ago committed by Gitee
commit bf6aa91d28
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F

@ -443,7 +443,7 @@ XXL-JOB是一个分布式任务调度平台其核心设计目标是开发迅
- 319、深圳市姜科网络有限公司
- 320、青岛日日顺物流有限公司
- 321、南京太川信息技术有限公司
- 322、美图之家科技优先公司【美图】
- 322、美图之家科技有限公司【美图】
- 323、南京太川信息技术有限公司
- 324、众薪科技北京有限公司
- 325、武汉安安物联科技有限公司
@ -568,6 +568,77 @@ XXL-JOB是一个分布式任务调度平台其核心设计目标是开发迅
- 444、上海金仕达软件科技有限公司
- 445、深圳易世通达科技有限公司
- 446、爱动超越人工智能科技北京有限责任公司
- 447、迪普信北京科技有限公司
- 448、掌站科技北京有限公司
- 449、深圳市华云中盛股份有限公司
- 450、上海原圈科技有限公司
- 451、广州赞赏信息科技有限公司
- 452、Amber Group
- 453、德威国际货运代理上海公司
- 454、浙江杰夫兄弟智慧科技有限公司
- 455、信也科技
- 456、开思时代科技深圳有限公司
- 457、大连槐德科技有限公司
- 458、同程生活
- 459、松果出行
- 460、企鹅杏仁集团
- 461、宁波科云信息科技有限公司
- 462、上海格蓝威驰信息科技有限公司
- 463、杭州趣淘鲸科技有限公司
- 464、湖州市数字惠民科技有限公司
- 465、乐普北京医疗器械股份有限公司
- 466、广州市晴川高新技术开发有限公司
- 467、山西缇客科技有限公司
- 468、徐州卡西穆电子商务有限公司
- 469、格创东智科技有限公司
- 470、世纪龙信息网络有限责任公司
- 471、邦道科技有限公司
- 472、河南中盟新云科技股份有限公司
- 473、横琴人寿保险有限公司
- 474、上海海隆华钟信息技术有限公司
- 475、上海久湛
- 476、上海仙豆智能机器人有限公司
- 477、广州汇尚网络科技有限公司
- 478、深圳市阿卡索资讯股份有限公司
- 479、青岛佳家康健康管理有限责任公司
- 480、蓝城兄弟
- 481、成都天府通金融服务股份有限公司
- 482、深圳云镖网络科技有限公司
- 483、上海影创科技
- 484、成都艾拉物联
- 485、北京客邻尚品网络技术有限公司
- 486、IT实战联盟
- 487、杭州尤拉夫科技有限公司
- 488、中大检测(湖南)股份有限公司
- 489、江苏电老虎工业互联网股份有限公司
- 490、上海助通信息科技有限公司
- 491、北京符节科技有限公司
- 492、杭州英祐科技有限公司
- 493、江苏电老虎工业互联网股份有限公司
- 494、深圳市点猫科技有限公司
- 495、杭州天音
- 496、深圳市二十一科技互联网有限公司
- 497、海南海口翎度科技
- 498、北京小趣智品科技有限公司
- 499、广州石竹计算机软件有限公司
- 500、深圳市惟客数据科技有限公司
- 501、中国医疗器械有限公司
- 502、上海云谦科技有限公司
- 503、上海磐农信息科技有限公司
- 504、广州领航食品有限公司
- 505、青岛掌讯通区块链科技有限公司
- 506、北京新网数码信息技术有限公司
- 507、超体信息科技(深圳)有限公司
- 508、长沙店帮手信息科技有限公司
- 509、上海助弓装饰工程有限公司
- 510、杭州寻联网络科技有限公司
- 511、成都大淘客科技有限公司
- 512、松果出行
- 513、深圳市唤梦科技有限公司
- 514、上汽集团商用车技术中心
- 515、北京中航讯科技股份有限公司
- 516、北龙中网(北京)科技有限责任公司
- 517、前海超级前台(深圳)信息技术有限公司
- ……
> 更多接入的公司,欢迎在 [登记地址](https://github.com/xuxueli/xxl-job/issues/1 ) 登记,登记仅仅为了产品推广。
@ -587,7 +658,7 @@ This product is open source and free, and will continue to provide free communit
- Licensed under the GNU General Public License (GPL) v3.
- Copyright (c) 2015-present, xuxueli.
产品开源免费,并且将持续提供免费的社区技术支持。个人或企业内部可自由的接入和使用。
产品开源免费,并且将持续提供免费的社区技术支持。个人或企业内部可自由的接入和使用。如有需要可邮件联系作者免费获取项目授权。
## Donate

@ -538,6 +538,77 @@ XXL-JOB是一个分布式任务调度平台其核心设计目标是开发迅
- 444、上海金仕达软件科技有限公司
- 445、深圳易世通达科技有限公司
- 446、爱动超越人工智能科技北京有限责任公司
- 447、迪普信北京科技有限公司
- 448、掌站科技北京有限公司
- 449、深圳市华云中盛股份有限公司
- 450、上海原圈科技有限公司
- 451、广州赞赏信息科技有限公司
- 452、Amber Group
- 453、德威国际货运代理上海公司
- 454、浙江杰夫兄弟智慧科技有限公司
- 455、信也科技
- 456、开思时代科技深圳有限公司
- 457、大连槐德科技有限公司
- 458、同程生活
- 459、松果出行
- 460、企鹅杏仁集团
- 461、宁波科云信息科技有限公司
- 462、上海格蓝威驰信息科技有限公司
- 463、杭州趣淘鲸科技有限公司
- 464、湖州市数字惠民科技有限公司
- 465、乐普北京医疗器械股份有限公司
- 466、广州市晴川高新技术开发有限公司
- 467、山西缇客科技有限公司
- 468、徐州卡西穆电子商务有限公司
- 469、格创东智科技有限公司
- 470、世纪龙信息网络有限责任公司
- 471、邦道科技有限公司
- 472、河南中盟新云科技股份有限公司
- 473、横琴人寿保险有限公司
- 474、上海海隆华钟信息技术有限公司
- 475、上海久湛
- 476、上海仙豆智能机器人有限公司
- 477、广州汇尚网络科技有限公司
- 478、深圳市阿卡索资讯股份有限公司
- 479、青岛佳家康健康管理有限责任公司
- 480、蓝城兄弟
- 481、成都天府通金融服务股份有限公司
- 482、深圳云镖网络科技有限公司
- 483、上海影创科技
- 484、成都艾拉物联
- 485、北京客邻尚品网络技术有限公司
- 486、IT实战联盟
- 487、杭州尤拉夫科技有限公司
- 488、中大检测(湖南)股份有限公司
- 489、江苏电老虎工业互联网股份有限公司
- 490、上海助通信息科技有限公司
- 491、北京符节科技有限公司
- 492、杭州英祐科技有限公司
- 493、江苏电老虎工业互联网股份有限公司
- 494、深圳市点猫科技有限公司
- 495、杭州天音
- 496、深圳市二十一科技互联网有限公司
- 497、海南海口翎度科技
- 498、北京小趣智品科技有限公司
- 499、广州石竹计算机软件有限公司
- 500、深圳市惟客数据科技有限公司
- 501、中国医疗器械有限公司
- 502、上海云谦科技有限公司
- 503、上海磐农信息科技有限公司
- 504、广州领航食品有限公司
- 505、青岛掌讯通区块链科技有限公司
- 506、北京新网数码信息技术有限公司
- 507、超体信息科技(深圳)有限公司
- 508、长沙店帮手信息科技有限公司
- 509、上海助弓装饰工程有限公司
- 510、杭州寻联网络科技有限公司
- 511、成都大淘客科技有限公司
- 512、松果出行
- 513、深圳市唤梦科技有限公司
- 514、上汽集团商用车技术中心
- 515、北京中航讯科技股份有限公司
- 516、北龙中网(北京)科技有限责任公司
- 517、前海超级前台(深圳)信息技术有限公司
- ……
> 更多接入的公司,欢迎在 [登记地址](https://github.com/xuxueli/xxl-job/issues/1 ) 登记,登记仅仅为了产品推广。
@ -2136,19 +2207,24 @@ public void execute() {
- 26、【修复】页面redirect跳转后https变为http问题修复
- 27、【修复】执行器日志清理优化修复小概率下日志文件为空导致清理异常问题
### 7.32 版本 v2.4.0 Release Notes[规划中]
### 7.32 版本 v2.3.1 Release Notes[规划中]
- 1、【优化】[规划中]任务日志重构:一次调度只记录一条主任务,维护起止时间和状态。
- 普通任务:只记录一条主任务;
- 广播任务:记录一条主任务,每个分片任务记录一条次任务,关联在主任务上;
- 重试任务:失败时,新增主任务。所有调度记录,包括入口调度和重试调度,均挂载主任务上。
- 2、【优化】[规划中]分片任务:全部完成后才会出发后置节点;
- 3、[规划中]DAG流程任务
- 3、【优化】多个项目依赖升级至较新稳定版本如netty、groovy、spring、springboot等
- 4、【优化】合并多项PR代码结构、代码变量和注释等多项优化调整
- 5、【优化】任务线程名优化(ISSUE-2527)
- 6、【修复】邮箱校验逻辑下放至EmailJobAlarm中避免对其他告警方式的干扰
### 7.33 版本 v2.4.0 Release Notes[规划中]
- 1、[规划中]DAG流程任务
- DAG任务支持参数传递共享数据DAG任务创建、管理DAG任务日志查看、操作
- 子任务:废弃
- 4、[规划中]多数据库支持DAO层通过JPA实现不限制数据库类型
- 5、[规划中]告警增强:邮件告警 + webhook告警
- 6、[规划中]安全强化AccessToken动态生成、动态启停控制调度、回调
- 2、[规划中]多数据库支持DAO层通过JPA实现不限制数据库类型
- 3、[规划中]告警增强:邮件告警 + webhook告警
- 4、[规划中]安全强化AccessToken动态生成、动态启停控制调度、回调
### TODO LIST
- 1、任务分片路由分片采用一致性Hash算法计算出尽量稳定的分片顺序即使注册机器存在波动也不会引起分批分片顺序大的波动目前采用IP自然排序可以满足需求待定
@ -2193,7 +2269,7 @@ public void execute() {
更多接入的公司,欢迎在 [登记地址](https://github.com/xuxueli/xxl-job/issues/1 ) 登记,登记仅仅为了产品推广。
### 8.3 开源协议和版权
产品开源免费,并且将持续提供免费的社区技术支持。个人或企业内部可自由的接入和使用。
产品开源免费,并且将持续提供免费的社区技术支持。个人或企业内部可自由的接入和使用。如有需要可邮件联系作者免费获取项目授权。
- Licensed under the GNU General Public License (GPL) v3.
- Copyright (c) 2015-present, xuxueli.

@ -1,5 +1,5 @@
#
# XXL-JOB v2.3.0
# XXL-JOB v2.3.1-SNAPSHOT
# Copyright (c) 2015-present, xuxueli.
CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
@ -21,6 +21,7 @@ CREATE TABLE `xxl_job_info` (
`executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
`sharding_param` text DEFAULT NULL COMMENT '执行器分片参数',
`executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',

@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job</artifactId>
<version>2.3.0</version>
<version>2.3.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
@ -24,20 +24,20 @@
<maven.compiler.target>1.8</maven.compiler.target>
<maven.test.skip>true</maven.test.skip>
<netty-all.version>4.1.58.Final</netty-all.version>
<netty-all.version>4.1.63.Final</netty-all.version>
<gson.version>2.8.6</gson.version>
<spring.version>5.3.3</spring.version>
<spring-boot.version>2.4.2</spring-boot.version>
<spring.version>5.3.6</spring.version>
<spring-boot.version>2.4.5</spring-boot.version>
<mybatis-spring-boot-starter.version>2.1.4</mybatis-spring-boot-starter.version>
<mysql-connector-java.version>8.0.23</mysql-connector-java.version>
<mysql-connector-java.version>8.0.24</mysql-connector-java.version>
<slf4j-api.version>1.7.30</slf4j-api.version>
<junit.version>5.7.1</junit.version>
<junit-jupiter.version>5.7.1</junit-jupiter.version>
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
<groovy.version>3.0.7</groovy.version>
<groovy.version>3.0.8</groovy.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
<maven-javadoc-plugin.version>3.2.0</maven-javadoc-plugin.version>

@ -4,7 +4,7 @@
<parent>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job</artifactId>
<version>2.3.0</version>
<version>2.3.1-SNAPSHOT</version>
</parent>
<artifactId>xxl-job-admin</artifactId>
<packaging>jar</packaging>

@ -32,6 +32,7 @@ public class EmailJobAlarm implements JobAlarm {
*
* @param jobLog
*/
@Override
public boolean doAlarm(XxlJobInfo info, XxlJobLog jobLog){
boolean alarmResult = true;

@ -47,7 +47,7 @@ public class XxlJobCompleter {
// 1、handle success, to trigger child job
String triggerChildMsg = null;
if (XxlJobContext.HANDLE_COCE_SUCCESS == xxlJobLog.getHandleCode()) {
if (XxlJobContext.HANDLE_CODE_SUCCESS == xxlJobLog.getHandleCode()) {
XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId());
if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) {
triggerChildMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>";

@ -27,6 +27,7 @@ public class XxlJobInfo {
private String executorRouteStrategy; // 执行器路由策略
private String executorHandler; // 执行器任务Handler名称
private String executorParam; // 执行器,任务参数
private String shardingParam; // 执行器,分片参数
private String executorBlockStrategy; // 阻塞处理策略
private int executorTimeout; // 任务执行超时时间,单位秒
private int executorFailRetryCount; // 失败重试次数
@ -42,7 +43,6 @@ public class XxlJobInfo {
private long triggerLastTime; // 上次调度时间
private long triggerNextTime; // 下次调度时间
public int getId() {
return id;
}
@ -147,6 +147,14 @@ public class XxlJobInfo {
this.executorParam = executorParam;
}
public String getShardingParam() {
return shardingParam;
}
public void setShardingParam(String shardingParam) {
this.shardingParam = shardingParam;
}
public String getExecutorBlockStrategy() {
return executorBlockStrategy;
}

@ -17,7 +17,7 @@ public enum ExecutorRouteStrategyEnum {
LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), new ExecutorRouteSharding());
ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
this.title = title;

@ -0,0 +1,23 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import java.util.List;
/**
* <p>
* --
* </p>
*
* @author daiqi
* @since 2022/5/8 18:23
*/
public class ExecutorRouteSharding extends ExecutorRouteBusyover {
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
return super.route(triggerParam, addressList);
}
}

@ -60,7 +60,7 @@ public class JobFailMonitorHelper {
// 2、fail alarm monitor
int newAlarmStatus = 0; // 告警状态0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
if (info != null) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {

@ -13,10 +13,12 @@ import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.util.IpUtil;
import com.xxl.job.core.util.ThrowableUtil;
import io.netty.util.internal.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.List;
/**
* xxl-job trigger
@ -79,8 +81,9 @@ public class XxlJobTrigger {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
int shardingTotal = getSharingTotal(jobInfo.getShardingParam());
for (int i = 0; i < shardingTotal; i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, shardingTotal);
}
} else {
if (shardingParam == null) {
@ -91,6 +94,35 @@ public class XxlJobTrigger {
}
private static int getSharingTotal(String executorShardingParam) {
if (StringUtil.isNullOrEmpty(executorShardingParam)) {
return 1;
}
// 获取分片的数量
return getShardingArr(executorShardingParam).length;
}
public static String getShardingParam(String shardingParams, int index) {
String[] shardingArr = getShardingArr(shardingParams);
String[] shardingParamArr = shardingArr[index].split("=");
if (shardingParamArr.length >= 2) {
return shardingParamArr[1];
}
return shardingParamArr[0];
}
private static String [] getShardingArr(String shardingParams) {
if (StringUtil.isNullOrEmpty(shardingParams)) {
return new String[0];
}
return shardingParams.split("/");
}
public static class ShardingInfo {
private int total;
private List<String> shardingParams;
}
private static boolean isNumeric(String str){
try {
int result = Integer.valueOf(str);
@ -113,7 +145,7 @@ public class XxlJobTrigger {
// param
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?getShardingParam(jobInfo.getShardingParam(), index):null;
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
@ -128,6 +160,7 @@ public class XxlJobTrigger {
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setShardingParam(shardingParam);
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
@ -142,18 +175,10 @@ public class XxlJobTrigger {
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}

@ -274,6 +274,7 @@ public class XxlJobServiceImpl implements XxlJobService {
exists_jobInfo.setExecutorRouteStrategy(jobInfo.getExecutorRouteStrategy());
exists_jobInfo.setExecutorHandler(jobInfo.getExecutorHandler());
exists_jobInfo.setExecutorParam(jobInfo.getExecutorParam());
exists_jobInfo.setShardingParam(jobInfo.getShardingParam());
exists_jobInfo.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
exists_jobInfo.setExecutorTimeout(jobInfo.getExecutorTimeout());
exists_jobInfo.setExecutorFailRetryCount(jobInfo.getExecutorFailRetryCount());

@ -25,7 +25,7 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
### xxl-job, datasource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
### datasource-pool

@ -1,6 +1,6 @@
admin_name=Scheduling Center
admin_name_full=Distributed Task Scheduling Platform XXL-JOB
admin_version=2.3.0
admin_version=2.3.1-SNAPSHOT
admin_i18n=en
## system
@ -117,6 +117,7 @@ jobinfo_field_jobdesc=Job description
jobinfo_field_timeout=Job timeout period
jobinfo_field_gluetype=GLUE Type
jobinfo_field_executorparam=Param
jobinfo_field_shardingparam=Sharding Param
jobinfo_field_author=Author
jobinfo_field_alarmemail=Alarm email
jobinfo_field_alarmemail_placeholder=Please enter alarm mail, if there are more than one comma separated
@ -221,6 +222,7 @@ jobgroup_empty=There is no valid executor. Please contact the administrator
## job conf
jobconf_block_SERIAL_EXECUTION=Serial execution
jobconf_block_CONCURRENT_EXECUTION=Concurrent execution
jobconf_block_DISCARD_LATER=Discard Later
jobconf_block_COVER_EARLY=Cover Early
jobconf_route_first=First

@ -1,6 +1,6 @@
admin_name=任务调度中心
admin_name_full=分布式任务调度平台XXL-JOB
admin_version=2.3.0
admin_version=2.3.1-SNAPSHOT
admin_i18n=
## system
@ -116,6 +116,7 @@ jobinfo_field_jobgroup=执行器
jobinfo_field_jobdesc=任务描述
jobinfo_field_gluetype=运行模式
jobinfo_field_executorparam=任务参数
jobinfo_field_shardingparam=任务分片参数
jobinfo_field_author=负责人
jobinfo_field_timeout=任务超时时间
jobinfo_field_alarmemail=报警邮件
@ -221,6 +222,7 @@ jobgroup_empty=不存在有效执行器,请联系管理员
## job conf
jobconf_block_SERIAL_EXECUTION=单机串行
jobconf_block_CONCURRENT_EXECUTION=单机并行
jobconf_block_DISCARD_LATER=丢弃后续调度
jobconf_block_COVER_EARLY=覆盖之前调度
jobconf_route_first=第一个

@ -1,6 +1,6 @@
admin_name=任務調度中心
admin_name_full=分布式任務調度平臺XXL-JOB
admin_version=2.3.0
admin_version=2.3.1-SNAPSHOT
admin_i18n=
## system
@ -116,6 +116,7 @@ jobinfo_field_jobgroup=執行器
jobinfo_field_jobdesc=任務描述
jobinfo_field_gluetype=運行模式
jobinfo_field_executorparam=任務參數
jobinfo_field_shardingparam=分片參數
jobinfo_field_author=負責人
jobinfo_field_timeout=任務超時秒數
jobinfo_field_alarmemail=告警郵件
@ -221,6 +222,7 @@ jobgroup_empty=不存在有效執行器,請聯絡系統管理員
## job conf
jobconf_block_SERIAL_EXECUTION=單機串行
jobconf_block_CONCURRENT_EXECUTION=单机并行
jobconf_block_DISCARD_LATER=丢棄后續調度
jobconf_block_COVER_EARLY=覆蓋之前調度
jobconf_route_first=第一個

@ -22,6 +22,7 @@
<result column="executor_route_strategy" property="executorRouteStrategy" />
<result column="executor_handler" property="executorHandler" />
<result column="executor_param" property="executorParam" />
<result column="sharding_param" property="shardingParam" />
<result column="executor_block_strategy" property="executorBlockStrategy" />
<result column="executor_timeout" property="executorTimeout" />
<result column="executor_fail_retry_count" property="executorFailRetryCount" />
@ -52,6 +53,7 @@
t.executor_route_strategy,
t.executor_handler,
t.executor_param,
t.sharding_param,
t.executor_block_strategy,
t.executor_timeout,
t.executor_fail_retry_count,
@ -125,6 +127,7 @@
executor_route_strategy,
executor_handler,
executor_param,
sharding_param,
executor_block_strategy,
executor_timeout,
executor_fail_retry_count,
@ -149,6 +152,7 @@
#{executorRouteStrategy},
#{executorHandler},
#{executorParam},
#{shardingParam},
#{executorBlockStrategy},
#{executorTimeout},
#{executorFailRetryCount},
@ -187,6 +191,7 @@
executor_route_strategy = #{executorRouteStrategy},
executor_handler = #{executorHandler},
executor_param = #{executorParam},
sharding_param = #{shardingParam},
executor_block_strategy = #{executorBlockStrategy},
executor_timeout = ${executorTimeout},
executor_fail_retry_count = ${executorFailRetryCount},

@ -74,6 +74,7 @@ $(function() {
}
},
{ "data": 'executorParam', "visible" : false},
{ "data": 'shardingParam', "visible" : false},
{
"data": 'addTime',
"visible" : false,
@ -266,6 +267,7 @@ $(function() {
$("#jobTriggerModal .form input[name='id']").val( row.id );
$("#jobTriggerModal .form textarea[name='executorParam']").val( row.executorParam );
$("#jobTriggerModal .form textarea[name='shardingParam']").val( row.shardingParam );
$('#jobTriggerModal').modal({backdrop: false, keyboard: false}).modal('show');
});
@ -276,6 +278,7 @@ $(function() {
data : {
"id" : $("#jobTriggerModal .form input[name='id']").val(),
"executorParam" : $("#jobTriggerModal .textarea[name='executorParam']").val(),
"shardingParam" : $("#jobTriggerModal .textarea[name='shardingParam']").val(),
"addressList" : $("#jobTriggerModal .textarea[name='addressList']").val()
},
dataType : "json",
@ -561,6 +564,7 @@ $(function() {
$('#updateModal .form select[name=glueType] option[value='+ row.glueType +']').prop('selected', true);
$("#updateModal .form input[name='executorHandler']").val( row.executorHandler );
$("#updateModal .form textarea[name='executorParam']").val( row.executorParam );
$("#updateModal .form textarea[name='shardingParam']").val( row.shardingParam );
// 》init glueType
$("#updateModal .form select[name=glueType]").change();
@ -716,6 +720,7 @@ $(function() {
$('#addModal .form select[name=glueType] option[value='+ row.glueType +']').prop('selected', true);
$("#addModal .form input[name='executorHandler']").val( row.executorHandler );
$("#addModal .form textarea[name='executorParam']").val( row.executorParam );
$("#addModal .form textarea[name='shardingParam']").val( row.shardingParam );
// 》init glueType
$("#addModal .form select[name=glueType]").change();

@ -83,6 +83,7 @@
<th name="scheduleType" >${I18n.schedule_type}</th>
<th name="glueType" >${I18n.jobinfo_field_gluetype}</th>
<th name="executorParam" >${I18n.jobinfo_field_executorparam}</th>
<th name="shardingParam" >${I18n.jobinfo_field_shardingparam}</th>
<th name="addTime" >addTime</th>
<th name="updateTime" >updateTime</th>
<th name="author" >${I18n.jobinfo_field_author}</th>
@ -188,6 +189,13 @@
</div>
</div>
<div class="form-group">
<label for="firstname" class="col-sm-2 control-label">${I18n.jobinfo_field_shardingparam}<font color="black">*</font></label>
<div class="col-sm-10">
<textarea class="textarea form-control" name="shardingParam" placeholder="${I18n.system_please_input}${I18n.jobinfo_field_shardingparam}" maxlength="512" style="height: 63px; line-height: 1.2;"></textarea>
</div>
</div>
<br>
<p style="margin: 0 0 10px;text-align: left;border-bottom: 1px solid #e5e5e5;color: gray;">${I18n.jobinfo_conf_advanced}</p> <#-- -->
@ -265,6 +273,7 @@ echo "${I18n.jobinfo_script_location}$0"
echo "${I18n.jobinfo_field_executorparam}$1"
echo "${I18n.jobinfo_shard_index} = $2"
echo "${I18n.jobinfo_shard_total} = $3"
echo "${I18n.jobinfo_field_shardingparam}$4"
<#--echo "参数数量:$#"
for param in $*
do
@ -287,6 +296,7 @@ print "${I18n.jobinfo_script_location}", sys.argv[0]
print "${I18n.jobinfo_field_executorparam}", sys.argv[1]
print "${I18n.jobinfo_shard_index}", sys.argv[2]
print "${I18n.jobinfo_shard_total}", sys.argv[3]
print "${I18n.jobinfo_field_shardingparam}", sys.argv[4]
<#--for i in range(1, len(sys.argv)):
time.sleep(1)
print "参数", i, sys.argv[i]-->
@ -309,6 +319,7 @@ logging.info("脚本文件:" + sys.argv[0])
echo "${I18n.jobinfo_field_executorparam}$argv[1] \n";
echo "${I18n.jobinfo_shard_index} = $argv[2] \n";
echo "${I18n.jobinfo_shard_total} = $argv[3] \n";
echo "${I18n.jobinfo_field_shardingparam} = $argv[4] \n";
echo "Good bye! \n";
exit(0);
@ -433,6 +444,13 @@ exit 0
</div>
</div>
<div class="form-group">
<label for="firstname" class="col-sm-2 control-label">${I18n.jobinfo_field_shardingparam}<font color="black">*</font></label>
<div class="col-sm-10">
<textarea class="textarea form-control" name="shardingParam" placeholder="${I18n.system_please_input}${I18n.jobinfo_field_shardingparam}" maxlength="512" style="height: 63px; line-height: 1.2;"></textarea>
</div>
</div>
<br>
<p style="margin: 0 0 10px;text-align: left;border-bottom: 1px solid #e5e5e5;color: gray;">${I18n.jobinfo_conf_advanced}</p> <#-- -->
@ -507,6 +525,12 @@ exit 0
<textarea class="textarea form-control" name="executorParam" placeholder="${I18n.system_please_input}${I18n.jobinfo_field_executorparam}" maxlength="512" style="height: 63px; line-height: 1.2;"></textarea>
</div>
</div>
<div class="form-group">
<label for="firstname" class="col-sm-2 control-label">${I18n.jobinfo_field_shardingparam}<font color="black">*</font></label>
<div class="col-sm-10">
<textarea class="textarea form-control" name="shardingParam" placeholder="${I18n.system_please_input}${I18n.jobinfo_field_shardingparam}" maxlength="512" style="height: 63px; line-height: 1.2;"></textarea>
</div>
</div>
<div class="form-group">
<label for="firstname" class="col-sm-2 control-label">${I18n.jobgroup_field_registryList}<font color="black">*</font></label>
<div class="col-sm-10">

@ -32,7 +32,7 @@ public class AdminBizTest {
HandleCallbackParam param = new HandleCallbackParam();
param.setLogId(1);
param.setHandleCode(XxlJobContext.HANDLE_COCE_SUCCESS);
param.setHandleCode(XxlJobContext.HANDLE_CODE_SUCCESS);
List<HandleCallbackParam> callbackParamList = Arrays.asList(param);

@ -4,7 +4,7 @@
<parent>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job</artifactId>
<version>2.3.0</version>
<version>2.3.1-SNAPSHOT</version>
</parent>
<artifactId>xxl-job-core</artifactId>
<packaging>jar</packaging>

@ -12,6 +12,7 @@ public class TriggerParam implements Serializable{
private String executorHandler;
private String executorParams;
private String shardingParam;
private String executorBlockStrategy;
private int executorTimeout;
@ -46,6 +47,14 @@ public class TriggerParam implements Serializable{
return executorParams;
}
public String getShardingParam() {
return shardingParam;
}
public void setShardingParam(String shardingParam) {
this.shardingParam = shardingParam;
}
public void setExecutorParams(String executorParams) {
this.executorParams = executorParams;
}

@ -8,9 +8,9 @@ package com.xxl.job.core.context;
*/
public class XxlJobContext {
public static final int HANDLE_COCE_SUCCESS = 200;
public static final int HANDLE_COCE_FAIL = 500;
public static final int HANDLE_COCE_TIMEOUT = 502;
public static final int HANDLE_CODE_SUCCESS = 200;
public static final int HANDLE_CODE_FAIL = 500;
public static final int HANDLE_CODE_TIMEOUT = 502;
// ---------------------- base info ----------------------
@ -23,6 +23,10 @@ public class XxlJobContext {
* job param
*/
private final String jobParam;
/**
* job param
*/
private final String jobShardingParam;
// ---------------------- for log ----------------------
@ -64,11 +68,22 @@ public class XxlJobContext {
public XxlJobContext(long jobId, String jobParam, String jobLogFileName, int shardIndex, int shardTotal) {
this.jobId = jobId;
this.jobParam = jobParam;
this.jobShardingParam = null;
this.jobLogFileName = jobLogFileName;
this.shardIndex = shardIndex;
this.shardTotal = shardTotal;
this.handleCode = HANDLE_COCE_SUCCESS; // default success
this.handleCode = HANDLE_CODE_SUCCESS; // default success
}
public XxlJobContext(long jobId, String jobParam, String jobShardingParam, String jobLogFileName, int shardIndex, int shardTotal) {
this.jobId = jobId;
this.jobParam = jobParam;
this.jobShardingParam = jobShardingParam;
this.jobLogFileName = jobLogFileName;
this.shardIndex = shardIndex;
this.shardTotal = shardTotal;
this.handleCode = HANDLE_CODE_SUCCESS; // default success
}
public long getJobId() {
@ -79,6 +94,10 @@ public class XxlJobContext {
return jobParam;
}
public String getJobShardingParam() {
return jobShardingParam;
}
public String getJobLogFileName() {
return jobLogFileName;
}

@ -48,6 +48,20 @@ public class XxlJobHelper {
return xxlJobContext.getJobParam();
}
/**
* current JobParam
*
* @return
*/
public static String getJobShardingParam() {
XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
if (xxlJobContext == null) {
return null;
}
return xxlJobContext.getJobShardingParam();
}
// ---------------------- for log ----------------------
/**
@ -177,7 +191,7 @@ public class XxlJobHelper {
* @return
*/
public static boolean handleSuccess(){
return handleResult(XxlJobContext.HANDLE_COCE_SUCCESS, null);
return handleResult(XxlJobContext.HANDLE_CODE_SUCCESS, null);
}
/**
@ -187,7 +201,7 @@ public class XxlJobHelper {
* @return
*/
public static boolean handleSuccess(String handleMsg) {
return handleResult(XxlJobContext.HANDLE_COCE_SUCCESS, handleMsg);
return handleResult(XxlJobContext.HANDLE_CODE_SUCCESS, handleMsg);
}
/**
@ -196,7 +210,7 @@ public class XxlJobHelper {
* @return
*/
public static boolean handleFail(){
return handleResult(XxlJobContext.HANDLE_COCE_FAIL, null);
return handleResult(XxlJobContext.HANDLE_CODE_FAIL, null);
}
/**
@ -206,7 +220,7 @@ public class XxlJobHelper {
* @return
*/
public static boolean handleFail(String handleMsg) {
return handleResult(XxlJobContext.HANDLE_COCE_FAIL, handleMsg);
return handleResult(XxlJobContext.HANDLE_CODE_FAIL, handleMsg);
}
/**
@ -215,7 +229,7 @@ public class XxlJobHelper {
* @return
*/
public static boolean handleTimeout(){
return handleResult(XxlJobContext.HANDLE_COCE_TIMEOUT, null);
return handleResult(XxlJobContext.HANDLE_CODE_TIMEOUT, null);
}
/**
@ -225,7 +239,7 @@ public class XxlJobHelper {
* @return
*/
public static boolean handleTimeout(String handleMsg){
return handleResult(XxlJobContext.HANDLE_COCE_TIMEOUT, handleMsg);
return handleResult(XxlJobContext.HANDLE_CODE_TIMEOUT, handleMsg);
}
/**

@ -6,7 +6,7 @@ package com.xxl.job.core.enums;
public enum ExecutorBlockStrategyEnum {
SERIAL_EXECUTION("Serial execution"),
/*CONCURRENT_EXECUTION("并行"),*/
CONCURRENT_EXECUTION("Concurrent execution"),
DISCARD_LATER("Discard Later"),
COVER_EARLY("Cover Early");

@ -3,6 +3,8 @@ package com.xxl.job.core.executor;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.client.AdminBizClient;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.handler.impl.MethodJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.server.EmbedServer;
import com.xxl.job.core.thread.JobLogFileCleanThread;
@ -13,6 +15,7 @@ import com.xxl.job.core.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -81,10 +84,10 @@ public class XxlJobExecutor {
initEmbedServer(address, ip, port, appname, accessToken);
}
public void destroy(){
// destory executor-server
// destroy executor-server
stopEmbedServer();
// destory jobThreadRepository
// destroy jobThreadRepository
if (jobThreadRepository.size() > 0) {
for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {
JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
@ -102,10 +105,10 @@ public class XxlJobExecutor {
jobHandlerRepository.clear();
// destory JobLogFileCleanThread
// destroy JobLogFileCleanThread
JobLogFileCleanThread.getInstance().toStop();
// destory TriggerCallbackThread
// destroy TriggerCallbackThread
TriggerCallbackThread.getInstance().toStop();
}
@ -178,6 +181,59 @@ public class XxlJobExecutor {
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){
if (xxlJob == null) {
return;
}
String name = xxlJob.value();
//make and simplify the variables since they'll be called several times later
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
// execute method
/*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}
if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}*/
executeMethod.setAccessible(true);
// init and destroy
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = clazz.getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
// registry jobhandler
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
}
// ---------------------- job thread repository ----------------------

@ -30,6 +30,7 @@ public class XxlJobSimpleExecutor extends XxlJobExecutor {
}
@Override
public void start() {
// init JobHandler Repository (for method)
@ -43,6 +44,7 @@ public class XxlJobSimpleExecutor extends XxlJobExecutor {
}
}
@Override
public void destroy() {
super.destroy();
}
@ -57,62 +59,13 @@ public class XxlJobSimpleExecutor extends XxlJobExecutor {
for (Object bean: xxlJobBeanList) {
// method
Method[] methods = bean.getClass().getDeclaredMethods();
if (methods==null || methods.length==0) {
if (methods.length == 0) {
continue;
}
for (Method executeMethod : methods) {
// anno
XxlJob xxlJob = executeMethod.getAnnotation(XxlJob.class);
if (xxlJob == null) {
continue;
}
String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
// execute method
/*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}
if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}*/
executeMethod.setAccessible(true);
// init and destory
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
}
// registry jobhandler
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
// registry
registJobHandler(xxlJob, bean, executeMethod);
}
}

@ -105,56 +105,10 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
if (xxlJob == null) {
continue;
}
String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
// regist
registJobHandler(xxlJob, bean, executeMethod);
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
// execute method
/*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}
if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}*/
executeMethod.setAccessible(true);
// init and destory
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
}
// registry jobhandler
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
}
}
}
// ---------------------- applicationContext ----------------------
@ -162,7 +116,7 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
XxlJobSpringExecutor.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {

@ -52,7 +52,7 @@ public class EmbedServer {
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {

@ -102,7 +102,7 @@ public class ExecutorRegistryThread {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
}
});

@ -96,7 +96,7 @@ public class JobLogFileCleanThread {
}
}
}
logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");
logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.");
}
});

@ -5,6 +5,7 @@ import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
@ -44,6 +45,9 @@ public class JobThread extends Thread{
this.handler = handler;
this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
// assign job thread name
this.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis());
}
public IJobHandler getHandler() {
return handler;
@ -100,6 +104,7 @@ public class JobThread extends Thread{
logger.error(e.getMessage(), e);
}
ExecutorService executor2 = Executors.newFixedThreadPool(5);
// execute
while(!toStop){
running = false;
@ -119,6 +124,7 @@ public class JobThread extends Thread{
XxlJobContext xxlJobContext = new XxlJobContext(
triggerParam.getJobId(),
triggerParam.getExecutorParams(),
triggerParam.getShardingParam(),
logFileName,
triggerParam.getBroadcastIndex(),
triggerParam.getBroadcastTotal());
@ -127,7 +133,7 @@ public class JobThread extends Thread{
XxlJobContext.setXxlJobContext(xxlJobContext);
// execute
XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam()+ "Sharding Param:" + xxlJobContext.getJobShardingParam());
if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout
@ -158,6 +164,19 @@ public class JobThread extends Thread{
} finally {
futureThread.interrupt();
}
} else if (triggerParam.getExecutorBlockStrategy().equals(ExecutorBlockStrategyEnum.CONCURRENT_EXECUTION.name())){
executor2.execute(new Runnable() {
@Override
public void run() {
try {
// init job context
XxlJobContext.setXxlJobContext(xxlJobContext);
handler.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
});
} else {
// just execute
handler.execute();
@ -215,13 +234,14 @@ public class JobThread extends Thread{
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_COCE_FAIL,
XxlJobContext.HANDLE_CODE_FAIL,
stopReason + " [job running, killed]" )
);
}
}
}
}
executor2.shutdown();
// callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0){
@ -231,7 +251,7 @@ public class JobThread extends Thread{
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_COCE_FAIL,
XxlJobContext.HANDLE_CODE_FAIL,
stopReason + " [job not executed, in the job queue, killed.]")
);
}

@ -95,7 +95,7 @@ public class TriggerCallbackThread {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");
logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");
}
});
@ -125,7 +125,7 @@ public class TriggerCallbackThread {
}
}
}
logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
}
});
triggerRetryCallbackThread.setDaemon(true);
@ -191,6 +191,7 @@ public class TriggerCallbackThread {
XxlJobContext.setXxlJobContext(new XxlJobContext(
-1,
null,
null,
logFileName,
-1,
-1));

@ -53,7 +53,7 @@ public class NetUtil {
serverSocket = new ServerSocket(port);
used = false;
} catch (IOException e) {
logger.info(">>>>>>>>>>> xxl-rpc, port[{}] is in use.", port);
logger.info(">>>>>>>>>>> xxl-job, port[{}] is in use.", port);
used = true;
} finally {
if (serverSocket != null) {

@ -34,17 +34,21 @@ public class XxlJobRemotingUtil {
logger.error(e.getMessage(), e);
}
connection.setHostnameVerifier(new HostnameVerifier() {
@Override
public boolean verify(String hostname, SSLSession session) {
return true;
}
});
}
private static final TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() {
@Override
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return new java.security.cert.X509Certificate[]{};
}
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
}};
@ -114,7 +118,7 @@ public class XxlJobRemotingUtil {
// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting fail, StatusCode("+ statusCode +") invalid. for url : " + url);
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-job remoting fail, StatusCode("+ statusCode +") invalid. for url : " + url);
}
// result
@ -131,13 +135,13 @@ public class XxlJobRemotingUtil {
ReturnT returnT = GsonTool.fromJson(resultJson, ReturnT.class, returnTargClassOfT);
return returnT;
} catch (Exception e) {
logger.error("xxl-rpc remoting (url="+url+") response content invalid("+ resultJson +").", e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting (url="+url+") response content invalid("+ resultJson +").");
logger.error("xxl-job remoting (url="+url+") response content invalid("+ resultJson +").", e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-job remoting (url="+url+") response content invalid("+ resultJson +").");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting error("+ e.getMessage() +"), for url : " + url);
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-job remoting error("+ e.getMessage() +"), for url : " + url);
} finally {
try {
if (bufferedReader != null) {

@ -5,7 +5,7 @@
<parent>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job</artifactId>
<version>2.3.0</version>
<version>2.3.1-SNAPSHOT</version>
</parent>
<artifactId>xxl-job-executor-samples</artifactId>
<packaging>pom</packaging>

@ -6,7 +6,7 @@
<parent>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-executor-samples</artifactId>
<version>2.3.0</version>
<version>2.3.1-SNAPSHOT</version>
</parent>
<artifactId>xxl-job-executor-sample-frameless</artifactId>
<packaging>jar</packaging>
@ -28,7 +28,7 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.version}</version>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>

@ -29,8 +29,8 @@ public class FramelessApplication {
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// destory
FrameLessXxlJobConfig.getInstance().destoryXxlJobExecutor();
// destroy
FrameLessXxlJobConfig.getInstance().destroyXxlJobExecutor();
}
}

@ -56,9 +56,9 @@ public class FrameLessXxlJobConfig {
}
/**
* destory
* destroy
*/
public void destoryXxlJobExecutor() {
public void destroyXxlJobExecutor() {
if (xxlJobExecutor != null) {
xxlJobExecutor.destroy();
}

@ -244,7 +244,7 @@ public class SampleXxlJob {
logger.info("init");
}
public void destroy(){
logger.info("destory");
logger.info("destroy");
}

@ -6,7 +6,7 @@
<parent>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-executor-samples</artifactId>
<version>2.3.0</version>
<version>2.3.1-SNAPSHOT</version>
</parent>
<artifactId>xxl-job-executor-sample-springboot</artifactId>
<packaging>jar</packaging>

@ -1,5 +1,6 @@
package com.xxl.job.executor.service.jobhandler;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
@ -37,10 +38,14 @@ public class SampleXxlJob {
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobContext jobContext = XxlJobContext.getXxlJobContext();
logger.info("执行分片:{} 分片总数:{},执行参数为:{},执行分片参数为:{}",
jobContext.getShardIndex(), jobContext.getShardTotal(),
jobContext.getJobParam(), jobContext.getJobShardingParam());
for (int i = 0; i < 1; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
logger.info("beat at:" + i);
TimeUnit.SECONDS.sleep(10);
}
// default success
}
@ -246,7 +251,7 @@ public class SampleXxlJob {
logger.info("init");
}
public void destroy(){
logger.info("destory");
logger.info("destroy");
}

Loading…
Cancel
Save