访问令牌(accessToken):为提升系统安全性,调度中心和执行器进行安全性校验,双方AccessToken匹配才允许通讯;

v1.8.2
xuxueli 8 years ago
parent e9566fcbbf
commit 1bc5cc76be

@ -905,6 +905,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 6、调度中心API服务改为自研RPC形式统一底层通讯模型 - 6、调度中心API服务改为自研RPC形式统一底层通讯模型
- 7、新增调度中心API服务测试Demo方便在调度中心API扩展和测试 - 7、新增调度中心API服务测试Demo方便在调度中心API扩展和测试
- 8、任务列表页交互优化更换执行器分组时自动刷新任务列表新建任务时默认定位在当前执行器位置 - 8、任务列表页交互优化更换执行器分组时自动刷新任务列表新建任务时默认定位在当前执行器位置
- 9、访问令牌accessToken为提升系统安全性调度中心和执行器进行安全性校验双方AccessToken匹配才允许通讯
#### TODO LIST #### TODO LIST
- 1、任务权限管理执行器为粒度分配权限核心操作校验权限 - 1、任务权限管理执行器为粒度分配权限核心操作校验权限

@ -3,6 +3,7 @@ package com.xxl.job.admin.controller;
import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.dao.XxlJobGroupDao; import com.xxl.job.admin.dao.XxlJobGroupDao;
import com.xxl.job.admin.dao.XxlJobInfoDao; import com.xxl.job.admin.dao.XxlJobInfoDao;
import com.xxl.job.admin.dao.XxlJobLogDao; import com.xxl.job.admin.dao.XxlJobLogDao;
@ -119,7 +120,7 @@ public class JobLogController {
@ResponseBody @ResponseBody
public ReturnT<LogResult> logDetailCat(String executorAddress, long triggerTime, int logId, int fromLineNum){ public ReturnT<LogResult> logDetailCat(String executorAddress, long triggerTime, int logId, int fromLineNum){
try { try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, executorAddress).getObject(); ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, executorAddress, XxlJobDynamicScheduler.getAccessToken()).getObject();
ReturnT<LogResult> logResult = executorBiz.log(triggerTime, logId, fromLineNum); ReturnT<LogResult> logResult = executorBiz.log(triggerTime, logId, fromLineNum);
// is end // is end
@ -153,7 +154,7 @@ public class JobLogController {
// request of kill // request of kill
ReturnT<String> runResult = null; ReturnT<String> runResult = null;
try { try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, log.getExecutorAddress()).getObject(); ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, log.getExecutorAddress(), XxlJobDynamicScheduler.getAccessToken()).getObject();
runResult = executorBiz.kill(jobInfo.getId()); runResult = executorBiz.kill(jobInfo.getId());
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);

@ -1,5 +1,6 @@
package com.xxl.job.admin.core.route; package com.xxl.job.admin.core.route;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
@ -33,7 +34,7 @@ public abstract class ExecutorRouter {
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null; ReturnT<String> runResult = null;
try { try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject(); ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, XxlJobDynamicScheduler.getAccessToken()).getObject();
runResult = executorBiz.run(triggerParam); runResult = executorBiz.run(triggerParam);
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);

@ -1,6 +1,7 @@
package com.xxl.job.admin.core.route.strategy; package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
@ -25,7 +26,7 @@ public class ExecutorRouteBusyover extends ExecutorRouter {
// beat // beat
ReturnT<String> idleBeatResult = null; ReturnT<String> idleBeatResult = null;
try { try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject(); ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, XxlJobDynamicScheduler.getAccessToken()).getObject();
idleBeatResult = executorBiz.idleBeat(triggerParam.getJobId()); idleBeatResult = executorBiz.idleBeat(triggerParam.getJobId());
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);

@ -1,6 +1,7 @@
package com.xxl.job.admin.core.route.strategy; package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
@ -25,7 +26,7 @@ public class ExecutorRouteFailover extends ExecutorRouter {
// beat // beat
ReturnT<String> beatResult = null; ReturnT<String> beatResult = null;
try { try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject(); ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, XxlJobDynamicScheduler.getAccessToken()).getObject();
beatResult = executorBiz.beat(); beatResult = executorBiz.beat();
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);

@ -36,7 +36,16 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In
public void setScheduler(Scheduler scheduler) { public void setScheduler(Scheduler scheduler) {
XxlJobDynamicScheduler.scheduler = scheduler; XxlJobDynamicScheduler.scheduler = scheduler;
} }
// accessToken
private static String accessToken;
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public static String getAccessToken() {
return accessToken;
}
// init // init
public void init() throws Exception { public void init() throws Exception {
// admin registry monitor run // admin registry monitor run
@ -47,6 +56,8 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In
// rpc-service, base on spring-mvc // rpc-service, base on spring-mvc
NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz); NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz);
NetComServerFactory.setAccessToken(accessToken);
} }
// destroy // destroy

