路由策略代码重构;

v1.8.2
xuxueli 8 years ago
parent 7e35088764
commit 21b1bf9d26

@ -8,12 +8,10 @@ import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.quartz.JobExecutionContext;
@ -23,7 +21,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
/**
* http job bean
@ -122,99 +122,12 @@ public class RemoteHttpJobBean extends QuartzJobBean {
}
triggerSb.append("<br>路由策略:").append(executorRouteStrategyEnum.name() + "-" + executorRouteStrategyEnum.getTitle());
// trigger remote executor
if (executorRouteStrategyEnum == ExecutorRouteStrategyEnum.FAILOVER) {
for (String address : addressList) {
// beat
ReturnT<String> beatResult = null;
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
beatResult = executorBiz.beat();
} catch (Exception e) {
logger.error("", e);
beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
triggerSb.append("<br>----------------------<br>")
.append("心跳检测:")
.append("<br>address").append(address)
.append("<br>code").append(beatResult.getCode())
.append("<br>msg").append(beatResult.getMsg());
// beat success
if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
jobLog.setExecutorAddress(address);
ReturnT<String> runResult = runExecutor(triggerParam, address);
triggerSb.append("<br>----------------------<br>").append(runResult.getMsg());
return new ReturnT<String>(runResult.getCode(), triggerSb.toString());
}
}
return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
} else if (executorRouteStrategyEnum == ExecutorRouteStrategyEnum.BUSYOVER) {
for (String address : addressList) {
// beat
ReturnT<String> idleBeatResult = null;
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
idleBeatResult = executorBiz.idleBeat(triggerParam.getJobId());
} catch (Exception e) {
logger.error("", e);
idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
triggerSb.append("<br>----------------------<br>")
.append("空闲检测:")
.append("<br>address").append(address)
.append("<br>code").append(idleBeatResult.getCode())
.append("<br>msg").append(idleBeatResult.getMsg());
// beat success
if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
jobLog.setExecutorAddress(address);
ReturnT<String> runResult = runExecutor(triggerParam, address);
triggerSb.append("<br>----------------------<br>").append(runResult.getMsg());
return new ReturnT<String>(runResult.getCode(), triggerSb.toString());
}
}
return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
} else {
// get address
String address = executorRouteStrategyEnum.getRouter().route(jobInfo.getId(), addressList);
jobLog.setExecutorAddress(address);
// run
ReturnT<String> runResult = runExecutor(triggerParam, address);
triggerSb.append("<br>----------------------<br>").append(runResult.getMsg());
return new ReturnT<String>(runResult.getCode(), triggerSb.toString());
}
}
/**
* run executor
* @param triggerParam
* @param address
* @return
*/
public ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error("", e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
StringBuffer sb = new StringBuffer("触发调度:");
sb.append("<br>address").append(address);
sb.append("<br>code").append(runResult.getCode());
sb.append("<br>msg").append(runResult.getMsg());
runResult.setMsg(sb.toString());
// route run / trigger remote executor
ReturnT<String> routeRunResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList, jobLog);
triggerSb.append("<br>----------------------<br>").append(routeRunResult.getMsg());
return new ReturnT<String>(routeRunResult.getCode(), triggerSb.toString());
return runResult;
}
}

