feat(route) 新增轮询执行器路由策略

新增:
1. 以执行器为维度,根据执行器被调度的次数,依次调度
pull/69/head
zhudeyu 2 months ago
parent cdb54254d0
commit ac9a42f35e

@ -11,6 +11,7 @@ public enum ExecutorRouteStrategyEnum {
FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
ROUND_HANDLER(I18nUtil.getString("jobconf_route_round_handler"), new ExecutorRouteRoundHandler()),
RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),

@ -0,0 +1,134 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* ,,
* : 5,5,
* :
* : 53,2
* : hash, 5
*
* : ,,,
* ,5s,
* @author zhudeyu
* @since 20250715 14:49:40
**/
public class ExecutorRouteRoundHandler extends ExecutorRouter {
/**
* ,
**/
private static ConcurrentMap<String, Integer> routeMap = new ConcurrentHashMap<String, Integer>();
/**
*
**/
private static Integer MAX_TIME = 0;
/**
*
**/
private static long CACHE_VALID_TIME = 0;
/**
* http://127.0.0.1:9999/ 中的ip
* @param url
* @return java.lang.String
* @author zhudeyu
* @since 20250715 14:51:03
**/
public static String extractIp(String url) {
Pattern pattern = Pattern.compile("^http://([^:/]+):(\\d+)/?$");
Matcher matcher = pattern.matcher(url);
if (matcher.find()) {
return matcher.group(1);
}
return null;
}
/**
* ,
* @param triggerParam
* @param addressList
* @return com.xxl.job.core.biz.model.ReturnT<java.lang.String>
* @author zhudeyu
* @since 20250715 14:51:33
**/
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
// 被调度的执行器地址
String address="";
// 达到清除时间,清除执行器记录器
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
routeMap.clear();
// 清除最大记录器
MAX_TIME=0;
CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24;
}
if(routeMap.keySet().isEmpty()){
// 首次执行, 初始化,所有执行器 执行次数均为0
addressList.forEach(e->{
String key = extractIp(e);
routeMap.put(key,0);
});
// 调度到第一个执行器地址
address = addressList.get(0);
String key = extractIp(address);
routeMap.put(key,1);
// 最大记录器,加1
MAX_TIME=1;
return new ReturnT<String>(address);
}else{
for (String addr : addressList) {
// 获取ip
String key = extractIp(addr);
// 从执行记录器中
Integer addrTime = routeMap.get(key);
// 该执行器的执行次数 小于 记录器,则取该地址
if(addrTime < MAX_TIME){
routeMap.put(key,addrTime+1);
return new ReturnT<String>(addr);
}
}
// 没有小于记录器的执行了, 则修改记录器的值, 随机取一个
String addr = addressList.get(0);
String key = extractIp(addr);
routeMap.put(key,MAX_TIME+1);
MAX_TIME=MAX_TIME+1;
// 使用set收集执行器地址, 用于识别是否有执行器新增或删除
Set<String> addrSet = addressList.stream().map(e -> extractIp(e)).collect(Collectors.toSet());
// 检查是否有执行器卸载了执行器
Set<String> keySet = routeMap.keySet();
keySet.forEach(e->{
// 执行器不在的 则移除
if(!addrSet.contains(e)){
// log.info("移除执行器{}",e);
routeMap.remove(e);
}
});
// 检查是否新增执行器
addrSet.forEach(e->{
if(!routeMap.containsKey(e)){
// 新增的执行器默认给最大值,避免 将所有任务全部调度到该执行器上影响平衡
routeMap.put(e,MAX_TIME);
}
});
return new ReturnT<String>(addr);
}
}
}

@ -227,6 +227,7 @@ jobconf_block_COVER_EARLY=Cover Early
jobconf_route_first=First
jobconf_route_last=Last
jobconf_route_round=Round
jobconf_route_round_handler=RoundHandler
jobconf_route_random=Random
jobconf_route_consistenthash=Consistent Hash
jobconf_route_lfu=Least Frequently Used

@ -227,6 +227,7 @@ jobconf_block_COVER_EARLY=覆盖之前调度
jobconf_route_first=第一个
jobconf_route_last=最后一个
jobconf_route_round=轮询
jobconf_route_round_handler=轮询执行器
jobconf_route_random=随机
jobconf_route_consistenthash=一致性HASH
jobconf_route_lfu=最不经常使用

@ -227,6 +227,7 @@ jobconf_block_COVER_EARLY=覆蓋之前調度
jobconf_route_first=第一個
jobconf_route_last=最後一個
jobconf_route_round=輪詢
jobconf_route_round_handler=輪詢执行器
jobconf_route_random=隨機
jobconf_route_consistenthash=一致性HASH
jobconf_route_lfu=最不經常使用

Loading…
Cancel
Save