Pre Merge pull request !49 from buzhidaolvtu/master

pull/49/MERGE
buzhidaolvtu 2 years ago committed by Gitee
commit 398dd64d6e
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F

@ -1,5 +1,6 @@
package com.xxl.job.admin.core.route;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import org.slf4j.Logger;
@ -17,8 +18,20 @@ public abstract class ExecutorRouter {
* route address
*
* @param addressList
* @return ReturnT.content=address
* @return ReturnT.content=address
*/
public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);
/**
* route address within group
* @param group
* @param triggerParam
* @param addressList
* @return
*/
public ReturnT<String> route(XxlJobGroup group, TriggerParam triggerParam, List<String> addressList) {
return route(triggerParam, addressList);
}
}

@ -0,0 +1,40 @@
package com.xxl.job.admin.core.route;
import com.xxl.job.admin.core.route.strategy.ExecutorRouteLFU;
import com.xxl.job.admin.core.route.strategy.ExecutorRouteLRU;
public class ExecutorRouterHelper {
/**
* executorLRULFU
*
* @param appname
* @param address
*/
public static void updateRouteStats(String appname, String address) {
updateLRUStats(appname, address);
updateLFUStats(appname, address);
}
/**
* LFU
*
* @param appname
* @param address
*/
private static void updateLFUStats(String appname, String address) {
ExecutorRouteLFU.AppAddressPool.getOrCreate(appname).updateAddressStats(address);
}
/**
* @param appname
* @param address
*/
private static void updateLRUStats(String appname, String address) {
ExecutorRouteLRU.AppAddressPool.getOrCreate(appname).updateAddressStats(address);
}
}

@ -10,70 +10,102 @@ import java.util.concurrent.ConcurrentMap;
/**
* JOB使
* a(*)LFU(Least Frequently Used)使/
* bLRU(Least Recently Used)使
*
* a(*)LFU(Least Frequently Used)使/
* bLRU(Least Recently Used)使
* <p>
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteLFU extends ExecutorRouter {
private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();
private static long CACHE_VALID_TIME = 0;
public static class AppAddressPool {
private static ConcurrentMap<String, AppAddressPool> APP_LFU_MANAGER = new ConcurrentHashMap();
public String route(int jobId, List<String> addressList) {
/**
* APP name
*
* @param appname
* @return
*/
public static AppAddressPool getOrCreate(String appname) {
return APP_LFU_MANAGER.computeIfAbsent(appname, key -> new AppAddressPool());
}
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLfuMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
private List<String> freeAddressList = new ArrayList<>();
//key=address
private HashMap<String, Integer> usedAddressManager = new HashMap<>();
private synchronized void clear(String appname) {
APP_LFU_MANAGER.remove(appname);
getOrCreate(appname);
}
// lfu item init
HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId); // Key排序可以用TreeMap+构造入参CompareValue排序暂时只能通过ArrayList
if (lfuItemMap == null) {
lfuItemMap = new HashMap<String, Integer>();
jobLfuMap.putIfAbsent(jobId, lfuItemMap); // 避免重复覆盖
public synchronized void updateAddressStats(String address) {
usedAddressManager.put(address, usedAddressManager.getOrDefault(address, 0) + 1);
}
// put new
for (String address: addressList) {
if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
lfuItemMap.put(address, new Random().nextInt(addressList.size())); // 初始化时主动Random一次缓解首次压力
private synchronized String route(List<String> addressList) {
HashMap<String, Integer> lfuItemMap = usedAddressManager;
// remove dead address
List<String> delKeys = new ArrayList<>();
for (String existKey : lfuItemMap.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
}
// remove old
List<String> delKeys = new ArrayList<>();
for (String existKey: lfuItemMap.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
if (delKeys.size() > 0) {
for (String delKey : delKeys) {
lfuItemMap.remove(delKey);
}
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lfuItemMap.remove(delKey);
// put new address into free list
freeAddressList.clear();
for (String address : addressList) {
if (!lfuItemMap.containsKey(address)) {
freeAddressList.add(address);
}
}
}
// load least userd count address
List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
@Override
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return o1.getValue().compareTo(o2.getValue());
if (freeAddressList.size() > 0) {
String freeAddress = freeAddressList.get(0);
freeAddressList.remove(0);
return freeAddress;
}
});
Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
String minAddress = addressItem.getKey();
addressItem.setValue(addressItem.getValue() + 1);
// load least userd count address
List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
@Override
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return o1.getValue().compareTo(o2.getValue());
}
});
Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
return addressItem.getKey();
}
return addressItem.getKey();
}
public String route(String appname, List<String> addressList) {
AppAddressPool appAddressPool = AppAddressPool.getOrCreate(appname);
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
appAddressPool.clear(appname);
CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24;
}
return appAddressPool.route(addressList);
}
private static long CACHE_VALID_TIME = 0;
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = route(triggerParam.getJobId(), addressList);
return new ReturnT<String>(address);
throw new RuntimeException("remove");
}
}

