parent adcde32ba1
commit 68efe5fafa
@@ -0,0 +1,26 @@
package com.xxl.job.admin.core.model;

import java.util.Date;

public class XxlJobCluster {

    private String hostName;

    private Date updateTime;

    public String getHostName() {
        return hostName;
    }

    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
}
@@ -0,0 +1,17 @@
package com.xxl.job.admin.dao;

import com.xxl.job.admin.core.model.XxlJobCluster;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.Date;
import java.util.List;

@Mapper
public interface XxlJobClusterDao {

    // register or refresh this node's heartbeat row
    void replace(@Param("ip") String ip);

    // all registered nodes, alive or not
    List<XxlJobCluster> findAll();

    // evict nodes last seen at or before the given time
    void delete(@Param("time") Date date);
}
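Note: the DAO reads as a heartbeat table; each admin node periodically re-registers its own address, and stale rows get pruned (AverageJobAllocation below treats nodes unseen for 60 s as dead). A minimal sketch of such a heartbeat loop, assuming a wired XxlJobClusterDao; the ClusterHeartbeat class, the 30 s beat, and the 60 s eviction window are assumptions for illustration, not part of this commit:

import com.xxl.job.admin.dao.XxlJobClusterDao;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ClusterHeartbeat {

    private final XxlJobClusterDao clusterDao;
    private final String selfIp;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public ClusterHeartbeat(XxlJobClusterDao clusterDao, String selfIp) {
        this.clusterDao = clusterDao;
        this.selfIp = selfIp;
    }

    public void start() {
        scheduler.scheduleAtFixedRate(() -> {
            // upsert our own row; REPLACE INTO refreshes update_time to now()
            clusterDao.replace(selfIp);
            // evict nodes whose last heartbeat is older than 60 s (matches the
            // liveness window used by AverageJobAllocation)
            clusterDao.delete(new Date(System.currentTimeMillis() - 60_000L));
        }, 0, 30, TimeUnit.SECONDS);
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}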
@@ -0,0 +1,22 @@
package com.xxl.job.admin.service;

import com.xxl.job.admin.core.model.XxlJobInfo;

public interface JobAllocation {

    /**
     * Compute the allocation for a single job.
     * @param jobInfo the job to assign to a node
     */
    default void allocation(XxlJobInfo jobInfo) {}

    /**
     * Recompute the allocation of all jobs when a node comes up.
     */
    default void flush() {}

    /**
     * Initialization actions on startup.
     */
    default void init(boolean init) {}
}
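All three methods have no-op defaults, so an implementation only overrides what it needs. A hypothetical single-node fallback, for illustration (LocalOnlyJobAllocation is not part of this commit):

import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.service.JobAllocation;

// hypothetical fallback strategy: pin every job to the local node
public class LocalOnlyJobAllocation implements JobAllocation {

    private final String localHostName;

    public LocalOnlyJobAllocation(String localHostName) {
        this.localHostName = localHostName;
    }

    @Override
    public void allocation(XxlJobInfo jobInfo) {
        jobInfo.setHostName(localHostName);
    }

    // flush() and init(boolean) keep their no-op defaults: with a single
    // node there is nothing to rebalance on startup
}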
@@ -0,0 +1,149 @@
package com.xxl.job.admin.service.impl;

import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.model.XxlJobCluster;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.service.JobAllocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * @author liyong
 */
public class AverageJobAllocation implements JobAllocation {

    private static final Logger logger = LoggerFactory.getLogger(AverageJobAllocation.class);

    // bucket thresholds, in seconds
    private static final int oneMinute = 60;
    private static final int tenMinutes = oneMinute * 10;
    private static final int oneHour = tenMinutes * 6;
    private static final int oneDay = oneHour * 24;

    // one round-robin counter per frequency bucket
    private final Map<Integer, AtomicInteger> averageMap = new ConcurrentHashMap<>();
    private final Map<String, List<String>> ipMap = new ConcurrentHashMap<>();

    {
        // separate counters per bucket prevent frequently-firing jobs
        // from piling up on the same node
        averageMap.put(oneMinute, new AtomicInteger(0));
        averageMap.put(tenMinutes, new AtomicInteger(0));
        averageMap.put(oneHour, new AtomicInteger(0));
        averageMap.put(oneDay, new AtomicInteger(0));
        averageMap.put(Integer.MAX_VALUE, new AtomicInteger(0));
    }

    @Override
    public void allocation(XxlJobInfo jobInfo) {
        List<XxlJobCluster> all = XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().findAll();
        // only nodes whose heartbeat is younger than 60 s count as alive
        List<String> ipList = all.stream().filter(x -> x.getUpdateTime().getTime() > System.currentTimeMillis() - 60 * 1000)
                .map(XxlJobCluster::getHostName).collect(Collectors.toList());

        if (ipList.isEmpty()) {
            jobInfo.setHostName(XxlJobAdminConfig.getAdminConfig().getHostName());
            return;
        }

        // triggerNextTime is epoch millis; the bucket thresholds are seconds,
        // so convert before looking up the bucket
        Integer key = getKey((jobInfo.getTriggerNextTime() - System.currentTimeMillis()) / 1000);
        int i = averageMap.get(key).incrementAndGet();

        jobInfo.setHostName(ipList.get(i % ipList.size()));
    }

    @Override
    public void init(boolean init) {
        XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().initLockStatus(XxlJobAdminConfig.getAdminConfig().getHostName(), init);
    }

    @Override
    public void flush() {
        Connection conn = null;
        Boolean connAutoCommit = null;
        PreparedStatement preparedStatement = null;

        try {
            conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
            connAutoCommit = conn.getAutoCommit();
            conn.setAutoCommit(false);

            // row lock serializes flush across admin nodes
            preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'flush_lock' for update");
            preparedStatement.execute();

            recursion(0, conn);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job, AverageJobAllocation#flush error:", e);
        } finally {
            // commit
            if (conn != null) {
                try {
                    conn.commit();
                } catch (SQLException e) {
                    logger.error(e.getMessage(), e);
                }
                if (connAutoCommit != null) {
                    try {
                        conn.setAutoCommit(connAutoCommit);
                    } catch (SQLException e) {
                        logger.error(e.getMessage(), e);
                    }
                }
                try {
                    conn.close();
                } catch (SQLException e) {
                    logger.error(e.getMessage(), e);
                }
            }

            // close PreparedStatement
            if (null != preparedStatement) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }

    // pages through all jobs ordered by id, reassigns each page, and
    // commits before recursing into the next page
    private void recursion(int id, Connection conn) {
        List<XxlJobInfo> list = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().pageById(id);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        Map<String, List<Integer>> map = new HashMap<>();
        list.forEach(x -> {
            allocation(x);
            List<Integer> ids = map.getOrDefault(x.getHostName(), new ArrayList<>());
            ids.add(x.getId());
            map.put(x.getHostName(), ids);
        });
        map.forEach((k, v) -> XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateHostNameByIds(k, v));
        try {
            conn.commit();
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
        }
        recursion(list.get(list.size() - 1).getId(), conn);
    }

    // maps "seconds until next fire" to its bucket
    private Integer getKey(long time) {
        if (time < oneMinute) {
            return oneMinute;
        } else if (time < tenMinutes) {
            return tenMinutes;
        } else if (time < oneHour) {
            return oneHour;
        } else if (time < oneDay) {
            return oneDay;
        } else {
            return Integer.MAX_VALUE;
        }
    }

}
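The core idea above, bucketing jobs by how soon they next fire and round-robining within each bucket so high-frequency jobs spread across nodes, can be seen in isolation. A self-contained sketch; the node names and delays are made up for the demo:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class BucketRoundRobinDemo {

    // bucket thresholds in seconds, mirroring AverageJobAllocation
    private static final int[] BUCKETS = {60, 600, 3600, 86400, Integer.MAX_VALUE};

    private static final Map<Integer, AtomicInteger> COUNTERS = new ConcurrentHashMap<>();

    static int bucketOf(long secondsUntilFire) {
        for (int b : BUCKETS) {
            if (secondsUntilFire < b) {
                return b;
            }
        }
        return Integer.MAX_VALUE;
    }

    static String assign(long secondsUntilFire, List<String> nodes) {
        int bucket = bucketOf(secondsUntilFire);
        int n = COUNTERS.computeIfAbsent(bucket, k -> new AtomicInteger(0)).incrementAndGet();
        return nodes.get(n % nodes.size());
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("admin-a", "admin-b", "admin-c");
        // three jobs that all fire within a minute land on three different nodes
        for (int i = 0; i < 3; i++) {
            System.out.println("1-min job " + i + " -> " + assign(30, nodes));
        }
        // a job firing hours from now uses its own counter, independent of the 1-min bucket
        System.out.println("long-interval job -> " + assign(7200, nodes));
    }
}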
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.xxl.job.admin.dao.XxlJobClusterDao">

    <insert id="replace">
        replace into xxl_job_cluster values(#{ip}, now());
    </insert>
    <delete id="delete">
        <!-- "<" must be escaped inside mapper XML -->
        delete from xxl_job_cluster where update_time &lt;= #{time}
    </delete>
    <select id="findAll" resultType="com.xxl.job.admin.core.model.XxlJobCluster">
        select host_name as hostName, update_time as updateTime from xxl_job_cluster
    </select>

</mapper>