@ -65,6 +65,7 @@
<bean id="xxlJobDynamicScheduler" class="com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler" init-method="init" destroy-method="destroy" > <bean id="xxlJobDynamicScheduler" class="com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler" init-method="init" destroy-method="destroy" >
<!-- (轻易不要变更“调度器名称”, 任务创建时会绑定该“调度器名称”) --> <!-- (轻易不要变更“调度器名称”, 任务创建时会绑定该“调度器名称”) -->
<property name="scheduler" ref="quartzScheduler"/> <property name="scheduler" ref="quartzScheduler"/>
<property name="accessToken" value="${xxl.job.accessToken}" />
</bean> </bean>
</beans> </beans>

@ -14,4 +14,7 @@ xxl.job.mail.sendNick=《任务调度平台XXL-JOB》
# xxl-job login # xxl-job login
xxl.job.login.username=admin xxl.job.login.username=admin
xxl.job.login.password=123456 xxl.job.login.password=123456
# xxl-job, access token
xxl.job.accessToken=

@ -19,7 +19,8 @@ public class AdminBizTest {
// admin-client // admin-client
String addressUrl = "http://127.0.0.1:8080/xxl-job-admin".concat(AdminBiz.MAPPING); String addressUrl = "http://127.0.0.1:8080/xxl-job-admin".concat(AdminBiz.MAPPING);
AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl).getObject(); String accessToken = null;
AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject();
// test executor registry // test executor registry
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999"); RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999");

