调度中心回调API服务改为restful方式;

2.1.2
xuxueli 5 years ago
parent 9767174c12
commit 2ced601035

@ -1588,10 +1588,11 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 17、DB脚本默认编码改为utf8mb4修复字符乱码问题(建议Mysql版本5.7+)
- 18、调度中心任务平均分配触发组件每次获取与线程池数量相关数量的任务避免大量任务集中在单个调度中心集群节点
- 19、调度中心移除SQL中的 "now()" 函数集群部署时不再依赖DB时钟仅需要保证调度中心应用节点时钟一致即可
- 20、[ING]xxl-rpc服务端线程优化降低线程内存开销
- 21、[ING]调度日志优化:支持设置日志保留天数,过期日志天维度记录报表,并清理;调度报表汇总实时数据和报表;
- 22、[ING]调度中心日志删除改为分页获取ID根据ID删除的方式
- 23、[ING]任务回调改为restful方式
- 20、xxl-rpc服务端线程优化降低线程内存开销
- 21、调度中心回调API服务改为restful方式
- 22、[ING]调度日志优化:支持设置日志保留天数,过期日志天维度记录报表,并清理;调度报表汇总实时数据和报表;
- 23、[ING]调度中心日志删除改为分页获取ID根据ID删除的方式
### TODO LIST
@ -1630,6 +1631,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 33、任务执行一次的时候指定IP
- 34、通讯调整双向HTTP回调和其他API自定义AccessTokenRestful执行器复用容器端口
- 35、父子任务参数传递流程任务等透传动态参数
- 36、任务操作API服务调整为和回调服务一致降低接入成本
## 七、其他

@ -24,7 +24,7 @@
<maven.compiler.target>1.7</maven.compiler.target>
<maven.test.skip>true</maven.test.skip>
<xxl-rpc.version>1.4.1</xxl-rpc.version>
<xxl-rpc.version>1.4.2</xxl-rpc.version>
<spring.version>4.3.25.RELEASE</spring.version>
<spring-boot.version>1.5.22.RELEASE</spring-boot.version>

@ -1,34 +1,97 @@
package com.xxl.job.admin.controller;
import com.xxl.job.admin.controller.annotation.PermissionLimit;
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.core.biz.AdminBiz;
import org.springframework.beans.factory.InitializingBean;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.util.XxlJobRemotingUtil;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.ServletException;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;
/**
* Created by xuxueli on 17/5/10.
*/
@Controller
public class JobApiController implements InitializingBean {
@RequestMapping("/api")
public class JobApiController {
@Resource
private AdminBiz adminBiz;
@Override
public void afterPropertiesSet() throws Exception {
// ---------------------- admin biz ----------------------
/**
* callback
*
* @param callbackParamList
* @return
*/
@RequestMapping("/callback")
@ResponseBody
@PermissionLimit(limit=false)
public ReturnT<String> callback(HttpServletRequest request, @RequestBody List<HandleCallbackParam> callbackParamList) {
if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_RPC_ACCESS_TOKEN))) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
return adminBiz.callback(callbackParamList);
}
/**
* registry
*
* @param registryParam
* @return
*/
@RequestMapping("/registry")
@ResponseBody
@PermissionLimit(limit=false)
public ReturnT<String> registry(HttpServletRequest request, @RequestBody RegistryParam registryParam) {
if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_RPC_ACCESS_TOKEN))) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
return adminBiz.registry(registryParam);
}
@RequestMapping(AdminBiz.MAPPING)
/**
* registry remove
*
* @param registryParam
* @return
*/
@RequestMapping("/registryRemove")
@ResponseBody
@PermissionLimit(limit=false)
public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
XxlJobScheduler.invokeAdminService(request, response);
public ReturnT<String> registryRemove(HttpServletRequest request, @RequestBody RegistryParam registryParam) {
if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_RPC_ACCESS_TOKEN))) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
return adminBiz.registryRemove(registryParam);
}
// ---------------------- job biz ----------------------
}

@ -5,7 +5,6 @@ import com.xxl.job.admin.dao.XxlJobGroupDao;
import com.xxl.job.admin.dao.XxlJobInfoDao;
import com.xxl.job.admin.dao.XxlJobLogDao;
import com.xxl.job.admin.dao.XxlJobRegistryDao;
import com.xxl.job.core.biz.AdminBiz;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
@ -77,8 +76,6 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
@Resource
private XxlJobGroupDao xxlJobGroupDao;
@Resource
private AdminBiz adminBiz;
@Resource
private JavaMailSender mailSender;
@Resource
private DataSource dataSource;
@ -126,10 +123,6 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
return xxlJobGroupDao;
}
public AdminBiz getAdminBiz() {
return adminBiz;
}
public JavaMailSender getMailSender() {
return mailSender;
}