@ -1,5 +1,6 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
@ -12,65 +13,106 @@ import java.util.concurrent.ConcurrentMap;
/**
* JOB使
* aLFU(Least Frequently Used)使/
* b(*)LRU(Least Recently Used)使
*
* aLFU(Least Frequently Used)使/
* b(*)LRU(Least Recently Used)使
* <p>
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteLRU extends ExecutorRouter {
private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();
private static long CACHE_VALID_TIME = 0;
public static class AppAddressPool {
private static ConcurrentMap<String, AppAddressPool> APP_LRU_MANAGER = new ConcurrentHashMap();
public String route(int jobId, List<String> addressList) {
/**
* APP name
*
* @param appname
* @return
*/
public static AppAddressPool getOrCreate(String appname) {
return APP_LRU_MANAGER.computeIfAbsent(appname, key -> new AppAddressPool());
}
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLRUMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
private List<String> freeAddressList = new ArrayList<>();
/**
* LinkedHashMap
* aaccessOrdertrue=访get/putfalse=
* bremoveEldestEntrytrueLinkedHashMaptrueLRU
*/
private LinkedHashMap<String, String> usedAddressManager = new LinkedHashMap<String, String>(16, 0.75f, true);
private synchronized void clear(String appname) {
APP_LRU_MANAGER.remove(appname);
getOrCreate(appname);
}
// init lru
LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
if (lruItem == null) {
/**
* LinkedHashMap
* aaccessOrdertrue=访get/putfalse=
* bremoveEldestEntrytrueLinkedHashMaptrueLRU
*/
lruItem = new LinkedHashMap<String, String>(16, 0.75f, true);
jobLRUMap.putIfAbsent(jobId, lruItem);
public synchronized void updateAddressStats(String address) {
usedAddressManager.put(address, address);
}
// put new
for (String address: addressList) {
if (!lruItem.containsKey(address)) {
lruItem.put(address, address);
private synchronized String route(List<String> addressList) {
LinkedHashMap<String, String> lruItem = usedAddressManager;
// remove dead address
List<String> delKeys = new ArrayList<>();
for (String existKey : lruItem.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
}
// remove old
List<String> delKeys = new ArrayList<>();
for (String existKey: lruItem.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
if (delKeys.size() > 0) {
for (String delKey : delKeys) {
lruItem.remove(delKey);
}
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lruItem.remove(delKey);
// put new address into free list
freeAddressList.clear();
for (String address : addressList) {
if (!lruItem.containsKey(address)) {
freeAddressList.add(address);
}
}
if (freeAddressList.size() > 0) {
String freeAddress = freeAddressList.get(0);
freeAddressList.remove(0);
return freeAddress;
}
// load
String eldestKey = lruItem.entrySet().iterator().next().getKey();
String eldestValue = lruItem.get(eldestKey);
return eldestValue;
}
// load
String eldestKey = lruItem.entrySet().iterator().next().getKey();
String eldestValue = lruItem.get(eldestKey);
return eldestValue;
}
public String route(String appname, List<String> addressList) {
AppAddressPool appAddressPool = AppAddressPool.getOrCreate(appname);
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
appAddressPool.clear(appname);
CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24;
}
return appAddressPool.route(addressList);
}
private static long CACHE_VALID_TIME = 0;
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = route(triggerParam.getJobId(), addressList);
public ReturnT<String> route(XxlJobGroup group, TriggerParam triggerParam, List<String> addressList) {
String address = route(group.getAppname(), addressList);
return new ReturnT<String>(address);
}
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
throw new RuntimeException("remove");
}
}

@ -5,6 +5,7 @@ import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
import com.xxl.job.admin.core.route.ExecutorRouterHelper;
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.ExecutorBiz;
@ -149,9 +150,11 @@ public class XxlJobTrigger {
address = group.getRegistryList().get(0);
}
} else {
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
routeAddressResult = executorRouteStrategyEnum.getRouter().route(group, triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
//以单个executor为范围更新address的使用统计
ExecutorRouterHelper.updateRouteStats(group.getAppname(), address);
}
}
} else {

@ -0,0 +1,72 @@
package com.xxl.job.admin.core.route;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import org.junit.jupiter.api.Test;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
class ExecutorRouterHelperTest {
@Test
public void test_LFU() throws InterruptedException {
XxlJobGroup group = new XxlJobGroup();
group.setAppname("setAppName");
group.setTitle("setTitle");
group.setAddressType(0);
group.setAddressList("191,192,193,194,195,196");
group.setUpdateTime(new Date());
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(1);
ConcurrentMap<String, AtomicInteger> addressStatsMap = new ConcurrentHashMap<>();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 0, TimeUnit.HOURS, new LinkedBlockingDeque<>());
for (int i = 0; i < 1000; i++) {
threadPoolExecutor.execute(() -> {
ReturnT<String> route = ExecutorRouteStrategyEnum.LEAST_FREQUENTLY_USED.getRouter().route(group, triggerParam, group.getRegistryList());
if (route.getCode() == ReturnT.SUCCESS_CODE) {
String address = route.getContent();
AtomicInteger atomicInteger = addressStatsMap.computeIfAbsent(address, key -> new AtomicInteger(0));
atomicInteger.incrementAndGet();
//以单个executor为范围更新address的使用统计
ExecutorRouterHelper.updateRouteStats(group.getAppname(), address);
}
});
}
threadPoolExecutor.shutdown();
threadPoolExecutor.awaitTermination(1, TimeUnit.HOURS);
System.out.println(addressStatsMap.toString());
}
@Test
public void test_LRU() {
XxlJobGroup group = new XxlJobGroup();
group.setAppname("setAppName");
group.setTitle("setTitle");
group.setAddressType(0);
group.setAddressList("191,192,193,194,195,196");
group.setUpdateTime(new Date());
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(1);
ConcurrentMap<String, AtomicInteger> addressStatsMap = new ConcurrentHashMap<>();
for (int i = 0; i < 100; i++) {
ReturnT<String> route = ExecutorRouteStrategyEnum.LEAST_RECENTLY_USED.getRouter().route(group, triggerParam, group.getRegistryList());
if (route.getCode() == ReturnT.SUCCESS_CODE) {
String address = route.getContent();
AtomicInteger atomicInteger = addressStatsMap.computeIfAbsent(address, key -> new AtomicInteger(0));
atomicInteger.incrementAndGet();
//以单个executor为范围更新address的使用统计
ExecutorRouterHelper.updateRouteStats(group.getAppname(), address);
}
}
System.out.println(addressStatsMap.toString());
}
}
Loading…
Cancel
Save