From ac9a42f35ef5c7b80af0dffb6687af3097db59f3 Mon Sep 17 00:00:00 2001 From: zhudeyu Date: Tue, 15 Jul 2025 15:25:17 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(route)=20=E6=96=B0=E5=A2=9E=E8=BD=AE?= =?UTF-8?q?=E8=AF=A2=E6=89=A7=E8=A1=8C=E5=99=A8=E8=B7=AF=E7=94=B1=E7=AD=96?= =?UTF-8?q?=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增: 1. 以执行器为维度,根据执行器被调度的次数,依次调度 --- .../core/route/ExecutorRouteStrategyEnum.java | 1 + .../strategy/ExecutorRouteRoundHandler.java | 134 ++++++++++++++++++ .../main/resources/i18n/message_en.properties | 1 + .../resources/i18n/message_zh_CN.properties | 1 + .../resources/i18n/message_zh_TC.properties | 1 + 5 files changed, 138 insertions(+) create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRoundHandler.java diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java index 7fff93a9..2cea54b9 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java @@ -11,6 +11,7 @@ public enum ExecutorRouteStrategyEnum { FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()), LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()), ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()), + ROUND_HANDLER(I18nUtil.getString("jobconf_route_round_handler"), new ExecutorRouteRoundHandler()), RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()), CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()), LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()), diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRoundHandler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRoundHandler.java new file mode 100644 index 00000000..d2dcadc2 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRoundHandler.java @@ -0,0 +1,134 @@ +package com.xxl.job.admin.core.route.strategy; + + + +import com.xxl.job.admin.core.route.ExecutorRouter; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * 以执行器为维度,根据执行器被调度的次数,依次调度 + * 如: 建立5个任务,有5个执行器, + * 期望: 每个任务执行器执行一次任务 + * 实际: 5个任务被调度到了3个执行器上,有2个执行器空闲 + * 尝试: 路由策略修改为hash一致性, 也不能均匀的分布在5个执行器上 + * + * 已知问题: 调用触发时间相同时,导致获取的执行器地址不准,存在多个任务调用被调度到同一个执行器上的情况, + * 自测时,将调度时间相差5s,即可避免此情况 + * @author zhudeyu + * @since 2025年07月15日 14:49:40 + **/ +public class ExecutorRouteRoundHandler extends ExecutorRouter { + /** + * 执行器记录器, 记录每个执行器被调度的次数 + **/ + private static ConcurrentMap routeMap = new ConcurrentHashMap(); + /** + * 最大记录器 + **/ + private static Integer MAX_TIME = 0; + /** + * 清除执行器记录器实际 + **/ + private static long CACHE_VALID_TIME = 0; + + /** + * 识别 http://127.0.0.1:9999/ 中的ip + * @param url 执行器地址 + * @return java.lang.String + * @author zhudeyu + * @since 2025年07月15日 14:51:03 + **/ + public static String extractIp(String url) { + Pattern pattern = Pattern.compile("^http://([^:/]+):(\\d+)/?$"); + Matcher matcher = pattern.matcher(url); + if (matcher.find()) { + return matcher.group(1); + } + return null; + } + + /** + * 根据每个任务执行器被调度的次数,依次调度 + * @param triggerParam 任务参数 + * @param addressList 执行器地址 + * @return com.xxl.job.core.biz.model.ReturnT + * @author zhudeyu + * @since 2025年07月15日 14:51:33 + **/ + @Override + public ReturnT route(TriggerParam triggerParam, List addressList) { + // 被调度的执行器地址 + String address=""; + // 达到清除时间,清除执行器记录器 + if (System.currentTimeMillis() > CACHE_VALID_TIME) { + routeMap.clear(); + // 清除最大记录器 + MAX_TIME=0; + CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24; + } + + if(routeMap.keySet().isEmpty()){ + // 首次执行, 初始化,所有执行器 执行次数均为0 + addressList.forEach(e->{ + String key = extractIp(e); + routeMap.put(key,0); + }); + + // 调度到第一个执行器地址 + address = addressList.get(0); + String key = extractIp(address); + routeMap.put(key,1); + // 最大记录器,加1 + MAX_TIME=1; + return new ReturnT(address); + }else{ + for (String addr : addressList) { + // 获取ip + String key = extractIp(addr); + // 从执行记录器中 + Integer addrTime = routeMap.get(key); + // 该执行器的执行次数 小于 记录器,则取该地址 + if(addrTime < MAX_TIME){ + routeMap.put(key,addrTime+1); + return new ReturnT(addr); + } + } + // 没有小于记录器的执行了, 则修改记录器的值, 随机取一个 + String addr = addressList.get(0); + String key = extractIp(addr); + routeMap.put(key,MAX_TIME+1); + MAX_TIME=MAX_TIME+1; + // 使用set收集执行器地址, 用于识别是否有执行器新增或删除 + Set addrSet = addressList.stream().map(e -> extractIp(e)).collect(Collectors.toSet()); + // 检查是否有执行器卸载了执行器 + Set keySet = routeMap.keySet(); + keySet.forEach(e->{ + // 执行器不在的 则移除 + if(!addrSet.contains(e)){ + // log.info("移除执行器{}",e); + routeMap.remove(e); + } + }); + // 检查是否新增执行器 + addrSet.forEach(e->{ + if(!routeMap.containsKey(e)){ + // 新增的执行器默认给最大值,避免 将所有任务全部调度到该执行器上影响平衡 + routeMap.put(e,MAX_TIME); + } + }); + + return new ReturnT(addr); + } + + } + +} diff --git a/xxl-job-admin/src/main/resources/i18n/message_en.properties b/xxl-job-admin/src/main/resources/i18n/message_en.properties index a6e4d1e1..b2afd9db 100644 --- a/xxl-job-admin/src/main/resources/i18n/message_en.properties +++ b/xxl-job-admin/src/main/resources/i18n/message_en.properties @@ -227,6 +227,7 @@ jobconf_block_COVER_EARLY=Cover Early jobconf_route_first=First jobconf_route_last=Last jobconf_route_round=Round +jobconf_route_round_handler=RoundHandler jobconf_route_random=Random jobconf_route_consistenthash=Consistent Hash jobconf_route_lfu=Least Frequently Used diff --git a/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties b/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties index 4ae39f67..b092ab61 100644 --- a/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties +++ b/xxl-job-admin/src/main/resources/i18n/message_zh_CN.properties @@ -227,6 +227,7 @@ jobconf_block_COVER_EARLY=覆盖之前调度 jobconf_route_first=第一个 jobconf_route_last=最后一个 jobconf_route_round=轮询 +jobconf_route_round_handler=轮询执行器 jobconf_route_random=随机 jobconf_route_consistenthash=一致性HASH jobconf_route_lfu=最不经常使用 diff --git a/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties b/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties index 22e14435..dd29d779 100755 --- a/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties +++ b/xxl-job-admin/src/main/resources/i18n/message_zh_TC.properties @@ -227,6 +227,7 @@ jobconf_block_COVER_EARLY=覆蓋之前調度 jobconf_route_first=第一個 jobconf_route_last=最後一個 jobconf_route_round=輪詢 +jobconf_route_round_handler=輪詢执行器 jobconf_route_random=隨機 jobconf_route_consistenthash=一致性HASH jobconf_route_lfu=最不經常使用 From 895fd5b38dd4da2d0df61ab4d56f75c16dc5b4d4 Mon Sep 17 00:00:00 2001 From: zhudeyu Date: Fri, 18 Jul 2025 14:08:48 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat(route)=20=E4=BF=AE=E5=A4=8D=E8=BD=AE?= =?UTF-8?q?=E8=AF=A2=E6=89=A7=E8=A1=8C=E5=99=A8=E8=B7=AF=E7=94=B1=E7=AD=96?= =?UTF-8?q?=E7=95=A5bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增: 1. 修复轮询执行器路由策略新增执行器时,执行器记录器中没有记录新地址时,抛出NPE --- .../strategy/ExecutorRouteRoundHandler.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRoundHandler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRoundHandler.java index d2dcadc2..00611b88 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRoundHandler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRoundHandler.java @@ -91,6 +91,15 @@ public class ExecutorRouteRoundHandler extends ExecutorRouter { MAX_TIME=1; return new ReturnT(address); }else{ + // 使用set收集执行器地址, 用于识别是否有执行器新增或删除 + Set addrSet = addressList.stream().map(e -> extractIp(e)).collect(Collectors.toSet()); + // 检查是否新增执行器 + addrSet.forEach(e->{ + if(!routeMap.containsKey(e)){ + // 新增的执行器默认给最大值,避免 将所有任务全部调度到该执行器上影响平衡 + routeMap.put(e,MAX_TIME); + } + }); for (String addr : addressList) { // 获取ip String key = extractIp(addr); @@ -107,8 +116,7 @@ public class ExecutorRouteRoundHandler extends ExecutorRouter { String key = extractIp(addr); routeMap.put(key,MAX_TIME+1); MAX_TIME=MAX_TIME+1; - // 使用set收集执行器地址, 用于识别是否有执行器新增或删除 - Set addrSet = addressList.stream().map(e -> extractIp(e)).collect(Collectors.toSet()); + // 检查是否有执行器卸载了执行器 Set keySet = routeMap.keySet(); keySet.forEach(e->{ @@ -118,13 +126,7 @@ public class ExecutorRouteRoundHandler extends ExecutorRouter { routeMap.remove(e); } }); - // 检查是否新增执行器 - addrSet.forEach(e->{ - if(!routeMap.containsKey(e)){ - // 新增的执行器默认给最大值,避免 将所有任务全部调度到该执行器上影响平衡 - routeMap.put(e,MAX_TIME); - } - }); + return new ReturnT(addr); }