@ -6,24 +6,16 @@ import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.admin.core.thread.JobScheduleHelper;
import com.xxl.job.admin.core.thread.JobTriggerPoolHelper;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.remoting.net.impl.servlet.server.ServletServerHandler;
import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory;
import com.xxl.rpc.serialize.Serializer;
import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient;
import com.xxl.rpc.serialize.impl.HessianSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -39,9 +31,6 @@ public class XxlJobScheduler {
// init i18n
initI18n();
// admin-server
initRpcProvider();
// admin registry monitor run
JobRegistryMonitorHelper.getInstance().start();
@ -72,8 +61,6 @@ public class XxlJobScheduler {
// admin registry stop
JobRegistryMonitorHelper.getInstance().toStop();
// admin-server
stopRpcProvider();
}
// ---------------------- I18n ----------------------
@ -84,34 +71,6 @@ public class XxlJobScheduler {
}
}
// ---------------------- admin rpc provider (no server version) ----------------------
private static ServletServerHandler servletServerHandler;
private void initRpcProvider(){
// init
XxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.initConfig(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
null,
0,
XxlJobAdminConfig.getAdminConfig().getAccessToken(),
null,
null);
// add services
xxlRpcProviderFactory.addService(AdminBiz.class.getName(), null, XxlJobAdminConfig.getAdminConfig().getAdminBiz());
// servlet handler
servletServerHandler = new ServletServerHandler(xxlRpcProviderFactory);
}
private void stopRpcProvider() throws Exception {
XxlRpcInvokerFactory.getInstance().stop();
}
public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
servletServerHandler.handle(null, request, response);
}
// ---------------------- executor-client ----------------------
private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
@ -128,18 +87,20 @@ public class XxlJobScheduler {
}
// set-cache
executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
ExecutorBiz.class,
null,
3000,
address,
XxlJobAdminConfig.getAdminConfig().getAccessToken(),
null,
null).getObject();
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
referenceBean.setClient(NettyHttpClient.class);
referenceBean.setSerializer(HessianSerializer.class);
referenceBean.setCallType(CallType.SYNC);
referenceBean.setLoadBalance(LoadBalance.ROUND);
referenceBean.setIface(ExecutorBiz.class);
referenceBean.setVersion(null);
referenceBean.setTimeout(3000);
referenceBean.setAddress(address);
referenceBean.setAccessToken(XxlJobAdminConfig.getAdminConfig().getAccessToken());
referenceBean.setInvokeCallback(null);
referenceBean.setInvokerFactory(null);
executorBiz = (ExecutorBiz) referenceBean.getObject();
executorBizRepository.put(address, executorBiz);
return executorBiz;

