新增多节点任务均摊功能

pull/56/head^2
yong.li07 2 years ago
parent 6effc8b98f
commit 584e2f6edc

@ -32,7 +32,11 @@ CREATE TABLE `xxl_job_info` (
`trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态0-停止1-运行',
`trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间',
`trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间',
PRIMARY KEY (`id`)
`host_name` varchar(100) NOT NULL DEFAULT '' COMMENT '实例名',
`lock_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '锁状态1-改任务锁定中 0-未锁定',
PRIMARY KEY (`id`),
KEY `idx_host_name` (`host_name`),
KEY `idx_lock` (`lock_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_log` (
@ -113,10 +117,18 @@ CREATE TABLE `xxl_job_lock` (
PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_cluster`(
`host_name` varchar(100) NOT NULL COMMENT '实例名',
`update_time` datetime DEFAULT NULL comment '更新时间',
PRIMARY KEY (`host_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31' );
INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', '');
INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'flush_lock');
commit;

@ -1,8 +1,13 @@
package com.xxl.job.admin.core.conf;
import com.xxl.job.admin.core.alarm.JobAlarmer;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.dao.*;
import com.xxl.job.admin.service.JobAllocation;
import com.xxl.job.admin.service.impl.AverageJobAllocation;
import com.xxl.job.core.util.IpUtil;
import io.micrometer.core.instrument.util.StringUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
@ -36,8 +41,11 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
public void afterPropertiesSet() throws Exception {
adminConfig = this;
getJobAllocation().init(true);
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
getJobAllocation().flush();
}
@Override
@ -67,6 +75,16 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
@Value("${xxl.job.logretentiondays}")
private int logretentiondays;
@Value("${xxl.job.cluster.host.name}")
private String hostName;
@Value("${server.port}")
private int port;
@Value("${xxl.job.cluster.enable:false}")
private boolean clusterEnable;
// dao, service
@Resource
@ -79,6 +97,9 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
private XxlJobGroupDao xxlJobGroupDao;
@Resource
private XxlJobLogReportDao xxlJobLogReportDao;
@Resource
private XxlJobClusterDao xxlJobClusterDao;
@Resource
private JavaMailSender mailSender;
@Resource
@ -86,6 +107,23 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
@Resource
private JobAlarmer jobAlarmer;
private JobAllocation jobAllocation = defaultJobAllocation;
public String getHostName() {
return StringUtils.isBlank(hostName) ? IpUtil.getIpPort(port) : hostName;
}
private static JobAllocation defaultJobAllocation = new JobAllocation() {
@Override
public void allocation(XxlJobInfo jobInfo) {
}
@Override
public void init(boolean init) {
}
};
public String getI18n() {
if (!Arrays.asList("zh_CN", "zh_TC", "en").contains(i18n)) {
@ -155,4 +193,20 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
return jobAlarmer;
}
public XxlJobClusterDao getXxlJobClusterDao() {
return xxlJobClusterDao;
}
public JobAllocation getJobAllocation() {
return jobAllocation.equals(defaultJobAllocation) && clusterEnable ? new AverageJobAllocation() : defaultJobAllocation;
}
public void setJobAllocation(JobAllocation jobAllocation) {
this.jobAllocation = jobAllocation;
}
public boolean isClusterEnable() {
return clusterEnable;
}
}

@ -0,0 +1,26 @@
package com.xxl.job.admin.core.model;
import java.util.Date;
public class XxlJobCluster {
private String hostName;
private Date updateTime;
public String getHostName() {
return hostName;
}
public void setHostName(String hostName) {
this.hostName = hostName;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
}

@ -42,6 +42,10 @@ public class XxlJobInfo {
private long triggerLastTime; // 上次调度时间
private long triggerNextTime; // 下次调度时间
private String hostName; // 运行实例名称
private int lockStatus;
public int getId() {
return id;
@ -234,4 +238,20 @@ public class XxlJobInfo {
public void setTriggerNextTime(long triggerNextTime) {
this.triggerNextTime = triggerNextTime;
}
public String getHostName() {
return hostName;
}
public void setHostName(String hostName) {
this.hostName = hostName;
}
public int getLockStatus() {
return lockStatus;
}
public void setLockStatus(int lockStatus) {
this.lockStatus = lockStatus;
}
}

@ -2,10 +2,12 @@ package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobRegistry;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
@ -29,6 +31,13 @@ public class JobRegistryHelper {
private Thread registryMonitorThread;
private volatile boolean toStop = false;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "cluster host registry");
}
});
public void start(){
// for registry or remove
@ -126,6 +135,28 @@ public class JobRegistryHelper {
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().replace(XxlJobAdminConfig.getAdminConfig().getHostName());
Date date = new Date();
XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().delete(DateUtil.addMinutes(date,-5));
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().findOldClusterInfo();
if(jobInfo!=null){
XxlJobAdminConfig.getAdminConfig().getJobAllocation().init(false);
XxlJobAdminConfig.getAdminConfig().getJobAllocation().flush();
}
} catch (Exception e) {
logger.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 0, 1000 * 30, TimeUnit.MILLISECONDS);
}
public void toStop(){

@ -15,6 +15,7 @@ import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author xuxueli 2019-05-21
@ -77,8 +78,25 @@ public class JobScheduleHelper {
// 1、pre read
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount
, XxlJobAdminConfig.getAdminConfig().getHostName(),
XxlJobAdminConfig.getAdminConfig().isClusterEnable());
if (scheduleList!=null && !scheduleList.isEmpty()) {
if(XxlJobAdminConfig.getAdminConfig().isClusterEnable()) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateStatusById(scheduleList.stream()
.map(XxlJobInfo::getId).collect(Collectors.toList()));
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
@ -140,6 +158,7 @@ public class JobScheduleHelper {
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
jobInfo.setLockStatus(0);
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}

@ -0,0 +1,20 @@
package com.xxl.job.admin.dao;
import com.xxl.job.admin.core.model.XxlJobCluster;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List;
/**
* @author liyong
*/
@Mapper
public interface XxlJobClusterDao {
void replace(@Param("ip") String ip);
List<XxlJobCluster> findAll();
void delete(@Param("time") Date date);
}

@ -41,9 +41,20 @@ public interface XxlJobInfoDao {
public int findAllCount();
public List<XxlJobInfo> scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize );
public List<XxlJobInfo> scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize,
@Param("hostName") String ip,@Param("clusterEnable") boolean clusterEnable);
public int scheduleUpdate(XxlJobInfo xxlJobInfo);
void updateStatusById(@Param("list") List<Integer> collect);
List<XxlJobInfo> pageById(@Param("id") Integer id);
void updateHostNameByIds(@Param("hostName") String k, @Param("ids") List<Integer> v);
void initLockStatus(@Param("hostName") String hostName,@Param("init") boolean init);
XxlJobInfo findOldClusterInfo();
}

@ -0,0 +1,25 @@
package com.xxl.job.admin.service;
import com.xxl.job.admin.core.model.XxlJobInfo;
/**
* @author liyong
*/
public interface JobAllocation {
/**
*
* @param jobInfo
*/
default void allocation(XxlJobInfo jobInfo){}
/**
*
*/
default void flush(){}
/**
*
*/
default void init(boolean init){}
}

@ -0,0 +1,150 @@
package com.xxl.job.admin.service.impl;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.model.XxlJobCluster;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.service.JobAllocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* @author liyong
*/
public class AverageJobAllocation implements JobAllocation {
private static Logger logger = LoggerFactory.getLogger(AverageJobAllocation.class);
private static int oneMinute = 60;
private static int tenMinutes = oneMinute * 10;
private static int oneHour = tenMinutes * 6;
private static int oneDay = oneHour * 24;
private Map<Integer, AtomicInteger> averageMap = new ConcurrentHashMap<>();
private Map<String, List<String>> ipMap = new ConcurrentHashMap<>();
{
// 防止执行频率高的 扎堆在一起
averageMap.put(oneMinute, new AtomicInteger(0));
averageMap.put(tenMinutes, new AtomicInteger(0));
averageMap.put(oneHour, new AtomicInteger(0));
averageMap.put(oneDay, new AtomicInteger(0));
averageMap.put(Integer.MAX_VALUE, new AtomicInteger(0));
}
@Override
public void allocation(XxlJobInfo jobInfo) {
List<XxlJobCluster> all = XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().findAll();
List<String> ipList = all.stream().filter(x -> x.getUpdateTime().getTime() > System.currentTimeMillis() - 60 * 1000)
.map(XxlJobCluster::getHostName).collect(Collectors.toList());
if (ipList.isEmpty()) {
jobInfo.setHostName(XxlJobAdminConfig.getAdminConfig().getHostName());
return;
}
Integer key = getKey(jobInfo.getTriggerNextTime() - System.currentTimeMillis());
int i = averageMap.get(key).incrementAndGet();
jobInfo.setHostName(ipList.get(i % ipList.size()));
}
@Override
public void init(boolean init) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().initLockStatus(XxlJobAdminConfig.getAdminConfig().getHostName(),init);
}
@Override
public void flush() {
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'flush_lock' for update");
preparedStatement.execute();
recursion(0, conn);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job, cluster flush error:{}", e);
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
try {
conn.close();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
}
}
private void recursion(int id, Connection conn) {
List<XxlJobInfo> list = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().pageById(id);
if (CollectionUtils.isEmpty(list))
return;
Map<String, List<Integer>> map = new HashMap<>();
list.forEach(x -> {
allocation(x);
List<Integer> ids = map.getOrDefault(x.getHostName(), new ArrayList<>());
ids.add(x.getId());
map.put(x.getHostName(), ids);
});
map.forEach((k, v) -> XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateHostNameByIds(k, v));
try {
conn.commit();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
recursion(list.get(list.size() - 1).getId(), conn);
}
private Integer getKey(long time) {
if (time < oneMinute) {
return oneMinute;
} else if (time < tenMinutes) {
return tenMinutes;
} else if (time < oneHour) {
return oneHour;
} else if (time < oneDay) {
return oneDay;
} else {
return Integer.MAX_VALUE;
}
}
}

@ -1,5 +1,6 @@
package com.xxl.job.admin.service.impl;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.cron.CronExpression;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo;
@ -328,6 +329,7 @@ public class XxlJobServiceImpl implements XxlJobService {
xxlJobInfo.setTriggerNextTime(nextTriggerTime);
xxlJobInfo.setUpdateTime(new Date());
XxlJobAdminConfig.getAdminConfig().getJobAllocation().allocation(xxlJobInfo);
xxlJobInfoDao.update(xxlJobInfo);
return ReturnT.SUCCESS;
}

@ -63,3 +63,7 @@ xxl.job.triggerpool.slow.max=100
### xxl-job, log retention days
xxl.job.logretentiondays=30
### xxl-job cluster
xxl.job.cluster.host.name=
xxl.job.cluster.enable=true

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.xxl.job.admin.dao.XxlJobClusterDao">
<insert id="replace">
replace into xxl_job_cluster values(#{ip}, now());
</insert>
<delete id="delete">
delete from xxl_job_cluster where update_time &lt;= #{time}
</delete>
<select id="findAll" resultType="com.xxl.job.admin.core.model.XxlJobCluster">
select host_name as hostName,update_time as updateTime from xxl_job_cluster
</select>
</mapper>

@ -36,6 +36,9 @@
<result column="trigger_status" property="triggerStatus" />
<result column="trigger_last_time" property="triggerLastTime" />
<result column="trigger_next_time" property="triggerNextTime" />
<result column="host_name" property="hostName" />
<result column="lock_status" property="lockStatus" />
</resultMap>
<sql id="Base_Column_List">
@ -224,6 +227,10 @@
FROM xxl_job_info AS t
WHERE t.trigger_status = 1
and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}
<if test="clusterEnable == true">
and t.host_name=#{hostName}
and t.lock_status=0
</if>
ORDER BY id ASC
LIMIT #{pagesize}
</select>
@ -233,8 +240,40 @@
SET
trigger_last_time = #{triggerLastTime},
trigger_next_time = #{triggerNextTime},
trigger_status = #{triggerStatus}
trigger_status = #{triggerStatus},
lock_status=#{lockStatus}
WHERE id = #{id}
</update>
<select id="pageById" resultMap="XxlJobInfo">
select id,trigger_next_time,host_name from xxl_job_info where id>#{id} and trigger_status=1 order by id limit 2000
</select>
<select id="findOldClusterInfo" resultMap="XxlJobInfo">
select a.* from xxl_job_info a left join xxl_job_cluster b on a.host_name =b.host_name
and b.update_time >=DATE_SUB(now(),INTERVAL 1 MINUTE)
where b.host_name is null and a.trigger_status = 1 limit 1
</select>
<update id="updateStatusById">
update xxl_job_info set lock_status = 1
where id in
<foreach collection="list" item="id" open="(" close=")" separator="," >
#{id}
</foreach>
</update>
<update id="updateHostNameByIds">
update xxl_job_info set host_name = #{hostName}
where id in
<foreach collection="ids" item="id" open="(" close=")" separator="," >
#{id}
</foreach>
</update>
<update id="initLockStatus">
update xxl_job_info a left join xxl_job_cluster b on a.host_name =b.host_name and b.update_time>=DATE_SUB(now(),INTERVAL 1 MINUTE)
set a.lock_status = 0
where (b.host_name is null
<if test="init == true">
or a.host_name =#{hostName}
</if>
) and a.lock_status=1
</update>
</mapper>

@ -36,6 +36,7 @@ public class SampleXxlJob {
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
logger.info(XxlJobHelper.getJobParam());
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {

Loading…
Cancel
Save