Pre Merge pull request !70 from peter/hotfix-3.1.1-release-PR
commit
e4654df493
@ -0,0 +1,136 @@
|
||||
package com.xxl.job.admin.core.route.strategy;
|
||||
|
||||
|
||||
|
||||
import com.xxl.job.admin.core.route.ExecutorRouter;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.biz.model.TriggerParam;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 以执行器为维度,根据执行器被调度的次数,依次调度
|
||||
* 如: 建立5个任务,有5个执行器,
|
||||
* 期望: 每个任务执行器执行一次任务
|
||||
* 实际: 5个任务被调度到了3个执行器上,有2个执行器空闲
|
||||
* 尝试: 路由策略修改为hash一致性, 也不能均匀的分布在5个执行器上
|
||||
*
|
||||
* 已知问题: 调用触发时间相同时,导致获取的执行器地址不准,存在多个任务调用被调度到同一个执行器上的情况,
|
||||
* 自测时,将调度时间相差5s,即可避免此情况
|
||||
* @author zhudeyu
|
||||
* @since 2025年07月15日 14:49:40
|
||||
**/
|
||||
public class ExecutorRouteRoundHandler extends ExecutorRouter {
|
||||
/**
|
||||
* 执行器记录器, 记录每个执行器被调度的次数
|
||||
**/
|
||||
private static ConcurrentMap<String, Integer> routeMap = new ConcurrentHashMap<String, Integer>();
|
||||
/**
|
||||
* 最大记录器
|
||||
**/
|
||||
private static Integer MAX_TIME = 0;
|
||||
/**
|
||||
* 清除执行器记录器实际
|
||||
**/
|
||||
private static long CACHE_VALID_TIME = 0;
|
||||
|
||||
/**
|
||||
* 识别 http://127.0.0.1:9999/ 中的ip
|
||||
* @param url 执行器地址
|
||||
* @return java.lang.String
|
||||
* @author zhudeyu
|
||||
* @since 2025年07月15日 14:51:03
|
||||
**/
|
||||
public static String extractIp(String url) {
|
||||
Pattern pattern = Pattern.compile("^http://([^:/]+):(\\d+)/?$");
|
||||
Matcher matcher = pattern.matcher(url);
|
||||
if (matcher.find()) {
|
||||
return matcher.group(1);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据每个任务执行器被调度的次数,依次调度
|
||||
* @param triggerParam 任务参数
|
||||
* @param addressList 执行器地址
|
||||
* @return com.xxl.job.core.biz.model.ReturnT<java.lang.String>
|
||||
* @author zhudeyu
|
||||
* @since 2025年07月15日 14:51:33
|
||||
**/
|
||||
@Override
|
||||
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
|
||||
// 被调度的执行器地址
|
||||
String address="";
|
||||
// 达到清除时间,清除执行器记录器
|
||||
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
|
||||
routeMap.clear();
|
||||
// 清除最大记录器
|
||||
MAX_TIME=0;
|
||||
CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24;
|
||||
}
|
||||
|
||||
if(routeMap.keySet().isEmpty()){
|
||||
// 首次执行, 初始化,所有执行器 执行次数均为0
|
||||
addressList.forEach(e->{
|
||||
String key = extractIp(e);
|
||||
routeMap.put(key,0);
|
||||
});
|
||||
|
||||
// 调度到第一个执行器地址
|
||||
address = addressList.get(0);
|
||||
String key = extractIp(address);
|
||||
routeMap.put(key,1);
|
||||
// 最大记录器,加1
|
||||
MAX_TIME=1;
|
||||
return new ReturnT<String>(address);
|
||||
}else{
|
||||
// 使用set收集执行器地址, 用于识别是否有执行器新增或删除
|
||||
Set<String> addrSet = addressList.stream().map(e -> extractIp(e)).collect(Collectors.toSet());
|
||||
// 检查是否新增执行器
|
||||
addrSet.forEach(e->{
|
||||
if(!routeMap.containsKey(e)){
|
||||
// 新增的执行器默认给最大值,避免 将所有任务全部调度到该执行器上影响平衡
|
||||
routeMap.put(e,MAX_TIME);
|
||||
}
|
||||
});
|
||||
for (String addr : addressList) {
|
||||
// 获取ip
|
||||
String key = extractIp(addr);
|
||||
// 从执行记录器中
|
||||
Integer addrTime = routeMap.get(key);
|
||||
// 该执行器的执行次数 小于 记录器,则取该地址
|
||||
if(addrTime < MAX_TIME){
|
||||
routeMap.put(key,addrTime+1);
|
||||
return new ReturnT<String>(addr);
|
||||
}
|
||||
}
|
||||
// 没有小于记录器的执行了, 则修改记录器的值, 随机取一个
|
||||
String addr = addressList.get(0);
|
||||
String key = extractIp(addr);
|
||||
routeMap.put(key,MAX_TIME+1);
|
||||
MAX_TIME=MAX_TIME+1;
|
||||
|
||||
// 检查是否有执行器卸载了执行器
|
||||
Set<String> keySet = routeMap.keySet();
|
||||
keySet.forEach(e->{
|
||||
// 执行器不在的 则移除
|
||||
if(!addrSet.contains(e)){
|
||||
// log.info("移除执行器{}",e);
|
||||
routeMap.remove(e);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
return new ReturnT<String>(addr);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in new issue