feat(route) 修复轮询执行器路由策略bug

新增:
1. 修复轮询执行器路由策略新增执行器时,执行器记录器中没有记录新地址时,抛出NPE
pull/69/head
zhudeyu 2 months ago
parent ac9a42f35e
commit 895fd5b38d

@ -91,6 +91,15 @@ public class ExecutorRouteRoundHandler extends ExecutorRouter {
MAX_TIME=1;
return new ReturnT<String>(address);
}else{
// 使用set收集执行器地址, 用于识别是否有执行器新增或删除
Set<String> addrSet = addressList.stream().map(e -> extractIp(e)).collect(Collectors.toSet());
// 检查是否新增执行器
addrSet.forEach(e->{
if(!routeMap.containsKey(e)){
// 新增的执行器默认给最大值,避免 将所有任务全部调度到该执行器上影响平衡
routeMap.put(e,MAX_TIME);
}
});
for (String addr : addressList) {
// 获取ip
String key = extractIp(addr);
@ -107,8 +116,7 @@ public class ExecutorRouteRoundHandler extends ExecutorRouter {
String key = extractIp(addr);
routeMap.put(key,MAX_TIME+1);
MAX_TIME=MAX_TIME+1;
// 使用set收集执行器地址, 用于识别是否有执行器新增或删除
Set<String> addrSet = addressList.stream().map(e -> extractIp(e)).collect(Collectors.toSet());
// 检查是否有执行器卸载了执行器
Set<String> keySet = routeMap.keySet();
keySet.forEach(e->{
@ -118,13 +126,7 @@ public class ExecutorRouteRoundHandler extends ExecutorRouter {
routeMap.remove(e);
}
});
// 检查是否新增执行器
addrSet.forEach(e->{
if(!routeMap.containsKey(e)){
// 新增的执行器默认给最大值,避免 将所有任务全部调度到该执行器上影响平衡
routeMap.put(e,MAX_TIME);
}
});
return new ReturnT<String>(addr);
}

Loading…
Cancel
Save