From f07fb34d89bf05320d59701dcfae0d493dde9ed9 Mon Sep 17 00:00:00 2001 From: buzhidaolvtu Date: Wed, 19 Apr 2023 23:54:30 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A5APP=E4=B8=BA=E8=8C=83=E5=9B=B4?= =?UTF-8?q?=E4=BD=BF=E7=94=A8LRU=E5=92=8CLFU?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/admin/core/route/ExecutorRouter.java | 15 ++- .../core/route/ExecutorRouterHelper.java | 40 ++++++ .../core/route/strategy/ExecutorRouteLFU.java | 118 +++++++++++------ .../core/route/strategy/ExecutorRouteLRU.java | 122 ++++++++++++------ .../job/admin/core/trigger/XxlJobTrigger.java | 5 +- .../core/route/ExecutorRouterHelperTest.java | 72 +++++++++++ 6 files changed, 287 insertions(+), 85 deletions(-) create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouterHelper.java create mode 100644 xxl-job-admin/src/test/java/com/xxl/job/admin/core/route/ExecutorRouterHelperTest.java diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouter.java index 5de9a1d0..8b557c1c 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouter.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouter.java @@ -1,5 +1,6 @@ package com.xxl.job.admin.core.route; +import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; import org.slf4j.Logger; @@ -17,8 +18,20 @@ public abstract class ExecutorRouter { * route address * * @param addressList - * @return ReturnT.content=address + * @return ReturnT.content=address */ public abstract ReturnT route(TriggerParam triggerParam, List addressList); + + /** + * route address within group + * @param group + * @param triggerParam + * @param addressList + * @return + */ + public ReturnT route(XxlJobGroup group, TriggerParam triggerParam, List addressList) { + return route(triggerParam, addressList); + } + } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouterHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouterHelper.java new file mode 100644 index 00000000..370add8f --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouterHelper.java @@ -0,0 +1,40 @@ +package com.xxl.job.admin.core.route; + +import com.xxl.job.admin.core.route.strategy.ExecutorRouteLFU; +import com.xxl.job.admin.core.route.strategy.ExecutorRouteLRU; + +public class ExecutorRouterHelper { + + /** + * 以executor为单位,更新LRU和LFU + * + * @param appname + * @param address + */ + public static void updateRouteStats(String appname, String address) { + + updateLRUStats(appname, address); + + updateLFUStats(appname, address); + } + + /** + * 统计LFU + * + * @param appname + * @param address + */ + private static void updateLFUStats(String appname, String address) { + ExecutorRouteLFU.AppAddressPool.getOrCreate(appname).updateAddressStats(address); + } + + /** + * @param appname + * @param address + */ + private static void updateLRUStats(String appname, String address) { + ExecutorRouteLRU.AppAddressPool.getOrCreate(appname).updateAddressStats(address); + } + + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java index 9df19726..841d2ef4 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java @@ -10,70 +10,102 @@ import java.util.concurrent.ConcurrentMap; /** * 单个JOB对应的每个执行器,使用频率最低的优先被选举 - * a(*)、LFU(Least Frequently Used):最不经常使用,频率/次数 - * b、LRU(Least Recently Used):最近最久未使用,时间 - * + * a(*)、LFU(Least Frequently Used):最不经常使用,频率/次数 + * b、LRU(Least Recently Used):最近最久未使用,时间 + *

* Created by xuxueli on 17/3/10. */ public class ExecutorRouteLFU extends ExecutorRouter { - private static ConcurrentMap> jobLfuMap = new ConcurrentHashMap>(); - private static long CACHE_VALID_TIME = 0; + public static class AppAddressPool { + private static ConcurrentMap APP_LFU_MANAGER = new ConcurrentHashMap(); - public String route(int jobId, List addressList) { + /** + * 每个APP name 实例化一个地址池 + * + * @param appname + * @return + */ + public static AppAddressPool getOrCreate(String appname) { + return APP_LFU_MANAGER.computeIfAbsent(appname, key -> new AppAddressPool()); + } - // cache clear - if (System.currentTimeMillis() > CACHE_VALID_TIME) { - jobLfuMap.clear(); - CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24; + private List freeAddressList = new ArrayList<>(); + + //key=address + private HashMap usedAddressManager = new HashMap<>(); + + private synchronized void clear(String appname) { + APP_LFU_MANAGER.remove(appname); + getOrCreate(appname); } - // lfu item init - HashMap lfuItemMap = jobLfuMap.get(jobId); // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList; - if (lfuItemMap == null) { - lfuItemMap = new HashMap(); - jobLfuMap.putIfAbsent(jobId, lfuItemMap); // 避免重复覆盖 + public synchronized void updateAddressStats(String address) { + usedAddressManager.put(address, usedAddressManager.getOrDefault(address, 0) + 1); } - // put new - for (String address: addressList) { - if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) { - lfuItemMap.put(address, new Random().nextInt(addressList.size())); // 初始化时主动Random一次,缓解首次压力 + private synchronized String route(List addressList) { + HashMap lfuItemMap = usedAddressManager; + + // remove dead address + List delKeys = new ArrayList<>(); + for (String existKey : lfuItemMap.keySet()) { + if (!addressList.contains(existKey)) { + delKeys.add(existKey); + } } - } - // remove old - List delKeys = new ArrayList<>(); - for (String existKey: lfuItemMap.keySet()) { - if (!addressList.contains(existKey)) { - delKeys.add(existKey); + if (delKeys.size() > 0) { + for (String delKey : delKeys) { + lfuItemMap.remove(delKey); + } } - } - if (delKeys.size() > 0) { - for (String delKey: delKeys) { - lfuItemMap.remove(delKey); + + // put new address into free list + freeAddressList.clear(); + for (String address : addressList) { + if (!lfuItemMap.containsKey(address)) { + freeAddressList.add(address); + } } - } - // load least userd count address - List> lfuItemList = new ArrayList>(lfuItemMap.entrySet()); - Collections.sort(lfuItemList, new Comparator>() { - @Override - public int compare(Map.Entry o1, Map.Entry o2) { - return o1.getValue().compareTo(o2.getValue()); + if (freeAddressList.size() > 0) { + String freeAddress = freeAddressList.get(0); + freeAddressList.remove(0); + return freeAddress; } - }); - Map.Entry addressItem = lfuItemList.get(0); - String minAddress = addressItem.getKey(); - addressItem.setValue(addressItem.getValue() + 1); + // load least userd count address + List> lfuItemList = new ArrayList>(lfuItemMap.entrySet()); + Collections.sort(lfuItemList, new Comparator>() { + @Override + public int compare(Map.Entry o1, Map.Entry o2) { + return o1.getValue().compareTo(o2.getValue()); + } + }); + + Map.Entry addressItem = lfuItemList.get(0); + + return addressItem.getKey(); + } - return addressItem.getKey(); } + public String route(String appname, List addressList) { + AppAddressPool appAddressPool = AppAddressPool.getOrCreate(appname); + // cache clear + if (System.currentTimeMillis() > CACHE_VALID_TIME) { + appAddressPool.clear(appname); + CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24; + } + + return appAddressPool.route(addressList); + } + + private static long CACHE_VALID_TIME = 0; + @Override public ReturnT route(TriggerParam triggerParam, List addressList) { - String address = route(triggerParam.getJobId(), addressList); - return new ReturnT(address); + throw new RuntimeException("remove"); } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java index 2d540067..fa9b84af 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java @@ -1,5 +1,6 @@ package com.xxl.job.admin.core.route.strategy; +import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; @@ -12,65 +13,106 @@ import java.util.concurrent.ConcurrentMap; /** * 单个JOB对应的每个执行器,最久为使用的优先被选举 - * a、LFU(Least Frequently Used):最不经常使用,频率/次数 - * b(*)、LRU(Least Recently Used):最近最久未使用,时间 - * + * a、LFU(Least Frequently Used):最不经常使用,频率/次数 + * b(*)、LRU(Least Recently Used):最近最久未使用,时间 + *

* Created by xuxueli on 17/3/10. */ public class ExecutorRouteLRU extends ExecutorRouter { - private static ConcurrentMap> jobLRUMap = new ConcurrentHashMap>(); - private static long CACHE_VALID_TIME = 0; + public static class AppAddressPool { + private static ConcurrentMap APP_LRU_MANAGER = new ConcurrentHashMap(); - public String route(int jobId, List addressList) { + /** + * 每个APP name 实例化一个地址池 + * + * @param appname + * @return + */ + public static AppAddressPool getOrCreate(String appname) { + return APP_LRU_MANAGER.computeIfAbsent(appname, key -> new AppAddressPool()); + } - // cache clear - if (System.currentTimeMillis() > CACHE_VALID_TIME) { - jobLRUMap.clear(); - CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24; + private List freeAddressList = new ArrayList<>(); + + /** + * LinkedHashMap + * a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期; + * b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法; + */ + private LinkedHashMap usedAddressManager = new LinkedHashMap(16, 0.75f, true); + + private synchronized void clear(String appname) { + APP_LRU_MANAGER.remove(appname); + getOrCreate(appname); } - // init lru - LinkedHashMap lruItem = jobLRUMap.get(jobId); - if (lruItem == null) { - /** - * LinkedHashMap - * a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期; - * b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法; - */ - lruItem = new LinkedHashMap(16, 0.75f, true); - jobLRUMap.putIfAbsent(jobId, lruItem); + public synchronized void updateAddressStats(String address) { + usedAddressManager.put(address, address); } - // put new - for (String address: addressList) { - if (!lruItem.containsKey(address)) { - lruItem.put(address, address); + private synchronized String route(List addressList) { + LinkedHashMap lruItem = usedAddressManager; + + // remove dead address + List delKeys = new ArrayList<>(); + for (String existKey : lruItem.keySet()) { + if (!addressList.contains(existKey)) { + delKeys.add(existKey); + } } - } - // remove old - List delKeys = new ArrayList<>(); - for (String existKey: lruItem.keySet()) { - if (!addressList.contains(existKey)) { - delKeys.add(existKey); + if (delKeys.size() > 0) { + for (String delKey : delKeys) { + lruItem.remove(delKey); + } } - } - if (delKeys.size() > 0) { - for (String delKey: delKeys) { - lruItem.remove(delKey); + + // put new address into free list + freeAddressList.clear(); + for (String address : addressList) { + if (!lruItem.containsKey(address)) { + freeAddressList.add(address); + } } + + if (freeAddressList.size() > 0) { + String freeAddress = freeAddressList.get(0); + freeAddressList.remove(0); + return freeAddress; + } + + // load + String eldestKey = lruItem.entrySet().iterator().next().getKey(); + String eldestValue = lruItem.get(eldestKey); + return eldestValue; + } - // load - String eldestKey = lruItem.entrySet().iterator().next().getKey(); - String eldestValue = lruItem.get(eldestKey); - return eldestValue; } + public String route(String appname, List addressList) { + AppAddressPool appAddressPool = AppAddressPool.getOrCreate(appname); + // cache clear + if (System.currentTimeMillis() > CACHE_VALID_TIME) { + appAddressPool.clear(appname); + CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24; + } + + return appAddressPool.route(addressList); + } + + private static long CACHE_VALID_TIME = 0; + @Override - public ReturnT route(TriggerParam triggerParam, List addressList) { - String address = route(triggerParam.getJobId(), addressList); + public ReturnT route(XxlJobGroup group, TriggerParam triggerParam, List addressList) { + String address = route(group.getAppname(), addressList); return new ReturnT(address); } + + @Override + public ReturnT route(TriggerParam triggerParam, List addressList) { + throw new RuntimeException("remove"); + } + } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java index 748befc6..3b73236f 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java @@ -5,6 +5,7 @@ import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; +import com.xxl.job.admin.core.route.ExecutorRouterHelper; import com.xxl.job.admin.core.scheduler.XxlJobScheduler; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.core.biz.ExecutorBiz; @@ -149,9 +150,11 @@ public class XxlJobTrigger { address = group.getRegistryList().get(0); } } else { - routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); + routeAddressResult = executorRouteStrategyEnum.getRouter().route(group, triggerParam, group.getRegistryList()); if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddressResult.getContent(); + //以单个executor为范围,更新address的使用统计 + ExecutorRouterHelper.updateRouteStats(group.getAppname(), address); } } } else { diff --git a/xxl-job-admin/src/test/java/com/xxl/job/admin/core/route/ExecutorRouterHelperTest.java b/xxl-job-admin/src/test/java/com/xxl/job/admin/core/route/ExecutorRouterHelperTest.java new file mode 100644 index 00000000..97a97d16 --- /dev/null +++ b/xxl-job-admin/src/test/java/com/xxl/job/admin/core/route/ExecutorRouterHelperTest.java @@ -0,0 +1,72 @@ +package com.xxl.job.admin.core.route; + +import com.xxl.job.admin.core.model.XxlJobGroup; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; +import org.junit.jupiter.api.Test; + +import java.util.Date; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +class ExecutorRouterHelperTest { + + @Test + public void test_LFU() throws InterruptedException { + XxlJobGroup group = new XxlJobGroup(); + group.setAppname("setAppName"); + group.setTitle("setTitle"); + group.setAddressType(0); + group.setAddressList("191,192,193,194,195,196"); + group.setUpdateTime(new Date()); + + TriggerParam triggerParam = new TriggerParam(); + triggerParam.setJobId(1); + + + ConcurrentMap addressStatsMap = new ConcurrentHashMap<>(); + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 0, TimeUnit.HOURS, new LinkedBlockingDeque<>()); + for (int i = 0; i < 1000; i++) { + threadPoolExecutor.execute(() -> { + ReturnT route = ExecutorRouteStrategyEnum.LEAST_FREQUENTLY_USED.getRouter().route(group, triggerParam, group.getRegistryList()); + if (route.getCode() == ReturnT.SUCCESS_CODE) { + String address = route.getContent(); + AtomicInteger atomicInteger = addressStatsMap.computeIfAbsent(address, key -> new AtomicInteger(0)); + atomicInteger.incrementAndGet(); + //以单个executor为范围,更新address的使用统计 + ExecutorRouterHelper.updateRouteStats(group.getAppname(), address); + } + }); + } + threadPoolExecutor.shutdown(); + threadPoolExecutor.awaitTermination(1, TimeUnit.HOURS); + System.out.println(addressStatsMap.toString()); + } + + @Test + public void test_LRU() { + XxlJobGroup group = new XxlJobGroup(); + group.setAppname("setAppName"); + group.setTitle("setTitle"); + group.setAddressType(0); + group.setAddressList("191,192,193,194,195,196"); + group.setUpdateTime(new Date()); + + TriggerParam triggerParam = new TriggerParam(); + triggerParam.setJobId(1); + + + ConcurrentMap addressStatsMap = new ConcurrentHashMap<>(); + for (int i = 0; i < 100; i++) { + ReturnT route = ExecutorRouteStrategyEnum.LEAST_RECENTLY_USED.getRouter().route(group, triggerParam, group.getRegistryList()); + if (route.getCode() == ReturnT.SUCCESS_CODE) { + String address = route.getContent(); + AtomicInteger atomicInteger = addressStatsMap.computeIfAbsent(address, key -> new AtomicInteger(0)); + atomicInteger.incrementAndGet(); + //以单个executor为范围,更新address的使用统计 + ExecutorRouterHelper.updateRouteStats(group.getAppname(), address); + } + } + System.out.println(addressStatsMap.toString()); + } +} \ No newline at end of file