@ -34,7 +34,8 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
private int port = 9999; private int port = 9999;
private String appName; private String appName;
private String adminAddresses; private String adminAddresses;
public static String logPath; private String accessToken;
public static String logPath = "/data/applogs/xxl-job/jobhandler/";
public void setIp(String ip) { public void setIp(String ip) {
this.ip = ip; this.ip = ip;
@ -48,18 +49,21 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
public void setAdminAddresses(String adminAddresses) { public void setAdminAddresses(String adminAddresses) {
this.adminAddresses = adminAddresses; this.adminAddresses = adminAddresses;
} }
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public void setLogPath(String logPath) { public void setLogPath(String logPath) {
this.logPath = logPath; this.logPath = logPath;
} }
// ---------------------------------- admin-client ------------------------------------ // ---------------------------------- admin-client ------------------------------------
private static List<AdminBiz> adminBizList; private static List<AdminBiz> adminBizList;
private static void initAdminBizList(String adminAddresses) throws Exception { private static void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) { if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) { for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) { if (address!=null && address.trim().length()>0) {
String addressUrl = address.concat(AdminBiz.MAPPING); String addressUrl = address.concat(AdminBiz.MAPPING);
AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl).getObject(); AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject();
if (adminBizList == null) { if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>(); adminBizList = new ArrayList<AdminBiz>();
} }
@ -76,12 +80,14 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
private NetComServerFactory serverFactory = new NetComServerFactory(); private NetComServerFactory serverFactory = new NetComServerFactory();
public void start() throws Exception { public void start() throws Exception {
// init admin-client // init admin-client
initAdminBizList(adminAddresses); initAdminBizList(adminAddresses, accessToken);
// executor start // executor start
NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); // rpc-service, base on jetty NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); // rpc-service, base on jetty
NetComServerFactory.setAccessToken(accessToken);
serverFactory.start(port, ip, appName); serverFactory.start(port, ip, appName);
// trigger callback thread start // trigger callback thread start
TriggerCallbackThread.getInstance().start(); TriggerCallbackThread.getInstance().start();
} }

@ -12,12 +12,14 @@ public class RpcRequest implements Serializable{
private String serverAddress; private String serverAddress;
private long createMillisTime; private long createMillisTime;
private String accessToken;
private String className; private String className;
private String methodName; private String methodName;
private Class<?>[] parameterTypes; private Class<?>[] parameterTypes;
private Object[] parameters; private Object[] parameters;
public String getServerAddress() { public String getServerAddress() {
return serverAddress; return serverAddress;
} }
@ -29,41 +31,62 @@ public class RpcRequest implements Serializable{
public long getCreateMillisTime() { public long getCreateMillisTime() {
return createMillisTime; return createMillisTime;
} }
public void setCreateMillisTime(long createMillisTime) { public void setCreateMillisTime(long createMillisTime) {
this.createMillisTime = createMillisTime; this.createMillisTime = createMillisTime;
} }
public String getAccessToken() {
return accessToken;
}
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public String getClassName() { public String getClassName() {
return className; return className;
} }
public void setClassName(String className) { public void setClassName(String className) {
this.className = className; this.className = className;
} }
public String getMethodName() { public String getMethodName() {
return methodName; return methodName;
} }
public void setMethodName(String methodName) { public void setMethodName(String methodName) {
this.methodName = methodName; this.methodName = methodName;
} }
public Class<?>[] getParameterTypes() { public Class<?>[] getParameterTypes() {
return parameterTypes; return parameterTypes;
} }
public void setParameterTypes(Class<?>[] parameterTypes) { public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes; this.parameterTypes = parameterTypes;
} }
public Object[] getParameters() { public Object[] getParameters() {
return parameters; return parameters;
} }
public void setParameters(Object[] parameters) { public void setParameters(Object[] parameters) {
this.parameters = parameters; this.parameters = parameters;
} }
@Override @Override
public String toString() { public String toString() {
return "NettyRequest [serverAddress=" + serverAddress + ", createMillisTime=" return "RpcRequest{" +
+ createMillisTime + ", className=" + className "serverAddress='" + serverAddress + '\'' +
+ ", methodName=" + methodName + ", parameterTypes=" ", createMillisTime=" + createMillisTime +
+ Arrays.toString(parameterTypes) + ", parameters=" ", accessToken='" + accessToken + '\'' +
+ Arrays.toString(parameters) + "]"; ", className='" + className + '\'' +
", methodName='" + methodName + '\'' +
", parameterTypes=" + Arrays.toString(parameterTypes) +
", parameters=" + Arrays.toString(parameters) +
'}';
} }
} }

@ -20,11 +20,13 @@ public class NetComClientProxy implements FactoryBean<Object> {
// ---------------------- config ---------------------- // ---------------------- config ----------------------
private Class<?> iface; private Class<?> iface;
String serverAddress; private String serverAddress;
JettyClient client = new JettyClient(); private String accessToken;
public NetComClientProxy(Class<?> iface, String serverAddress) { private JettyClient client = new JettyClient();
public NetComClientProxy(Class<?> iface, String serverAddress, String accessToken) {
this.iface = iface; this.iface = iface;
this.serverAddress = serverAddress; this.serverAddress = serverAddress;
this.accessToken = accessToken;
} }
@Override @Override
@ -39,6 +41,7 @@ public class NetComClientProxy implements FactoryBean<Object> {
RpcRequest request = new RpcRequest(); RpcRequest request = new RpcRequest();
request.setServerAddress(serverAddress); request.setServerAddress(serverAddress);
request.setCreateMillisTime(System.currentTimeMillis()); request.setCreateMillisTime(System.currentTimeMillis());
request.setAccessToken(accessToken);
request.setClassName(method.getDeclaringClass().getName()); request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName()); request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes()); request.setParameterTypes(method.getParameterTypes());