@ -14,8 +14,8 @@ public enum ExecutorRouteStrategyEnum {
CONSISTENT_HASH("一致性HASH", new ExecutorRouteConsistentHash()),
LEAST_FREQUENTLY_USED("最不经常使用", new ExecutorRouteLFU()),
LEAST_RECENTLY_USED("最近最久未使用", new ExecutorRouteLRU()),
FAILOVER("故障转移", null),
BUSYOVER("忙碌转移", null);
FAILOVER("故障转移", new ExecutorRouteFailover()),
BUSYOVER("忙碌转移", new ExecutorRouteBusyover());
ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
this.title = title;

@ -1,34 +1,53 @@
package com.xxl.job.admin.core.route;
import org.apache.commons.collections.CollectionUtils;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
/**
* Created by xuxueli on 17/3/10.
*/
public abstract class ExecutorRouter {
public abstract String route(int jobId, ArrayList<String> addressList);
public static String route(int jobId, ArrayList<String> addressList, String executorRouteStrategy){
if (CollectionUtils.isEmpty(addressList)) {
return null;
protected static Logger logger = LoggerFactory.getLogger(ExecutorRouter.class);
/**
* route run
*
* @param triggerParam
* @param addressList
* @return
*/
public abstract ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList, XxlJobLog jobLog);
/**
* run executor
* @param triggerParam
* @param address
* @return
*/
protected static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error("", e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
ExecutorRouteStrategyEnum strategy = ExecutorRouteStrategyEnum.match(executorRouteStrategy, ExecutorRouteStrategyEnum.FIRST);
String routeAddress = strategy.getRouter().route(jobId, addressList);
return routeAddress;
}
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
String ret = ExecutorRouter.route(666, new ArrayList<String>(Arrays.asList("127.0.0.1:0000", "127.0.0.1:2222", "127.0.0.1:3333")), ExecutorRouteStrategyEnum.LEAST_FREQUENTLY_USED.name());
System.out.println(ret);
}
StringBuffer runResultSB = new StringBuffer("触发调度:");
runResultSB.append("<br>address").append(address);
runResultSB.append("<br>code").append(runResult.getCode());
runResultSB.append("<br>msg").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
}

@ -0,0 +1,54 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import java.util.ArrayList;
/**
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteBusyover extends ExecutorRouter {
public String route(int jobId, ArrayList<String> addressList) {
return addressList.get(0);
}
@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList, XxlJobLog jobLog) {
StringBuffer idleBeatResultSB = new StringBuffer();
for (String address : addressList) {
// beat
ReturnT<String> idleBeatResult = null;
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
idleBeatResult = executorBiz.idleBeat(triggerParam.getJobId());
} catch (Exception e) {
logger.error(e.getMessage(), e);
idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
idleBeatResultSB.append("<br>----------------------<br>")
.append("空闲检测:")
.append("<br>address").append(address)
.append("<br>code").append(idleBeatResult.getCode())
.append("<br>msg").append(idleBeatResult.getMsg());
// beat success
if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
jobLog.setExecutorAddress(address);
ReturnT<String> runResult = runExecutor(triggerParam, address);
idleBeatResultSB.append("<br>----------------------<br>").append(runResult.getMsg());
return new ReturnT<String>(runResult.getCode(), idleBeatResultSB.toString());
}
}
return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
}
}

@ -1,6 +1,9 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
@ -19,29 +22,6 @@ public class ExecutorRouteConsistentHash extends ExecutorRouter {
private static int VIRTUAL_NODE_NUM = 5;
@Override
public String route(int jobId, ArrayList<String> addressList) {
// ------A1------A2-------A3------
// -----------J1------------------
TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
for (String address: addressList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHARD-" + address + "-NODE-" + i);
addressRing.put(addressHash, address);
}
}
long jobHash = hash(String.valueOf(jobId));
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return addressRing.firstEntry().getValue();
}
/**
* get hash code on 2^32 ring (md5hash)
* @param key
@ -77,4 +57,36 @@ public class ExecutorRouteConsistentHash extends ExecutorRouter {
return truncateHashCode;
}
public String route(int jobId, ArrayList<String> addressList) {
// ------A1------A2-------A3------
// -----------J1------------------
TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
for (String address: addressList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHARD-" + address + "-NODE-" + i);
addressRing.put(addressHash, address);
}
}
long jobHash = hash(String.valueOf(jobId));
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return addressRing.firstEntry().getValue();
}
@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList, XxlJobLog jobLog) {
// address
String address = route(triggerParam.getJobId(), addressList);
jobLog.setExecutorAddress(address);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult;
}
}

@ -0,0 +1,54 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import java.util.ArrayList;
/**
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteFailover extends ExecutorRouter {
public String route(int jobId, ArrayList<String> addressList) {
return addressList.get(0);
}
@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList, XxlJobLog jobLog) {
StringBuffer beatResultSB = new StringBuffer();
for (String address : addressList) {
// beat
ReturnT<String> beatResult = null;
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
beatResult = executorBiz.beat();
} catch (Exception e) {
logger.error(e.getMessage(), e);
beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
beatResultSB.append("<br>----------------------<br>")
.append("心跳检测:")
.append("<br>address").append(address)
.append("<br>code").append(beatResult.getCode())
.append("<br>msg").append(beatResult.getMsg());
// beat success
if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
jobLog.setExecutorAddress(address);
ReturnT<String> runResult = runExecutor(triggerParam, address);
beatResultSB.append("<br>----------------------<br>").append(runResult.getMsg());
return new ReturnT<String>(runResult.getCode(), beatResultSB.toString());
}
}
return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());
}
}

@ -1,6 +1,9 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import java.util.ArrayList;
@ -9,9 +12,21 @@ import java.util.ArrayList;
*/
public class ExecutorRouteFirst extends ExecutorRouter {
@Override
public String route(int jobId, ArrayList<String> addressList) {
return addressList.get(0);
}
@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList, XxlJobLog jobLog) {
// address
String address = route(triggerParam.getJobId(), addressList);
jobLog.setExecutorAddress(address);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult;
}
}

