Pre Merge pull request !56 from lixiaoranxuhuaiyun/master

pull/56/MERGE
lixiaoranxuhuaiyun 2 years ago committed by Gitee
commit 1018756f9b
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F

@ -1,5 +1,5 @@
#
# XXL-JOB v2.4.1-SNAPSHOT
# XXL-JOB v2.3.1-SNAPSHOT
# Copyright (c) 2015-present, xuxueli.
CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
@ -29,10 +29,14 @@ CREATE TABLE `xxl_job_info` (
`glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE remark',
`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE update time',
`child_jobid` varchar(255) DEFAULT NULL COMMENT 'child job IDs, comma separated',
`trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'schedule status: 0-stopped, 1-running',
`trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'schedule status: 0-stopped, 1-running, 2-pending dispatch',
`trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'last schedule time',
`trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'next schedule time',
PRIMARY KEY (`id`)
`host_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'admin instance name',
`lock_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'lock status: 1-job locked, 0-unlocked',
PRIMARY KEY (`id`),
KEY `idx_host_name` (`host_name`),
KEY `idx_lock` (`lock_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_log` (
@ -113,10 +117,17 @@ CREATE TABLE `xxl_job_lock` (
PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_cluster` (
`host_name` varchar(100) NOT NULL COMMENT 'admin instance name',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`host_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31' );
INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', '');
INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'flush_lock');
commit;
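A rough sketch of how the new columns and the xxl_job_cluster table are expected to be used at runtime; the query shapes are inferred from the mapper and scheduler changes further down in this diff, and the host name is a made-up example:

-- each admin node refreshes its heartbeat row about every 30 seconds
REPLACE INTO xxl_job_cluster (host_name, update_time) VALUES ('10.0.0.1:8080', NOW());

-- with clustering enabled, the pre-read query only picks jobs assigned to this node and not yet locked
SELECT *
FROM xxl_job_info
WHERE trigger_status = 1
  AND trigger_next_time <= (UNIX_TIMESTAMP() * 1000 + 5000)
  AND host_name = '10.0.0.1:8080'
  AND lock_status = 0
ORDER BY id ASC
LIMIT 6000;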

@ -0,0 +1,4 @@
Source repository addresses
https://github.com/xuxueli/xxl-job/releases
http://gitee.com/xuxueli0323/xxl-job/releases

@ -28,6 +28,7 @@ public class XxlJobCompleter {
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {
// finish
//trigger child jobs
finishJob(xxlJobLog);
// text column max 64KB; truncate to avoid overly long content
@ -36,6 +37,7 @@ public class XxlJobCompleter {
}
// fresh handle
//update the log table
return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
}

@ -1,8 +1,15 @@
package com.xxl.job.admin.core.conf;
import com.xxl.job.admin.core.alarm.JobAlarmer;
import com.xxl.job.admin.core.model.XxlJobCluster;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.dao.*;
import com.xxl.job.admin.service.JobAllocation;
import com.xxl.job.admin.service.impl.AverageJobAllocation;
import com.xxl.job.core.util.IpUtil;
import io.micrometer.core.instrument.util.StringUtils;
import io.netty.util.internal.StringUtil;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
@ -12,6 +19,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Arrays;
import java.util.List;
/**
* xxl-job config
@ -23,6 +31,7 @@ import java.util.Arrays;
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
private static XxlJobAdminConfig adminConfig = null;
public static XxlJobAdminConfig getAdminConfig() {
return adminConfig;
}
@ -35,9 +44,10 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;
getJobAllocation().init(true);
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
getJobAllocation().flush();
}
@Override
@ -67,6 +77,17 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
@Value("${xxl.job.logretentiondays}")
private int logretentiondays;
@Value("${xxl.job.cluster.host.name}")
private String hostName;
@Value("${server.port}")
private int port;
@Value("${xxl.job.cluster.enable:false}")
private boolean clusterEnable;
// dao, service
@Resource
@ -80,12 +101,28 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
@Resource
private XxlJobLogReportDao xxlJobLogReportDao;
@Resource
private XxlJobClusterDao xxlJobClusterDao;
@Resource
private JavaMailSender mailSender;
@Resource
private DataSource dataSource;
@Resource
private JobAlarmer jobAlarmer;
public String getHostName() {
return StringUtils.isBlank(hostName) ? IpUtil.getIpPort(port) : hostName;
}
private JobAllocation jobAllocation = defaultJobAllocation;
private static JobAllocation defaultJobAllocation = new JobAllocation() {
@Override
public void allocation(XxlJobInfo jobInfo) {
}
@Override
public void init(boolean init) {
}
};
public String getI18n() {
if (!Arrays.asList("zh_CN", "zh_TC", "en").contains(i18n)) {
@ -155,4 +192,19 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
return jobAlarmer;
}
public XxlJobClusterDao getXxlJobClusterDao() {
return xxlJobClusterDao;
}
public JobAllocation getJobAllocation() {
// fall back to the cluster-aware strategy only when no custom strategy has been set via setJobAllocation()
return jobAllocation.equals(defaultJobAllocation) && clusterEnable ? new AverageJobAllocation() : jobAllocation;
}
public void setJobAllocation(JobAllocation jobAllocation) {
this.jobAllocation = jobAllocation;
}
public boolean isClusterEnable() {
return clusterEnable;
}
}

@ -0,0 +1,26 @@
package com.xxl.job.admin.core.model;
import java.util.Date;
public class XxlJobCluster {
private String hostName;
private Date updateTime;
public String getHostName() {
return hostName;
}
public void setHostName(String hostName) {
this.hostName = hostName;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
}

@ -41,7 +41,9 @@ public class XxlJobInfo {
private int triggerStatus; // schedule status: 0-stopped, 1-running
private long triggerLastTime; // last schedule time
private long triggerNextTime; // next schedule time
private String hostName; // name of the admin instance the job is assigned to
private int lockStatus;
public int getId() {
return id;
@ -234,4 +236,21 @@ public class XxlJobInfo {
public void setTriggerNextTime(long triggerNextTime) {
this.triggerNextTime = triggerNextTime;
}
public String getHostName() {
return hostName;
}
public void setHostName(String hostName) {
this.hostName = hostName;
}
public int getLockStatus() {
return lockStatus;
}
public void setLockStatus(int lockStatus) {
this.lockStatus = lockStatus;
}
}

@ -12,6 +12,7 @@ import java.util.List;
/**
* Created by xuxueli on 17/3/10.
* idleBeat: asks each executor whether it is idle and picks the first idle one
*/
public class ExecutorRouteBusyover extends ExecutorRouter {

@ -11,6 +11,7 @@ import java.util.List;
/**
* Created by xuxueli on 17/3/10.
* beat: calls beat on each address in order and picks the first one that returns SUCCESS
*/
public class ExecutorRouteFailover extends ExecutorRouter {

@ -13,6 +13,7 @@ import java.util.concurrent.ConcurrentMap;
* a(*): LFU (Least Frequently Used) - per job, the least frequently used executor is picked first (frequency / count)
* b: LRU (Least Recently Used) - the least recently used executor is picked first (time)
*
* the per-job usage-count map is cleared about every 24 hours; each pick increments the chosen address's count by 1
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteLFU extends ExecutorRouter {

@ -15,6 +15,7 @@ import java.util.concurrent.ConcurrentMap;
* a: LFU (Least Frequently Used) - the least frequently used executor is picked first (frequency / count)
* b(*): LRU (Least Recently Used) - per job, the least recently used executor is picked first (time)
*
* implemented with a LinkedHashMap(accessOrder=true), which keeps entries in access order
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteLRU extends ExecutorRouter {

@ -22,24 +22,33 @@ public class XxlJobScheduler {
public void init() throws Exception {
// init i18n
// TODO i18n: initialize block-strategy titles
initI18n();
// admin trigger pool start
// TODO init fastTriggerPool (triggers normal jobs, used in step 4 / step 7) and slowTriggerPool (triggers slow jobs, step 4 / step 7)
//TODO note: the max thread count is later used to compute how many jobs one loop pre-reads from the database
JobTriggerPoolHelper.toStart();
// admin registry monitor run
//TODO 1. init registryOrRemoveThreadPool (handles executor registry requests) 2. start registryMonitorThread (refreshes each executor group's registered client addresses, scanning every 30 seconds)
JobRegistryHelper.getInstance().start();
// admin fail-monitor run
//TODO step 4: start a thread that scans failed logs and drives the retry mechanism
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run ( depend on JobTriggerPoolHelper )
//TODO init callbackThreadPool (receives execution results from executors) and start monitorThread (handles jobs stuck in the running state for too long)
JobCompleteHelper.getInstance().start();
// admin log report start
//TODO start logrThread for report statistics, runs once a minute
//TODO each pass recomputes a whole day with no cap on data volume, which hurts efficiency on large tables; it could be made incremental, computed asynchronously on read, or limited to one minute per pass
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper )
//TODO step 7: job pre-processing, scheduleThread + ringThread
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");

@ -74,7 +74,7 @@ public class JobCompleteHelper {
// monitor
while (!toStop) {
try {
// lost-result handling: if a dispatch log has stayed in the "running" state for more than 10 minutes and its executor's heartbeat registration has failed (offline), mark the local dispatch as failed
//TODO lost-result handling: if a dispatch log has stayed in the "running" state for more than 10 minutes and its executor's heartbeat registration has failed (offline), mark the local dispatch as failed
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

@ -43,30 +43,39 @@ public class JobFailMonitorHelper {
for (long failLogId: failLogIds) {
// lock log
//relies on the database (conditional update) to avoid duplicate processing of the same record across multiple admin nodes
//TODO if the service crashes right at this step, this batch of records is left for the next scanning thread to pick up as failed
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
//load the execution log and its job info
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、fail retry monitor
//TODO check the remaining retry count
if (log.getExecutorFailRetryCount() > 0) {
//trigger the job again
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
//update the log record
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2、fail alarm monitor
int newAlarmStatus = 0; // alarm status: 0-default, -1-locked, 1-no alarm needed, 2-alarm success, 3-alarm fail
if (info != null) {
//send the alarm (details skipped here)
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
//update the alarm status
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}

@ -1,17 +1,23 @@
package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.model.XxlJobCluster;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobRegistry;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.util.DateUtil;
import com.xxl.job.core.util.IpUtil;
import org.apache.catalina.startup.HostConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* job registry instance
@ -26,9 +32,18 @@ public class JobRegistryHelper {
}
private ThreadPoolExecutor registryOrRemoveThreadPool = null;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "cluster host registry");
}
});
private Thread registryMonitorThread;
private volatile boolean toStop = false;
public void start(){
// for registry or remove
@ -59,10 +74,12 @@ public class JobRegistryHelper {
while (!toStop) {
try {
// auto registry group
// address_type = 0: executor groups using automatic address registry
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor)
//TODO find registrations whose last heartbeat is older than 90 seconds and delete them
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
@ -70,9 +87,11 @@ public class JobRegistryHelper {
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
//TODO find registrations seen within the last 90 seconds
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
//TODO only executor-side registrations (admin registrations are skipped)
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
@ -103,7 +122,7 @@ public class JobRegistryHelper {
}
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
//TODO loop over the executor groups and update each group's registered address list
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
@ -113,6 +132,7 @@ public class JobRegistryHelper {
}
}
try {
//TODO sleep 30 seconds between passes
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
@ -126,6 +146,29 @@ public class JobRegistryHelper {
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().replace(XxlJobAdminConfig.getAdminConfig().getHostName());
Date date = new Date();
XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().delete(DateUtil.addMinutes(date,-5));
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().findOldClusterInfo();
if(jobInfo!=null){
XxlJobAdminConfig.getAdminConfig().getJobAllocation().init(false);
XxlJobAdminConfig.getAdminConfig().getJobAllocation().flush();
}
} catch (Exception e) {
logger.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 0, 1000 * 30, TimeUnit.MILLISECONDS);
}
public void toStop(){

@ -6,6 +6,7 @@ import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.scheduler.MisfireStrategyEnum;
import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum;
import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
import com.xxl.job.core.util.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -15,6 +16,7 @@ import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author xuxueli 2019-05-21
@ -70,6 +72,7 @@ public class JobScheduleHelper {
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
//TODO this row lock ensures that only one admin node runs the schedule loop at a time; a rather crude approach
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
@ -77,18 +80,37 @@ public class JobScheduleHelper {
// 1、pre read
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
//TODO jobs whose next trigger time falls within the next 5 seconds; the core of the whole framework
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao()
.scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount, XxlJobAdminConfig.getAdminConfig().getHostName(),
XxlJobAdminConfig.getAdminConfig().isClusterEnable());
if (scheduleList!=null && scheduleList.size()>0) {
if(XxlJobAdminConfig.getAdminConfig().isClusterEnable()) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateStatusById(scheduleList.stream()
.map(XxlJobInfo::getId).collect(Collectors.toList()));
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
scheduleList.forEach(x->logger.info(x.getId()+""));
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
// TODO now > nextTriggerTime + 5s: the trigger has expired, so apply the misfire strategy (within 5 seconds does not count as expired)
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s: pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
//if the misfire strategy is "fire once now", trigger immediately
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
@ -96,25 +118,33 @@ public class JobScheduleHelper {
}
// 2、fresh next
//refresh the next trigger time
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
}
//TODO now > nextTriggerTime: nominally expired, but within the 5-second window it is not treated as a misfire
else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s: direct-trigger && make next-trigger-time
// 1、trigger
//TODO trigger the job
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
//TODO refresh the next trigger time
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
//TODO if the job is still running (status 1) and its next trigger time is within 5 seconds, cache it into ringData
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
//TODO use the second-of-minute of the next trigger time as the ring cache key
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
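// worked example with made-up values: triggerNextTime = 1700000023456 ms -> 1700000023456/1000 = 1700000023 s,
// and 1700000023 % 60 = 43, so this job is queued in ring slot 43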
// 2、push time ring
//TODO push into ringData
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
@ -122,7 +152,9 @@ public class JobScheduleHelper {
}
} else {
}
//not yet due: cache into ringData
else {
// 2.3、trigger-pre-read: time-ring trigger && make next-trigger-time
// 1、make ring second
@ -140,6 +172,7 @@ public class JobScheduleHelper {
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
jobInfo.setLockStatus(0);
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
@ -199,6 +232,7 @@ public class JobScheduleHelper {
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
//sleep to align with the next tick: up to ~5 seconds when nothing was pre-read, up to ~1 second otherwise
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
@ -236,7 +270,8 @@ public class JobScheduleHelper {
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // in case processing ran long and skipped a tick, also check the previous tick
//TODO in case processing ran long and skipped a tick, also check the previous tick
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
@ -249,7 +284,7 @@ public class JobScheduleHelper {
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger
//TODO do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
@ -355,6 +390,14 @@ public class JobScheduleHelper {
// ---------------------- tools ----------------------
/**
* Compute the job's next valid trigger time according to its schedule type (CRON or FIX_RATE).
* @param jobInfo  the job whose schedule is evaluated
* @param fromTime the time to start from
* @return the next trigger time, or null if the schedule type does not match
* @throws Exception if the schedule expression cannot be parsed
*/
public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
if (ScheduleTypeEnum.CRON == scheduleTypeEnum) {

@ -30,7 +30,7 @@ public class JobTriggerPoolHelper {
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new LinkedBlockingQueue<Runnable>(10000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@ -43,7 +43,7 @@ public class JobTriggerPoolHelper {
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new LinkedBlockingQueue<Runnable>(20000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@ -79,6 +79,7 @@ public class JobTriggerPoolHelper {
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
//jobs that timed out more than 10 times in the last minute are routed to the dedicated slow thread pool
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
@ -97,6 +98,7 @@ public class JobTriggerPoolHelper {
logger.error(e.getMessage(), e);
} finally {
//TODO count triggers that took more than 500ms within the current minute
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {

@ -49,6 +49,7 @@ public class XxlJobTrigger {
String addressList) {
// load data
//############### load the job and executor-group parameters
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalidjobId={}", jobId);
@ -76,6 +77,9 @@ public class XxlJobTrigger {
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
//############################
//TODO if the route strategy is sharding broadcast, trigger once per registered executor
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
@ -86,6 +90,7 @@ public class XxlJobTrigger {
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
//TODO single trigger (no sharding broadcast)
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
@ -111,7 +116,9 @@ public class XxlJobTrigger {
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param
//TODO resolve the block strategy
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
//TODO resolve the route strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
@ -142,6 +149,7 @@ public class XxlJobTrigger {
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
//sharding broadcast
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
@ -149,6 +157,7 @@ public class XxlJobTrigger {
address = group.getRegistryList().get(0);
}
} else {
//TODO pick the executor address according to the route strategy
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
@ -161,6 +170,7 @@ public class XxlJobTrigger {
// 4、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
//TODO trigger the job by calling the executor's run endpoint
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
@ -193,6 +203,7 @@ public class XxlJobTrigger {
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
//TODO update the trigger log record
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());

@ -0,0 +1,17 @@
package com.xxl.job.admin.dao;
import com.xxl.job.admin.core.model.XxlJobCluster;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List;
@Mapper
public interface XxlJobClusterDao {
void replace(@Param("ip") String ip);
List<XxlJobCluster> findAll();
void delete(@Param("time") Date date);
}

@ -9,6 +9,7 @@ import java.util.List;
/**
* job info
*
* @author xuxueli 2016-1-12 18:03:45
*/
@Mapper
@ -21,6 +22,7 @@ public interface XxlJobInfoDao {
@Param("jobDesc") String jobDesc,
@Param("executorHandler") String executorHandler,
@Param("author") String author);
public int pageListCount(@Param("offset") int offset,
@Param("pagesize") int pagesize,
@Param("jobGroup") int jobGroup,
@ -41,9 +43,19 @@ public interface XxlJobInfoDao {
public int findAllCount();
public List<XxlJobInfo> scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize );
public List<XxlJobInfo> scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize,
@Param("hostName") String ip,@Param("clusterEnable") boolean clusterEnable);
public int scheduleUpdate(XxlJobInfo xxlJobInfo);
void updateStatusById(@Param("list") List<Integer> collect);
List<XxlJobInfo> pageById(@Param("id") Integer id);
void updateHostNameByIds(@Param("hostName") String k, @Param("ids") List<Integer> v);
void initLockStatus(@Param("hostName") String hostName,@Param("init") boolean init);
XxlJobInfo findOldClusterInfo();
}

@ -0,0 +1,22 @@
package com.xxl.job.admin.service;
import com.xxl.job.admin.core.model.XxlJobInfo;
public interface JobAllocation {
/**
 * Assign the job to a cluster node (sets jobInfo.hostName).
 * @param jobInfo the job to allocate
 */
default void allocation(XxlJobInfo jobInfo){}
/**
 * Re-balance all jobs across the admin nodes that are currently alive.
 */
default void flush(){}
/**
 * Reset job lock status, on startup (init=true) or when an admin node has dropped out of the cluster (init=false).
 */
default void init(boolean init){}
}
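Assuming the setter added to XxlJobAdminConfig above is meant to take precedence over the built-in AverageJobAllocation, a custom strategy could presumably be plugged in as sketched below; the class name and the wiring comment are hypothetical, while the called methods (getAdminConfig, getHostName, setHostName, setJobAllocation) all appear elsewhere in this diff:

package com.xxl.job.admin.service.impl;

import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.service.JobAllocation;

// hypothetical strategy: pin every job to the local admin node
public class StickyJobAllocation implements JobAllocation {
    @Override
    public void allocation(XxlJobInfo jobInfo) {
        jobInfo.setHostName(XxlJobAdminConfig.getAdminConfig().getHostName());
    }
    // flush() and init(boolean) keep their no-op defaults from the interface
}

// wiring, e.g. from a startup hook after XxlJobAdminConfig has been initialized:
// XxlJobAdminConfig.getAdminConfig().setJobAllocation(new StickyJobAllocation());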

@ -0,0 +1,150 @@
package com.xxl.job.admin.service.impl;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.model.XxlJobCluster;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.service.JobAllocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* @author liyong
*/
public class AverageJobAllocation implements JobAllocation {
private static Logger logger = LoggerFactory.getLogger(AverageJobAllocation.class);
private static int oneMinute = 60;
private static int tenMinutes = oneMinute * 10;
private static int oneHour = tenMinutes * 6;
private static int oneDay = oneHour * 24;
private Map<Integer, AtomicInteger> averageMap = new ConcurrentHashMap<>();
private Map<String, List<String>> ipMap = new ConcurrentHashMap<>();
{
// bucket jobs by firing interval so that frequently-firing jobs do not all pile up on the same node
averageMap.put(oneMinute, new AtomicInteger(0));
averageMap.put(tenMinutes, new AtomicInteger(0));
averageMap.put(oneHour, new AtomicInteger(0));
averageMap.put(oneDay, new AtomicInteger(0));
averageMap.put(Integer.MAX_VALUE, new AtomicInteger(0));
}
@Override
public void allocation(XxlJobInfo jobInfo) {
List<XxlJobCluster> all = XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().findAll();
List<String> ipList = all.stream().filter(x -> x.getUpdateTime().getTime() > System.currentTimeMillis() - 60 * 1000)
.map(XxlJobCluster::getHostName).collect(Collectors.toList());
if (ipList.isEmpty()) {
jobInfo.setHostName(XxlJobAdminConfig.getAdminConfig().getHostName());
return;
}
Integer key = getKey(jobInfo.getTriggerNextTime() - System.currentTimeMillis());
int i = averageMap.get(key).incrementAndGet();
jobInfo.setHostName(ipList.get(i % ipList.size()));
}
@Override
public void init(boolean init) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().initLockStatus(XxlJobAdminConfig.getAdminConfig().getHostName(),init);
}
@Override
public void flush() {
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'flush_lock' for update");
preparedStatement.execute();
recursion(0, conn);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job, cluster flush error:{}", e);
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
try {
conn.close();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
}
}
private void recursion(int id, Connection conn) {
List<XxlJobInfo> list = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().pageById(id);
if (CollectionUtils.isEmpty(list))
return;
Map<String, List<Integer>> map = new HashMap<>();
list.forEach(x -> {
allocation(x);
List<Integer> ids = map.getOrDefault(x.getHostName(), new ArrayList<>());
ids.add(x.getId());
map.put(x.getHostName(), ids);
});
map.forEach((k, v) -> XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateHostNameByIds(k, v));
try {
conn.commit();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
recursion(list.get(list.size() - 1).getId(), conn);
}
private Integer getKey(long time) {
if (time < oneMinute) {
return oneMinute;
} else if (time < tenMinutes) {
return tenMinutes;
} else if (time < oneHour) {
return oneHour;
} else if (time < oneDay) {
return oneDay;
} else {
return Integer.MAX_VALUE;
}
}
}
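To make the distribution above concrete, here is a standalone sketch of the same bucket-then-round-robin idea with made-up node names; it only illustrates the counter arithmetic and is not project code:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class AllocationSketch {
    public static void main(String[] args) {
        // three live admin nodes (hypothetical)
        List<String> nodes = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080");
        // shared counter for the "fires within one minute" bucket
        AtomicInteger oneMinuteBucket = new AtomicInteger(0);
        // three jobs that all fire within the next minute get spread across the nodes
        for (String job : Arrays.asList("jobA", "jobB", "jobC")) {
            int i = oneMinuteBucket.incrementAndGet();                      // 1, 2, 3
            System.out.println(job + " -> " + nodes.get(i % nodes.size())); // .2, .3, .1
        }
    }
}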

@ -1,5 +1,6 @@
package com.xxl.job.admin.service.impl;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.cron.CronExpression;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo;
@ -10,6 +11,7 @@ import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum;
import com.xxl.job.admin.core.thread.JobScheduleHelper;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.admin.dao.*;
import com.xxl.job.admin.service.JobAllocation;
import com.xxl.job.admin.service.XxlJobService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
@ -328,6 +330,7 @@ public class XxlJobServiceImpl implements XxlJobService {
xxlJobInfo.setTriggerNextTime(nextTriggerTime);
xxlJobInfo.setUpdateTime(new Date());
XxlJobAdminConfig.getAdminConfig().getJobAllocation().allocation(xxlJobInfo);
xxlJobInfoDao.update(xxlJobInfo);
return ReturnT.SUCCESS;
}

@ -23,9 +23,9 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
#mybatis.type-aliases-package=com.xxl.job.admin.core.model
### xxl-job, datasource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.url=jdbc:mysql://localhost/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.password=12345678
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
### datasource-pool
@ -52,7 +52,7 @@ spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
### xxl-job, access token
xxl.job.accessToken=default_token
xxl.job.accessToken=
### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en")
xxl.job.i18n=zh_CN
@ -63,3 +63,8 @@ xxl.job.triggerpool.slow.max=100
### xxl-job, log retention days
xxl.job.logretentiondays=30
xxl.job.cluster.host.name=
xxl.job.cluster.enable=true
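### for reference, a hypothetical second cluster node would differ only in these values
### (when xxl.job.cluster.host.name is left empty, getHostName() falls back to the local ip:port)
# xxl.job.cluster.host.name=10.0.0.2:8080
# xxl.job.cluster.enable=true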

@ -2,7 +2,7 @@
<configuration debug="false" scan="true" scanPeriod="1 seconds">
<contextName>logback</contextName>
<property name="log.path" value="/data/applogs/xxl-job/xxl-job-admin.log"/>
<property name="log.path" value="../data/applogs/xxl-job/xxl-job-admin.log"/>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.xxl.job.admin.dao.XxlJobClusterDao">
<insert id="replace">
replace into xxl_job_cluster values(#{ip}, now());
</insert>
<delete id="delete">
delete from xxl_job_cluster where update_time &lt;= #{time}
</delete>
<select id="findAll" resultType="com.xxl.job.admin.core.model.XxlJobCluster">
select host_name as hostName,update_time as updateTime from xxl_job_cluster
</select>
</mapper>

@ -36,6 +36,8 @@
<result column="trigger_status" property="triggerStatus" />
<result column="trigger_last_time" property="triggerLastTime" />
<result column="trigger_next_time" property="triggerNextTime" />
<result column="host_name" property="hostName" />
<result column="lock_status" property="lockStatus" />
</resultMap>
<sql id="Base_Column_List">
@ -224,17 +226,53 @@
FROM xxl_job_info AS t
WHERE t.trigger_status = 1
and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}
<if test="clusterEnable == true">
and t.host_name=#{hostName}
and t.lock_status=0
</if>
ORDER BY id ASC
LIMIT #{pagesize}
</select>
<select id="pageById" resultMap="XxlJobInfo">
select id,trigger_next_time,host_name from xxl_job_info where id>#{id} and trigger_status=1 order by id limit 2000
</select>
<select id="findOldClusterInfo" resultMap="XxlJobInfo">
select a.* from xxl_job_info a left join xxl_job_cluster b on a.host_name =b.host_name
and b.update_time >=DATE_SUB(now(),INTERVAL 1 MINUTE)
where b.host_name is null and a.trigger_status = 1 limit 1
</select>
<update id="scheduleUpdate" parameterType="com.xxl.job.admin.core.model.XxlJobInfo" >
UPDATE xxl_job_info
SET
trigger_last_time = #{triggerLastTime},
trigger_next_time = #{triggerNextTime},
trigger_status = #{triggerStatus}
trigger_status = #{triggerStatus},
lock_status=#{lockStatus}
WHERE id = #{id}
</update>
<update id="updateStatusById">
update xxl_job_info set lock_status = 1
where id in
<foreach collection="list" item="id" open="(" close=")" separator="," >
#{id}
</foreach>
</update>
<update id="updateHostNameByIds">
update xxl_job_info set host_name = #{hostName}
where id in
<foreach collection="ids" item="id" open="(" close=")" separator="," >
#{id}
</foreach>
</update>
<update id="initLockStatus">
update xxl_job_info a left join xxl_job_cluster b on a.host_name =b.host_name and b.update_time>=DATE_SUB(now(),INTERVAL 1 MINUTE)
set a.lock_status = 0
where (b.host_name is null
<if test="init == true">
or a.host_name =#{hostName}
</if>
) and a.lock_status=1
</update>
</mapper>

@ -6,7 +6,7 @@ import com.xxl.job.core.util.XxlJobRemotingUtil;
/**
* admin api test
*
*
* @author xuxueli 2017-07-28 22:14:52
*/
public class ExecutorBizClient implements ExecutorBiz {

@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory;
import java.util.Date;
/**
/**
* Created by xuxueli on 17/3/1.
*/
public class ExecutorBizImpl implements ExecutorBiz {
@ -46,15 +46,18 @@ public class ExecutorBizImpl implements ExecutorBiz {
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// load oldjobHandler + jobThread
//TODO check whether a job thread is already running for this job
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// validjobHandler + jobThread
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
//TODO BEAN mode, the most commonly used glue type
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// new jobhandler
// new jobhandler: fetched from the map built when the handler beans were parsed
//TODO look up the handler object registered from the @XxlJob annotation at startup
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread
@ -74,7 +77,9 @@ public class ExecutorBizImpl implements ExecutorBiz {
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
}
//GLUE Groovy script mode, skipped here
else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// valid old jobThread
if (jobThread != null &&
@ -97,7 +102,9 @@ public class ExecutorBizImpl implements ExecutorBiz {
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
}
//other script modes, skipped here
else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
// valid old jobThread
if (jobThread != null &&
@ -120,13 +127,16 @@ public class ExecutorBizImpl implements ExecutorBiz {
// executor block strategy
if (jobThread != null) {
//TODO block strategy: note it is enforced on the executor side, so if the route strategy picks a different node each time, the same job can still run concurrently
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
//discard the later trigger
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
//cover (kill) the earlier trigger
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
@ -134,16 +144,19 @@ public class ExecutorBizImpl implements ExecutorBiz {
jobThread = null;
}
} else {
//serial execution on a single node
// just queue trigger
}
}
// replace thread (new or exists invalid)
if (jobThread == null) {
//TODO start a new thread that runs the job; with COVER_EARLY the previous job thread is interrupted
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// push data to queue
//TODO put the trigger param into the queue; the job thread started above polls this queue
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}

@ -67,20 +67,22 @@ public class XxlJobExecutor {
// ---------------------- start + stop ----------------------
public void start() throws Exception {
// init logpath
//TODO init the log path
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
//TODO init the admin server address list
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
//TODO init JobLogFileCleanThread: start a thread that cleans old log files
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
//TODO init TriggerCallbackThread: reports execution results back to the admin /api/callback endpoint
TriggerCallbackThread.getInstance().start();
// init executor-server
//TODO init the embedded xxl-job server
initEmbedServer(address, ip, port, appname, accessToken);
}
@ -117,6 +119,13 @@ public class XxlJobExecutor {
// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
/**
* Wrap each configured admin address into an AdminBizClient and collect them in adminBizList (used for executor registry and result callbacks)
* @param adminAddresses
* @param accessToken
* @throws Exception
*/
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
@ -147,6 +156,7 @@ public class XxlJobExecutor {
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
// generate address
//TODO prefer the explicitly configured address
if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-addressdefault use address to registry , otherwise use ip:port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
@ -159,6 +169,7 @@ public class XxlJobExecutor {
// start
embedServer = new EmbedServer();
//TODO start the embedded netty server
embedServer.start(address, port, appname, accessToken);
}
@ -175,6 +186,7 @@ public class XxlJobExecutor {
// ---------------------- job handler repository ----------------------
//stores handler objects; the key is the value of the @XxlJob annotation
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
@ -208,7 +220,7 @@ public class XxlJobExecutor {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}*/
//TODO setAccessible(true), so private handler methods are supported as well
executeMethod.setAccessible(true);
// init and destroy
@ -217,6 +229,7 @@ public class XxlJobExecutor {
if (xxlJob.init().trim().length() > 0) {
try {
//resolve the init method
initMethod = clazz.getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
@ -241,6 +254,7 @@ public class XxlJobExecutor {
// ---------------------- job thread repository ----------------------
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
//TODO thread that runs the actual job logic
JobThread newJobThread = new JobThread(jobId, handler);
newJobThread.start();
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

@ -14,6 +14,7 @@ import java.util.Map;
/**
* xxl-job executor (for frameless)
* can be used directly without a Spring environment
*
* @author xuxueli 2020-11-05
*/

@ -28,6 +28,10 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC
// start
/**
* Entry point once all singletons exist: register @XxlJob handler methods, refresh the glue factory, then start the executor.
*/
@Override
public void afterSingletonsInstantiated() {
@ -35,13 +39,15 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method)
//register @XxlJob-annotated handler methods
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
//init the glue factory used to run Groovy scripts
GlueFactory.refreshInstance(1);
// super start
try {
//TODO start the executor (see XxlJobExecutor.start)
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
@ -77,6 +83,10 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC
}
}*/
/**
* Scan Spring beans for @XxlJob-annotated methods and register them into XxlJobExecutor.jobHandlerRepository (a map keyed by handler name)
* @param applicationContext
*/
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;

@ -30,6 +30,11 @@ public class XxlJobFileAppender {
*/
private static String logBasePath = "/data/applogs/xxl-job/jobhandler";
private static String glueSrcPath = logBasePath.concat("/gluesource");
/**
* Initialize the log base path; the glue source path is derived from it (logBasePath + "/gluesource")
* @param logPath
*/
public static void initLogPath(String logPath){
// init
if (logPath!=null && logPath.trim().length()>0) {

@ -36,11 +36,14 @@ public class EmbedServer {
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
// param
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
//TODO thread pool that handles requests coming from the admin side
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
@ -59,6 +62,8 @@ public class EmbedServer {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
@ -71,6 +76,7 @@ public class EmbedServer {
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
//TODO handle incoming requests
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
@ -81,16 +87,18 @@ public class EmbedServer {
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
// start registry
//TODO register this executor node via the admin api/registry endpoint
startRegistry(appname, address);
// wait util stop
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
if (e instanceof InterruptedException) {
logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} catch (Exception e) {
} else {
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
}
} finally {
// stop
try {
@ -100,7 +108,9 @@ public class EmbedServer {
logger.error(e.getMessage(), e);
}
}
}
});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
@ -122,7 +132,7 @@ public class EmbedServer {
/**
* netty_http
* <p>
*
* Copy from : https://github.com/xuxueli/xxl-rpc
*
* @author xuxueli 2015-11-24 22:25:15
@ -133,7 +143,6 @@ public class EmbedServer {
private ExecutorBiz executorBiz;
private String accessToken;
private ThreadPoolExecutor bizThreadPool;
public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
this.executorBiz = executorBiz;
this.accessToken = accessToken;
@ -142,6 +151,7 @@ public class EmbedServer {
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// request parse
//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
String requestData = msg.content().toString(CharsetUtil.UTF_8);
@ -150,11 +160,12 @@ public class EmbedServer {
boolean keepAlive = HttpUtil.isKeepAlive(msg);
String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
// invoke
// TODO hand the request off to the biz thread pool immediately after receiving it
bizThreadPool.execute(new Runnable() {
@Override
public void run() {
// do invoke
//TODO dispatch by uri
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// to json
@ -167,7 +178,8 @@ public class EmbedServer {
}
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
// valid
//#################### validation
if (HttpMethod.POST != httpMethod) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
@ -179,25 +191,32 @@ public class EmbedServer {
&& !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
//######################
// services mapping
try {
switch (uri) {
case "/beat":
if ("/beat".equals(uri)) {
//TODO used by the failover route strategy
return executorBiz.beat();
case "/idleBeat":
} else if ("/idleBeat".equals(uri)) {
//TODO used by the busy-over route strategy
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case "/run":
} else if ("/run".equals(uri)) {
//TODO run the job
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
} else if ("/kill".equals(uri)) {
//TODO kill the job
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
} else if ("/log".equals(uri)) {
//TODO query logs; note the admin reads logs from the executor in real time, so when the executor runs in a container, mind the log path across deployments
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
} catch (Exception e) {
@ -253,4 +272,6 @@ public class EmbedServer {
// stop registry
ExecutorRegistryThread.getInstance().toStop();
}
}

@ -34,7 +34,7 @@ public class ExecutorRegistryThread {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
return;
}
//start a thread that registers with the admin every 30 seconds
registryThread = new Thread(new Runnable() {
@Override
public void run() {
@ -76,7 +76,7 @@ public class ExecutorRegistryThread {
}
}
// registry remove
// TODO registry remove: when this executor shuts down, the removal endpoint is called immediately
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {

@ -27,9 +27,14 @@ public class JobLogFileCleanThread {
private Thread localThread;
private volatile boolean toStop = false;
/**
* Start the log file clean thread; it sleeps between passes and is stopped by interrupting that sleep
* @param logRetentionDays
*/
public void start(final long logRetentionDays){
// limit min value
// TODO note: log retention has a minimum of 3 days; below that, cleaning is skipped
if (logRetentionDays < 3 ) {
return;
}
@ -54,7 +59,7 @@ public class JobLogFileCleanThread {
for (File childFile: childDirs) {
// valid
// valid: log files are generated under directories named by date
if (!childFile.isDirectory()) {
continue;
}
@ -73,7 +78,7 @@ public class JobLogFileCleanThread {
if (logFileCreateDate == null) {
continue;
}
//delete directories whose date is older than the retention window
if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
FileUtil.deleteRecursively(childFile);
}

@ -98,8 +98,10 @@ public class JobThread extends Thread{
// init
try {
//TODO call the init method declared on the @XxlJob annotation
handler.init();
} catch (Throwable e) {
//TODO an init failure does not stop the normal flow; be careful if init does business-critical work
logger.error(e.getMessage(), e);
}
@ -111,6 +113,7 @@ public class JobThread extends Thread{
TriggerParam triggerParam = null;
try {
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
//TODO poll with a timeout so the toStop flag keeps being checked
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) {
running = true;
@ -123,15 +126,18 @@ public class JobThread extends Thread{
triggerParam.getJobId(),
triggerParam.getExecutorParams(),
logFileName,
//TODO index of this node in the broadcast
triggerParam.getBroadcastIndex(),
//TODO total number of broadcast nodes
triggerParam.getBroadcastTotal());
// init job context
// init job context: wraps the execution parameters; stored and read through a ThreadLocal
XxlJobContext.setXxlJobContext(xxlJobContext);
// execute
XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
//TODO execution timeout
if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout
Thread futureThread = null;
@ -162,6 +168,7 @@ public class JobThread extends Thread{
futureThread.interrupt();
}
} else {
//without a timeout limit, call the handler method directly
// just execute
handler.execute();
}
@ -183,6 +190,8 @@ public class JobThread extends Thread{
);
} else {
//TODO the queue is polled every 3 seconds; after 30 consecutive empty polls the thread is removed (removal sets toStop to true); this design seems aimed at frequently-firing jobs that would otherwise keep a thread looping forever
//TODO the number 30 should be configurable; the thread idle lifetime is quite long
if (idleTimes > 30) {
if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
@ -207,6 +216,7 @@ public class JobThread extends Thread{
// callback handler info
if (!toStop) {
// commonm
//TODO push into callBackQueue, which is used to report the execution result back to the admin
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
@ -227,6 +237,7 @@ public class JobThread extends Thread{
}
// callback trigger request in queue
//TODO to be confirmed
while(triggerQueue !=null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) {

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
/**
* Created by xuxueli on 16/7/22.
*/
public class TriggerCallbackThread {
private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);
@ -32,7 +33,7 @@ public class TriggerCallbackThread {
}
/**
* job results callback queue
* job results callback queue
*/
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
public static void pushCallBack(HandleCallbackParam callback){
@ -43,6 +44,10 @@ public class TriggerCallbackThread {
/**
* callback thread
*/
/** TODO callback threads
 * triggerCallbackThread posts execution results (code/msg) to the admin /api/callback endpoint;
 * failed batches are written to a local log file and re-sent by triggerRetryCallbackThread every 30 seconds
*/
private Thread triggerCallbackThread;
private Thread triggerRetryCallbackThread;
private volatile boolean toStop = false;
@ -63,16 +68,19 @@ public class TriggerCallbackThread {
// normal callback
while(!toStop){
try {
//TODO take one item from the queue (blocking)
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {
// callback list param
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
//TODO drain all remaining items from the queue
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
// callback, will retry if error
if (callbackParamList!=null && callbackParamList.size()>0) {
//TODO send the batched callbacks
doCallback(callbackParamList);
}
}
@ -84,6 +92,7 @@ public class TriggerCallbackThread {
}
// last callback
//TODO loop exited, meaning the executor is shutting down; flush whatever is still left in the queue
try {
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
@ -104,7 +113,7 @@ public class TriggerCallbackThread {
triggerCallbackThread.start();
// retry
// TODO scan the callbacks the thread above failed to send and re-send them
triggerRetryCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
@ -163,10 +172,13 @@ public class TriggerCallbackThread {
private void doCallback(List<HandleCallbackParam> callbackParamList){
boolean callbackRet = false;
// callback, will retry if error
//getAdminBizList: all configured admin addresses
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
//TODO call the admin callback endpoint
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
//log it
callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
callbackRet = true;
break;
@ -178,6 +190,7 @@ public class TriggerCallbackThread {
}
}
if (!callbackRet) {
//TODO on callback failure, write to the local log file for retry; if the batch were partially successful this would redo work, but that scenario does not occur today
appendFailCallbackFile(callbackParamList);
}
}
@ -251,6 +264,7 @@ public class TriggerCallbackThread {
List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);
//TODO delete the file once it has been read
callbaclLogFile.delete();
doCallback(callbackParamList);
}

@ -1,5 +1,6 @@
package com.xxl.job.executor.service.jobhandler;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
@ -36,6 +37,7 @@ public class SampleXxlJob {
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
logger.info(XxlJobHelper.getJobParam());
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {

@ -1,5 +1,5 @@
# web port
server.port=8081
server.port=8084
# no web
#spring.main.web-environment=false
@ -8,7 +8,7 @@ logging.config=classpath:logback.xml
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
xxl.job.admin.addresses=http://127.0.0.1:8081/xxl-job-admin
### xxl-job, access token
xxl.job.accessToken=default_token
@ -16,11 +16,12 @@ xxl.job.accessToken=default_token
### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
### address registered with the admin; pick either address or ip:port, address takes priority
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9999
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
xxl.job.executor.logpath=../data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30

@ -2,7 +2,7 @@
<configuration debug="false" scan="true" scanPeriod="1 seconds">
<contextName>logback</contextName>
<property name="log.path" value="/data/applogs/xxl-job/xxl-job-executor-sample-springboot.log"/>
<property name="log.path" value="../data/applogs/xxl-job/xxl-job-executor-sample-springboot.log"/>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
