1、任务新增"执行器路由策略"属性;

2、路由策略逻辑:第一个、最后一个、轮训、随机、一致性HASH
pull/1/head
xueli.xue 8 years ago
parent 5370a8b3dc
commit 9c03285d94

@ -145,7 +145,7 @@ CREATE TABLE XXL_JOB_QRTZ_LOCKS
CREATE TABLE XXL_JOB_QRTZ_TRIGGER_INFO (
CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_INFO` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '任务组(执行器ID)',
`job_name` varchar(255) NOT NULL COMMENT '任务名',
@ -155,6 +155,7 @@ CREATE TABLE XXL_JOB_QRTZ_TRIGGER_INFO (
`update_time` datetime DEFAULT NULL,
`author` varchar(64) DEFAULT NULL COMMENT '作者',
`alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',
`executor_route_strategy` varchar(20) DEFAULT NULL COMMENT '执行器路由策略',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(255) DEFAULT NULL COMMENT '执行器任务参数',
`glue_switch` int(11) DEFAULT '0' COMMENT 'GLUE模式开关0-否1-是',
@ -164,7 +165,6 @@ CREATE TABLE XXL_JOB_QRTZ_TRIGGER_INFO (
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_LOG` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '任务组',

@ -21,6 +21,7 @@ public class XxlJobInfo {
private String author; // 负责人
private String alarmEmail; // 报警邮件
private String executorRouteStrategy; // 执行器路由策略
private String executorHandler; // 执行器任务Handler名称
private String executorParam; // 执行器,任务参数
@ -105,6 +106,14 @@ public class XxlJobInfo {
this.alarmEmail = alarmEmail;
}
public String getExecutorRouteStrategy() {
return executorRouteStrategy;
}
public void setExecutorRouteStrategy(String executorRouteStrategy) {
this.executorRouteStrategy = executorRouteStrategy;
}
public String getExecutorHandler() {
return executorHandler;
}

@ -0,0 +1,41 @@
package com.xxl.job.admin.core.route;
import com.xxl.job.admin.core.route.strategy.*;
/**
* Created by xuxueli on 17/3/10.
*/
public enum ExecutorRouteStrategyEnum {
FIRST("第一个", new ExecutorRouteFirst()),
LAST("最后一个", new ExecutorRouteLast()),
ROUND("轮询", new ExecutorRouteRound()),
RANDOM("随机", new ExecutorRouteRandom()),
CONSISTENT_HASH("一致性HASH", new ExecutorRouteConsistentHash()),
FAILOVER("故障转移", null);
ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
this.title = title;
this.router = router;
}
private String title;
private ExecutorRouter router;
public String getTitle() {
return title;
}
public ExecutorRouter getRouter() {
return router;
}
public static ExecutorRouteStrategyEnum match(String name, ExecutorRouteStrategyEnum defaultItem){
for (ExecutorRouteStrategyEnum item: ExecutorRouteStrategyEnum.values()) {
if (item.name().equals(name)) {
return item;
}
}
return defaultItem;
}
}

@ -0,0 +1,51 @@
package com.xxl.job.admin.core.route;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Arrays;
/**
* Created by xuxueli on 17/3/10.
*/
public abstract class ExecutorRouter {
public abstract String route(int jobId, ArrayList<String> addressList);
public static String route(int jobId, ArrayList<String> addressList, String executorRouteStrategy){
if (CollectionUtils.isEmpty(addressList)) {
return null;
}
ExecutorRouteStrategyEnum strategy = ExecutorRouteStrategyEnum.match(executorRouteStrategy, ExecutorRouteStrategyEnum.FIRST);
String routeAddress = strategy.getRouter().route(jobId, addressList);
return routeAddress;
}
public static void main(String[] args) {
int c1 = 0;
int c2 = 0;
int c3 = 0;
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
String ret = ExecutorRouter.route(i, new ArrayList<String>(Arrays.asList("127.0.0.1:0000", "127.0.0.1:2222", "127.0.0.1:3333")), ExecutorRouteStrategyEnum.CONSISTENT_HASH.name());
if (ret.equals("127.0.0.1:0000")) {
c1++;
} else if (ret.equals("127.0.0.1:2222")) {
c2++;
} else if (ret.equals("127.0.0.1:3333")) {
c3++;
}
}
long end = System.currentTimeMillis();
System.out.println(end - start); // 1000*100=740、1000*1=162、
System.out.println(c1);
System.out.println(c2);
System.out.println(c3);
}
}

@ -0,0 +1,80 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* JOBJOB
* avirtual node
* bhash method replace hashCodeStringhashCodehashCode
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteConsistentHash extends ExecutorRouter {
private static int VIRTUAL_NODE_NUM = 5;
@Override
public String route(int jobId, ArrayList<String> addressList) {
// ------A1------A2-------A3------
// -----------J1------------------
TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
for (String address: addressList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHARD-" + address + "-NODE-" + i);
addressRing.put(addressHash, address);
}
}
long jobHash = hash(String.valueOf(jobId));
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return addressRing.firstEntry().getValue();
}
/**
* get hash code on 2^32 ring (md5hash)
* @param key
* @return
*/
private static long hash(String key) {
// md5 byte
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 not supported", e);
}
md5.reset();
byte[] keyBytes = null;
try {
keyBytes = key.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Unknown string :" + key, e);
}
md5.update(keyBytes);
byte[] digest = md5.digest();
// hash code, Truncate to 32-bits
long hashCode = ((long) (digest[3] & 0xFF) << 24)
| ((long) (digest[2] & 0xFF) << 16)
| ((long) (digest[1] & 0xFF) << 8)
| (digest[0] & 0xFF);
long truncateHashCode = hashCode & 0xffffffffL;
return truncateHashCode;
}
}

@ -0,0 +1,17 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import java.util.ArrayList;
/**
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteFirst extends ExecutorRouter {
@Override
public String route(int jobId, ArrayList<String> addressList) {
return addressList.get(0);
}
}

@ -0,0 +1,17 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import java.util.ArrayList;
/**
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteLast extends ExecutorRouter {
@Override
public String route(int jobId, ArrayList<String> addressList) {
return addressList.get(addressList.size()-1);
}
}

@ -0,0 +1,20 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import java.util.ArrayList;
import java.util.Random;
/**
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteRandom extends ExecutorRouter {
private static Random localRandom = new Random();
@Override
public String route(int jobId, ArrayList<String> addressList) {
return addressList.get(localRandom.nextInt(addressList.size()));
}
}

@ -0,0 +1,26 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
/**
* Created by xuxueli on 17/3/10.
*/
public class ExecutorRouteRound extends ExecutorRouter {
private static ConcurrentHashMap<Integer, Integer> routeCountEachJob = new ConcurrentHashMap<Integer, Integer>();
private static int count(int jobId) {
Integer count = routeCountEachJob.get(jobId);
count = (count==null)?0:++count;
routeCountEachJob.put(jobId, count);
return count;
}
@Override
public String route(int jobId, ArrayList<String> addressList) {
return addressList.get(count(jobId)%addressList.size());
}
}

@ -17,6 +17,7 @@
<result column="author" property="author" />
<result column="alarm_email" property="alarmEmail" />
<result column="executor_route_strategy" property="executorRouteStrategy" />
<result column="executor_handler" property="executorHandler" />
<result column="executor_param" property="executorParam" />
@ -37,6 +38,7 @@
t.update_time,
t.author,
t.alarm_email,
t.executor_route_strategy,
t.executor_handler,
t.executor_param,
t.glue_switch,
@ -83,6 +85,7 @@
update_time,
author,
alarm_email,
executor_route_strategy,
executor_handler,
executor_param,
glue_switch,
@ -98,6 +101,7 @@
NOW(),
#{author},
#{alarmEmail},
#{executorRouteStrategy},
#{executorHandler},
#{executorParam},
#{glueSwitch},
@ -125,6 +129,7 @@
update_time = NOW(),
author = #{author},
alarm_email = #{alarmEmail},
executor_route_strategy = #{executorRouteStrategy},
executor_handler = #{executorHandler},
executor_param = #{executorParam},
glue_switch = #{glueSwitch},

Loading…
Cancel
Save