@ -1,6 +1,9 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@ -17,7 +20,6 @@ public class ExecutorRouteLFU extends ExecutorRouter {
private static ConcurrentHashMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();
private static long CACHE_VALID_TIME = 0;
@Override
public String route(int jobId, ArrayList<String> addressList) {
// cache clear
@ -54,4 +56,18 @@ public class ExecutorRouteLFU extends ExecutorRouter {
return addressItem.getKey();
}
@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList, XxlJobLog jobLog) {
// address
String address = route(triggerParam.getJobId(), addressList);
jobLog.setExecutorAddress(address);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult;
}
}

@ -1,6 +1,9 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@ -18,7 +21,6 @@ public class ExecutorRouteLRU extends ExecutorRouter {
private static ConcurrentHashMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();
private static long CACHE_VALID_TIME = 0;
@Override
public String route(int jobId, ArrayList<String> addressList) {
// cache clear
@ -52,4 +54,19 @@ public class ExecutorRouteLRU extends ExecutorRouter {
return eldestValue;
}
@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList, XxlJobLog jobLog) {
// address
String address = route(triggerParam.getJobId(), addressList);
jobLog.setExecutorAddress(address);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult;
}
}

@ -1,6 +1,9 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import java.util.ArrayList;
@ -9,9 +12,20 @@ import java.util.ArrayList;
*/
public class ExecutorRouteLast extends ExecutorRouter {
@Override
public String route(int jobId, ArrayList<String> addressList) {
return addressList.get(addressList.size()-1);
}
@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList, XxlJobLog jobLog) {
// address
String address = route(triggerParam.getJobId(), addressList);
jobLog.setExecutorAddress(address);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult;
}
}

@ -1,6 +1,9 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import java.util.ArrayList;
import java.util.Random;
@ -12,10 +15,22 @@ public class ExecutorRouteRandom extends ExecutorRouter {
private static Random localRandom = new Random();
@Override
public String route(int jobId, ArrayList<String> addressList) {
// Collections.shuffle(addressList);
return addressList.get(localRandom.nextInt(addressList.size()));
}
@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList, XxlJobLog jobLog) {
// address
String address = route(triggerParam.getJobId(), addressList);
jobLog.setExecutorAddress(address);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult;
}
}

@ -1,6 +1,9 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import java.util.ArrayList;
import java.util.Random;
@ -27,12 +30,24 @@ public class ExecutorRouteRound extends ExecutorRouter {
return count;
}
@Override
public String route(int jobId, ArrayList<String> addressList) {
return addressList.get(count(jobId)%addressList.size());
}
@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList, XxlJobLog jobLog) {
return addressList.get(count(jobId)%addressList.size());
}
// address
String address = route(triggerParam.getJobId(), addressList);
jobLog.setExecutorAddress(address);
// run executor
ReturnT<String> runResult = runExecutor(triggerParam, address);
runResult.setMsg("<br>----------------------<br>" + runResult.getMsg());
return runResult;
}
}

Loading…
Cancel
Save