@ -30,14 +30,18 @@ public class NetComServerFactory {
server.destroy(); server.destroy();
} }
// ---------------------- server init ---------------------- // ---------------------- server instance ----------------------
/** /**
* init local rpc service map * init local rpc service map
*/ */
private static Map<String, Object> serviceMap = new HashMap<String, Object>(); private static Map<String, Object> serviceMap = new HashMap<String, Object>();
private static String accessToken;
public static void putService(Class<?> iface, Object serviceBean){ public static void putService(Class<?> iface, Object serviceBean){
serviceMap.put(iface.getName(), serviceBean); serviceMap.put(iface.getName(), serviceBean);
} }
public static void setAccessToken(String accessToken) {
NetComServerFactory.accessToken = accessToken;
}
public static RpcResponse invokeService(RpcRequest request, Object serviceBean) { public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
if (serviceBean==null) { if (serviceBean==null) {
serviceBean = serviceMap.get(request.getClassName()); serviceBean = serviceMap.get(request.getClassName());
@ -49,7 +53,11 @@ public class NetComServerFactory {
RpcResponse response = new RpcResponse(); RpcResponse response = new RpcResponse();
if (System.currentTimeMillis() - request.getCreateMillisTime() > 180000) { if (System.currentTimeMillis() - request.getCreateMillisTime() > 180000) {
response.setResult(new ReturnT<String>(ReturnT.FAIL_CODE, "the timestamp difference between admin and executor exceeds the limit.")); response.setResult(new ReturnT<String>(ReturnT.FAIL_CODE, "The timestamp difference between admin and executor exceeds the limit."));
return response;
}
if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(request.getAccessToken())) {
response.setResult(new ReturnT<String>(ReturnT.FAIL_CODE, "The access token[" + request.getAccessToken() + "] is wrong."));
return response; return response;
} }

@ -33,6 +33,8 @@
<property name="adminAddresses" value="${xxl.job.admin.addresses}" /> <property name="adminAddresses" value="${xxl.job.admin.addresses}" />
<!-- 执行器日志路径[必填] --> <!-- 执行器日志路径[必填] -->
<property name="logPath" value="${xxl.job.executor.logpath}" /> <property name="logPath" value="${xxl.job.executor.logpath}" />
<!-- 访问令牌,非空则进行匹配校验[选填] -->
<property name="accessToken" value="${xxl.job.accessToken}" />
</bean> </bean>

@ -7,4 +7,7 @@ xxl.job.executor.ip=
xxl.job.executor.port=9999 xxl.job.executor.port=9999
### xxl-job log path ### xxl-job log path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler/ xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler/
### xxl-job, access token
xxl.job.accessToken=

@ -33,7 +33,8 @@ public class DemoJobHandlerTest {
triggerParam.setLogDateTim(System.currentTimeMillis()); triggerParam.setLogDateTim(System.currentTimeMillis());
// do remote trigger // do remote trigger
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, "127.0.0.1:9999").getObject(); String accessToken = null;
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, "127.0.0.1:9999", null).getObject();
ReturnT<String> runResult = executorBiz.run(triggerParam); ReturnT<String> runResult = executorBiz.run(triggerParam);
} }

@ -34,6 +34,8 @@ public class XxlJobConfig {
@Value("${xxl.job.executor.logpath}") @Value("${xxl.job.executor.logpath}")
private String logpath; private String logpath;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Bean(initMethod = "start", destroyMethod = "destroy") @Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobExecutor xxlJobExecutor() { public XxlJobExecutor xxlJobExecutor() {
@ -44,6 +46,7 @@ public class XxlJobConfig {
xxlJobExecutor.setAppName(appname); xxlJobExecutor.setAppName(appname);
xxlJobExecutor.setAdminAddresses(addresses); xxlJobExecutor.setAdminAddresses(addresses);
xxlJobExecutor.setLogPath(logpath); xxlJobExecutor.setLogPath(logpath);
xxlJobExecutor.setAccessToken(accessToken);
return xxlJobExecutor; return xxlJobExecutor;
} }

@ -15,3 +15,6 @@ xxl.job.executor.port=9998
### xxl-job log path ### xxl-job log path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler/ xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler/
### xxl-job, access token
xxl.job.accessToken=

Loading…
Cancel
Save