@ -17,6 +17,7 @@ import com.xxl.job.core.handler.IJobHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.text.MessageFormat;
@ -126,6 +127,14 @@ public class AdminBizImpl implements AdminBiz {
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
// valid
if (!StringUtils.hasText(registryParam.getRegistGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
@ -138,6 +147,14 @@ public class AdminBizImpl implements AdminBiz {
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
// valid
if (!StringUtils.hasText(registryParam.getRegistGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
int ret = xxlJobRegistryDao.registryDelete(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
if (ret > 0) {

@ -1,18 +1,17 @@
package com.xxl.job.adminbiz;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.client.AdminBizClient;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.serialize.Serializer;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
/**
* admin api test
*
@ -21,37 +20,38 @@ import org.junit.Test;
public class AdminBizTest {
// admin-client
private static String addressUrl = "http://127.0.0.1:8080/xxl-job-admin".concat(AdminBiz.MAPPING);
private static String addressUrl = "http://127.0.0.1:8080/xxl-job-admin/";
private static String accessToken = null;
@Test
public void callback() throws Exception {
AdminBiz adminBiz = new AdminBizClient(addressUrl, accessToken);
HandleCallbackParam param = new HandleCallbackParam();
param.setLogId(1);
param.setExecuteResult(ReturnT.SUCCESS);
List<HandleCallbackParam> callbackParamList = Arrays.asList(param);
ReturnT<String> returnT = adminBiz.callback(callbackParamList);
Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
}
/**
* registry executor
*
* @throws Exception
*/
@Test
public void registryTest() throws Exception {
addressUrl = addressUrl.replace("http://", "");
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
AdminBiz.class,
null,
3000,
addressUrl,
accessToken,
null,
null).getObject();
// test executor registry
public void registry() throws Exception {
AdminBiz adminBiz = new AdminBizClient(addressUrl, accessToken);
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999");
ReturnT<String> returnT = adminBiz.registry(registryParam);
Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
// stop invoker
XxlRpcInvokerFactory.getInstance().stop();
Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
}
/**
@ -61,27 +61,13 @@ public class AdminBizTest {
*/
@Test
public void registryRemove() throws Exception {
addressUrl = addressUrl.replace("http://", "");
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
AdminBiz.class,
null,
3000,
addressUrl,
accessToken,
null,
null).getObject();
// test executor registry remove
AdminBiz adminBiz = new AdminBizClient(addressUrl, accessToken);
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999");
ReturnT<String> returnT = adminBiz.registryRemove(registryParam);
Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
// stop invoker
XxlRpcInvokerFactory.getInstance().stop();
}
}

@ -9,8 +9,8 @@ import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.serialize.Serializer;
import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient;
import com.xxl.rpc.serialize.impl.HessianSerializer;
/**
* executor-api client, test
@ -49,18 +49,21 @@ public class ExecutorBizTest {
// do remote trigger
String accessToken = null;
ExecutorBiz executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
ExecutorBiz.class,
null,
3000,
"127.0.0.1:9999",
null,
null,
null).getObject();
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
referenceBean.setClient(NettyHttpClient.class);
referenceBean.setSerializer(HessianSerializer.class);
referenceBean.setCallType(CallType.SYNC);
referenceBean.setLoadBalance(LoadBalance.ROUND);
referenceBean.setIface(ExecutorBiz.class);
referenceBean.setVersion(null);
referenceBean.setTimeout(3000);
referenceBean.setAddress("127.0.0.1:9999");
referenceBean.setAccessToken(null);
referenceBean.setInvokeCallback(null);
referenceBean.setInvokerFactory(null);
ExecutorBiz executorBiz = (ExecutorBiz) referenceBean.getObject();
ReturnT<String> runResult = executorBiz.run(triggerParam);

@ -11,8 +11,6 @@ import java.util.List;
*/
public interface AdminBiz {
public static final String MAPPING = "/api";
// ---------------------- callback ----------------------

@ -0,0 +1,48 @@
package com.xxl.job.core.biz.client;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.util.XxlJobRemotingUtil;
import java.util.List;
/**
* admin api test
*
* @author xuxueli 2017-07-28 22:14:52
*/
public class AdminBizClient implements AdminBiz {
public AdminBizClient() {
}
public AdminBizClient(String addressUrl, String accessToken) {
this.addressUrl = addressUrl;
this.accessToken = accessToken;
// valid
if (!this.addressUrl.endsWith("/")) {
this.addressUrl = this.addressUrl + "/";
}
}
private String addressUrl ;
private String accessToken;
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, callbackParamList, 3);
}
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, registryParam, 3);
}
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, registryParam, 3);
}
}

@ -2,6 +2,7 @@ package com.xxl.job.core.executor;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.client.AdminBizClient;
import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
@ -11,12 +12,10 @@ import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.thread.TriggerCallbackThread;
import com.xxl.rpc.registry.ServiceRegistry;
import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.remoting.net.impl.netty_http.server.NettyHttpServer;
import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory;
import com.xxl.rpc.serialize.Serializer;
import com.xxl.rpc.serialize.impl.HessianSerializer;
import com.xxl.rpc.util.IpUtil;
import com.xxl.rpc.util.NetUtil;
import org.slf4j.Logger;
@ -105,35 +104,18 @@ public class XxlJobExecutor {
// destory TriggerCallbackThread
TriggerCallbackThread.getInstance().toStop();
// destory invoker
stopInvokerFactory();
}
// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private static Serializer serializer;
private static Serializer serializer = new HessianSerializer();
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
String addressUrl = address.concat(AdminBiz.MAPPING);
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
serializer,
CallType.SYNC,
LoadBalance.ROUND,
AdminBiz.class,
null,
3000,
addressUrl,
accessToken,
null,
null
).getObject();
AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
@ -143,14 +125,6 @@ public class XxlJobExecutor {
}
}
}
private void stopInvokerFactory(){
// stop invoker factory
try {
XxlRpcInvokerFactory.getInstance().stop();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
public static List<AdminBiz> getAdminBizList(){
return adminBizList;
}
@ -171,7 +145,16 @@ public class XxlJobExecutor {
serviceRegistryParam.put("address", address);
xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
xxlRpcProviderFactory.setServer(NettyHttpServer.class);
xxlRpcProviderFactory.setSerializer(HessianSerializer.class);
xxlRpcProviderFactory.setCorePoolSize(30);
xxlRpcProviderFactory.setMaxPoolSize(200);
xxlRpcProviderFactory.setIp(ip);
xxlRpcProviderFactory.setPort(port);
xxlRpcProviderFactory.setAccessToken(accessToken);
xxlRpcProviderFactory.setServiceRegistry(ExecutorServiceRegistry.class);
xxlRpcProviderFactory.setServiceRegistryParam(serviceRegistryParam);
// add services
xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

@ -0,0 +1,122 @@
package com.xxl.job.core.util;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.registry.client.util.json.BasicJson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Map;
/**
* @author xuxueli 2018-11-25 00:55:31
*/
public class XxlJobRemotingUtil {
private static Logger logger = LoggerFactory.getLogger(XxlJobRemotingUtil.class);
public static String XXL_RPC_ACCESS_TOKEN = "XXL_RPC_ACCESS_TOKEN";
/**
* post
*
* @param url
* @param accessToken
* @param requestObj
* @return
*/
public static ReturnT<String> postBody(String url, String accessToken, Object requestObj, int timeout) {
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();
// connection setting
connection.setRequestMethod("POST");
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(timeout * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
if(accessToken!=null && accessToken.trim().length()>0){
connection.setRequestProperty(XXL_RPC_ACCESS_TOKEN, accessToken);
}
// do connection
connection.connect();
// write requestBody
String requestBody = BasicJson.toJson(requestObj);
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.writeBytes(requestBody);
dataOutputStream.flush();
dataOutputStream.close();
/*byte[] requestBodyBytes = requestBody.getBytes("UTF-8");
connection.setRequestProperty("Content-Length", String.valueOf(requestBodyBytes.length));
OutputStream outwritestream = connection.getOutputStream();
outwritestream.write(requestBodyBytes);
outwritestream.flush();
outwritestream.close();*/
// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting fail, StatusCode("+ statusCode +") invalid. for url : " + url);
}
// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
String resultJson = result.toString();
// parse returnT
try {
Map<String, Object> resultMap = BasicJson.parseMap(resultJson);
ReturnT<String> returnT = new ReturnT<String>();
if (resultMap==null) {
returnT.setCode(ReturnT.FAIL_CODE);
returnT.setMsg("AdminBizClient Remoting call fail.");
} else {
returnT.setCode(Integer.valueOf(String.valueOf(resultMap.get("code"))));
returnT.setMsg(String.valueOf(resultMap.get("msg")));
returnT.setContent(String.valueOf(resultMap.get("content")));
}
return returnT;
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting response content invalid("+ resultJson +"), for url : " + url);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting error("+ e.getMessage() +"), for url : " + url);
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
logger.error(e2.getMessage(), e2);
}
}
}
}

@ -10,8 +10,8 @@ import com.xxl.job.core.glue.GlueTypeEnum;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.serialize.Serializer;
import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient;
import com.xxl.rpc.serialize.impl.HessianSerializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -44,18 +44,20 @@ public class ExecutorBizImplTest {
TimeUnit.SECONDS.sleep(3);
// init executor biz proxy
executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
ExecutorBiz.class,
null,
3000,
"127.0.0.1:9999",
null,
null,
null).getObject();
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
referenceBean.setClient(NettyHttpClient.class);
referenceBean.setSerializer(HessianSerializer.class);
referenceBean.setCallType(CallType.SYNC);
referenceBean.setLoadBalance(LoadBalance.ROUND);
referenceBean.setIface(ExecutorBiz.class);
referenceBean.setVersion(null);
referenceBean.setTimeout(3000);
referenceBean.setAddress("127.0.0.1:9999");
referenceBean.setAccessToken(null);
referenceBean.setInvokeCallback(null);
referenceBean.setInvokerFactory(null);
executorBiz = (ExecutorBiz) referenceBean.getObject();
}
@After

Loading…
Cancel
Save