From 2ced6010354bf1ae4cc9aff810f2eee0ae8022a6 Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Tue, 19 Nov 2019 20:56:57 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E5=BA=A6=E4=B8=AD=E5=BF=83=E5=9B=9E?= =?UTF-8?q?=E8=B0=83API=E6=9C=8D=E5=8A=A1=E6=94=B9=E4=B8=BArestful?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 10 +- pom.xml | 2 +- .../admin/controller/JobApiController.java | 85 ++++++++++-- .../admin/core/conf/XxlJobAdminConfig.java | 7 - .../admin/core/scheduler/XxlJobScheduler.java | 71 +++------- .../job/admin/service/impl/AdminBizImpl.java | 17 +++ .../com/xxl/job/adminbiz/AdminBizTest.java | 72 +++++------ .../com/xxl/job/executor/ExecutorBizTest.java | 31 +++-- .../java/com/xxl/job/core/biz/AdminBiz.java | 2 - .../job/core/biz/client/AdminBizClient.java | 48 +++++++ .../xxl/job/core/executor/XxlJobExecutor.java | 47 +++---- .../xxl/job/core/util/XxlJobRemotingUtil.java | 122 ++++++++++++++++++ .../core/biz/impl/ExecutorBizImplTest.java | 30 +++-- 13 files changed, 361 insertions(+), 183 deletions(-) create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/biz/client/AdminBizClient.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobRemotingUtil.java diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index e9281bef..8c99c712 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -1588,10 +1588,11 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 17、DB脚本默认编码改为utf8mb4,修复字符乱码问题(建议Mysql版本5.7+); - 18、调度中心任务平均分配,触发组件每次获取与线程池数量相关数量的任务,避免大量任务集中在单个调度中心集群节点; - 19、调度中心移除SQL中的 "now()" 函数;集群部署时不再依赖DB时钟,仅需要保证调度中心应用节点时钟一致即可; -- 20、[ING]xxl-rpc服务端线程优化,降低线程内存开销; -- 21、[ING]调度日志优化:支持设置日志保留天数,过期日志天维度记录报表,并清理;调度报表汇总实时数据和报表; -- 22、[ING]调度中心日志删除,改为分页获取ID,根据ID删除的方式; -- 23、[ING]任务回调改为restful方式; +- 20、xxl-rpc服务端线程优化,降低线程内存开销; +- 21、调度中心回调API服务改为restful方式; +- 22、[ING]调度日志优化:支持设置日志保留天数,过期日志天维度记录报表,并清理;调度报表汇总实时数据和报表; +- 23、[ING]调度中心日志删除,改为分页获取ID,根据ID删除的方式; + ### TODO LIST @@ -1630,6 +1631,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 33、任务执行一次的时候指定IP; - 34、通讯调整;双向HTTP,回调和其他API自定义AccessToken,Restful,执行器复用容器端口; - 35、父子任务参数传递;流程任务等,透传动态参数; +- 36、任务操作API服务调整为和回调服务一致,降低接入成本; ## 七、其他 diff --git a/pom.xml b/pom.xml index 74530394..8351b0db 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ 1.7 true - 1.4.1 + 1.4.2 4.3.25.RELEASE 1.5.22.RELEASE diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java index 2198395a..7611b810 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java @@ -1,34 +1,97 @@ package com.xxl.job.admin.controller; import com.xxl.job.admin.controller.annotation.PermissionLimit; -import com.xxl.job.admin.core.scheduler.XxlJobScheduler; +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.core.biz.AdminBiz; -import org.springframework.beans.factory.InitializingBean; +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.RegistryParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.util.XxlJobRemotingUtil; import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; -import javax.servlet.ServletException; +import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; +import java.util.List; /** * Created by xuxueli on 17/5/10. */ @Controller -public class JobApiController implements InitializingBean { +@RequestMapping("/api") +public class JobApiController { + @Resource + private AdminBiz adminBiz; - @Override - public void afterPropertiesSet() throws Exception { + // ---------------------- admin biz ---------------------- + + /** + * callback + * + * @param callbackParamList + * @return + */ + @RequestMapping("/callback") + @ResponseBody + @PermissionLimit(limit=false) + public ReturnT callback(HttpServletRequest request, @RequestBody List callbackParamList) { + + if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null + && XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0 + && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_RPC_ACCESS_TOKEN))) { + return new ReturnT(ReturnT.FAIL_CODE, "The access token is wrong."); + } + + return adminBiz.callback(callbackParamList); + } + + + + /** + * registry + * + * @param registryParam + * @return + */ + @RequestMapping("/registry") + @ResponseBody + @PermissionLimit(limit=false) + public ReturnT registry(HttpServletRequest request, @RequestBody RegistryParam registryParam) { + + if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null + && XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0 + && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_RPC_ACCESS_TOKEN))) { + return new ReturnT(ReturnT.FAIL_CODE, "The access token is wrong."); + } + + return adminBiz.registry(registryParam); } - @RequestMapping(AdminBiz.MAPPING) + /** + * registry remove + * + * @param registryParam + * @return + */ + @RequestMapping("/registryRemove") + @ResponseBody @PermissionLimit(limit=false) - public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - XxlJobScheduler.invokeAdminService(request, response); + public ReturnT registryRemove(HttpServletRequest request, @RequestBody RegistryParam registryParam) { + + if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null + && XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0 + && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_RPC_ACCESS_TOKEN))) { + return new ReturnT(ReturnT.FAIL_CODE, "The access token is wrong."); + } + + return adminBiz.registryRemove(registryParam); } + // ---------------------- job biz ---------------------- + } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java index 8a47912e..e80be920 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java @@ -5,7 +5,6 @@ import com.xxl.job.admin.dao.XxlJobGroupDao; import com.xxl.job.admin.dao.XxlJobInfoDao; import com.xxl.job.admin.dao.XxlJobLogDao; import com.xxl.job.admin.dao.XxlJobRegistryDao; -import com.xxl.job.core.biz.AdminBiz; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; @@ -77,8 +76,6 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Resource private XxlJobGroupDao xxlJobGroupDao; @Resource - private AdminBiz adminBiz; - @Resource private JavaMailSender mailSender; @Resource private DataSource dataSource; @@ -126,10 +123,6 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { return xxlJobGroupDao; } - public AdminBiz getAdminBiz() { - return adminBiz; - } - public JavaMailSender getMailSender() { return mailSender; } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index 3409a665..bb2526ea 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -6,24 +6,16 @@ import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper; import com.xxl.job.admin.core.thread.JobScheduleHelper; import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; import com.xxl.job.admin.core.util.I18nUtil; -import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; -import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory; import com.xxl.rpc.remoting.invoker.call.CallType; import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean; import com.xxl.rpc.remoting.invoker.route.LoadBalance; -import com.xxl.rpc.remoting.net.NetEnum; -import com.xxl.rpc.remoting.net.impl.servlet.server.ServletServerHandler; -import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory; -import com.xxl.rpc.serialize.Serializer; +import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient; +import com.xxl.rpc.serialize.impl.HessianSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -39,9 +31,6 @@ public class XxlJobScheduler { // init i18n initI18n(); - // admin-server - initRpcProvider(); - // admin registry monitor run JobRegistryMonitorHelper.getInstance().start(); @@ -72,8 +61,6 @@ public class XxlJobScheduler { // admin registry stop JobRegistryMonitorHelper.getInstance().toStop(); - // admin-server - stopRpcProvider(); } // ---------------------- I18n ---------------------- @@ -84,34 +71,6 @@ public class XxlJobScheduler { } } - // ---------------------- admin rpc provider (no server version) ---------------------- - private static ServletServerHandler servletServerHandler; - private void initRpcProvider(){ - // init - XxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory(); - xxlRpcProviderFactory.initConfig( - NetEnum.NETTY_HTTP, - Serializer.SerializeEnum.HESSIAN.getSerializer(), - null, - 0, - XxlJobAdminConfig.getAdminConfig().getAccessToken(), - null, - null); - - // add services - xxlRpcProviderFactory.addService(AdminBiz.class.getName(), null, XxlJobAdminConfig.getAdminConfig().getAdminBiz()); - - // servlet handler - servletServerHandler = new ServletServerHandler(xxlRpcProviderFactory); - } - private void stopRpcProvider() throws Exception { - XxlRpcInvokerFactory.getInstance().stop(); - } - public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - servletServerHandler.handle(null, request, response); - } - - // ---------------------- executor-client ---------------------- private static ConcurrentMap executorBizRepository = new ConcurrentHashMap(); public static ExecutorBiz getExecutorBiz(String address) throws Exception { @@ -128,18 +87,20 @@ public class XxlJobScheduler { } // set-cache - executorBiz = (ExecutorBiz) new XxlRpcReferenceBean( - NetEnum.NETTY_HTTP, - Serializer.SerializeEnum.HESSIAN.getSerializer(), - CallType.SYNC, - LoadBalance.ROUND, - ExecutorBiz.class, - null, - 3000, - address, - XxlJobAdminConfig.getAdminConfig().getAccessToken(), - null, - null).getObject(); + XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(); + referenceBean.setClient(NettyHttpClient.class); + referenceBean.setSerializer(HessianSerializer.class); + referenceBean.setCallType(CallType.SYNC); + referenceBean.setLoadBalance(LoadBalance.ROUND); + referenceBean.setIface(ExecutorBiz.class); + referenceBean.setVersion(null); + referenceBean.setTimeout(3000); + referenceBean.setAddress(address); + referenceBean.setAccessToken(XxlJobAdminConfig.getAdminConfig().getAccessToken()); + referenceBean.setInvokeCallback(null); + referenceBean.setInvokerFactory(null); + + executorBiz = (ExecutorBiz) referenceBean.getObject(); executorBizRepository.put(address, executorBiz); return executorBiz; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java index 33d1e52d..88fd2674 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java @@ -17,6 +17,7 @@ import com.xxl.job.core.handler.IJobHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.text.MessageFormat; @@ -126,6 +127,14 @@ public class AdminBizImpl implements AdminBiz { @Override public ReturnT registry(RegistryParam registryParam) { + + // valid + if (!StringUtils.hasText(registryParam.getRegistGroup()) + || !StringUtils.hasText(registryParam.getRegistryKey()) + || !StringUtils.hasText(registryParam.getRegistryValue())) { + return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument."); + } + int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date()); if (ret < 1) { xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date()); @@ -138,6 +147,14 @@ public class AdminBizImpl implements AdminBiz { @Override public ReturnT registryRemove(RegistryParam registryParam) { + + // valid + if (!StringUtils.hasText(registryParam.getRegistGroup()) + || !StringUtils.hasText(registryParam.getRegistryKey()) + || !StringUtils.hasText(registryParam.getRegistryValue())) { + return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument."); + } + int ret = xxlJobRegistryDao.registryDelete(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue()); if (ret > 0) { diff --git a/xxl-job-admin/src/test/java/com/xxl/job/adminbiz/AdminBizTest.java b/xxl-job-admin/src/test/java/com/xxl/job/adminbiz/AdminBizTest.java index d09333a1..09951e49 100644 --- a/xxl-job-admin/src/test/java/com/xxl/job/adminbiz/AdminBizTest.java +++ b/xxl-job-admin/src/test/java/com/xxl/job/adminbiz/AdminBizTest.java @@ -1,18 +1,17 @@ package com.xxl.job.adminbiz; import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.biz.client.AdminBizClient; +import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.RegistryParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.RegistryConfig; -import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory; -import com.xxl.rpc.remoting.invoker.call.CallType; -import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean; -import com.xxl.rpc.remoting.invoker.route.LoadBalance; -import com.xxl.rpc.remoting.net.NetEnum; -import com.xxl.rpc.serialize.Serializer; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; +import java.util.List; + /** * admin api test * @@ -21,37 +20,38 @@ import org.junit.Test; public class AdminBizTest { // admin-client - private static String addressUrl = "http://127.0.0.1:8080/xxl-job-admin".concat(AdminBiz.MAPPING); + private static String addressUrl = "http://127.0.0.1:8080/xxl-job-admin/"; private static String accessToken = null; + + @Test + public void callback() throws Exception { + AdminBiz adminBiz = new AdminBizClient(addressUrl, accessToken); + + HandleCallbackParam param = new HandleCallbackParam(); + param.setLogId(1); + param.setExecuteResult(ReturnT.SUCCESS); + + List callbackParamList = Arrays.asList(param); + + ReturnT returnT = adminBiz.callback(callbackParamList); + + Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE); + } + /** * registry executor * * @throws Exception */ @Test - public void registryTest() throws Exception { - addressUrl = addressUrl.replace("http://", ""); - AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean( - NetEnum.NETTY_HTTP, - Serializer.SerializeEnum.HESSIAN.getSerializer(), - CallType.SYNC, - LoadBalance.ROUND, - AdminBiz.class, - null, - 3000, - addressUrl, - accessToken, - null, - null).getObject(); - - // test executor registry + public void registry() throws Exception { + AdminBiz adminBiz = new AdminBizClient(addressUrl, accessToken); + RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999"); ReturnT returnT = adminBiz.registry(registryParam); - Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE); - // stop invoker - XxlRpcInvokerFactory.getInstance().stop(); + Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE); } /** @@ -61,27 +61,13 @@ public class AdminBizTest { */ @Test public void registryRemove() throws Exception { - addressUrl = addressUrl.replace("http://", ""); - AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean( - NetEnum.NETTY_HTTP, - Serializer.SerializeEnum.HESSIAN.getSerializer(), - CallType.SYNC, - LoadBalance.ROUND, - AdminBiz.class, - null, - 3000, - addressUrl, - accessToken, - null, - null).getObject(); - - // test executor registry remove + AdminBiz adminBiz = new AdminBizClient(addressUrl, accessToken); + RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999"); ReturnT returnT = adminBiz.registryRemove(registryParam); + Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE); - // stop invoker - XxlRpcInvokerFactory.getInstance().stop(); } } diff --git a/xxl-job-admin/src/test/java/com/xxl/job/executor/ExecutorBizTest.java b/xxl-job-admin/src/test/java/com/xxl/job/executor/ExecutorBizTest.java index d3914d3e..cc09a9a1 100644 --- a/xxl-job-admin/src/test/java/com/xxl/job/executor/ExecutorBizTest.java +++ b/xxl-job-admin/src/test/java/com/xxl/job/executor/ExecutorBizTest.java @@ -9,8 +9,8 @@ import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory; import com.xxl.rpc.remoting.invoker.call.CallType; import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean; import com.xxl.rpc.remoting.invoker.route.LoadBalance; -import com.xxl.rpc.remoting.net.NetEnum; -import com.xxl.rpc.serialize.Serializer; +import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient; +import com.xxl.rpc.serialize.impl.HessianSerializer; /** * executor-api client, test @@ -49,18 +49,21 @@ public class ExecutorBizTest { // do remote trigger String accessToken = null; - ExecutorBiz executorBiz = (ExecutorBiz) new XxlRpcReferenceBean( - NetEnum.NETTY_HTTP, - Serializer.SerializeEnum.HESSIAN.getSerializer(), - CallType.SYNC, - LoadBalance.ROUND, - ExecutorBiz.class, - null, - 3000, - "127.0.0.1:9999", - null, - null, - null).getObject(); + + XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(); + referenceBean.setClient(NettyHttpClient.class); + referenceBean.setSerializer(HessianSerializer.class); + referenceBean.setCallType(CallType.SYNC); + referenceBean.setLoadBalance(LoadBalance.ROUND); + referenceBean.setIface(ExecutorBiz.class); + referenceBean.setVersion(null); + referenceBean.setTimeout(3000); + referenceBean.setAddress("127.0.0.1:9999"); + referenceBean.setAccessToken(null); + referenceBean.setInvokeCallback(null); + referenceBean.setInvokerFactory(null); + + ExecutorBiz executorBiz = (ExecutorBiz) referenceBean.getObject(); ReturnT runResult = executorBiz.run(triggerParam); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java index aeba66ad..4b65ddf3 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java @@ -11,8 +11,6 @@ import java.util.List; */ public interface AdminBiz { - public static final String MAPPING = "/api"; - // ---------------------- callback ---------------------- diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/AdminBizClient.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/AdminBizClient.java new file mode 100644 index 00000000..88ca224d --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/AdminBizClient.java @@ -0,0 +1,48 @@ +package com.xxl.job.core.biz.client; + +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.RegistryParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.util.XxlJobRemotingUtil; + +import java.util.List; + +/** + * admin api test + * + * @author xuxueli 2017-07-28 22:14:52 + */ +public class AdminBizClient implements AdminBiz { + + public AdminBizClient() { + } + public AdminBizClient(String addressUrl, String accessToken) { + this.addressUrl = addressUrl; + this.accessToken = accessToken; + + // valid + if (!this.addressUrl.endsWith("/")) { + this.addressUrl = this.addressUrl + "/"; + } + } + + private String addressUrl ; + private String accessToken; + + + @Override + public ReturnT callback(List callbackParamList) { + return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, callbackParamList, 3); + } + + @Override + public ReturnT registry(RegistryParam registryParam) { + return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, registryParam, 3); + } + + @Override + public ReturnT registryRemove(RegistryParam registryParam) { + return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, registryParam, 3); + } +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index b38a49bc..9a3926cc 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -2,6 +2,7 @@ package com.xxl.job.core.executor; import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.client.AdminBizClient; import com.xxl.job.core.biz.impl.ExecutorBizImpl; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; @@ -11,12 +12,10 @@ import com.xxl.job.core.thread.JobThread; import com.xxl.job.core.thread.TriggerCallbackThread; import com.xxl.rpc.registry.ServiceRegistry; import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory; -import com.xxl.rpc.remoting.invoker.call.CallType; -import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean; -import com.xxl.rpc.remoting.invoker.route.LoadBalance; -import com.xxl.rpc.remoting.net.NetEnum; +import com.xxl.rpc.remoting.net.impl.netty_http.server.NettyHttpServer; import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory; import com.xxl.rpc.serialize.Serializer; +import com.xxl.rpc.serialize.impl.HessianSerializer; import com.xxl.rpc.util.IpUtil; import com.xxl.rpc.util.NetUtil; import org.slf4j.Logger; @@ -105,35 +104,18 @@ public class XxlJobExecutor { // destory TriggerCallbackThread TriggerCallbackThread.getInstance().toStop(); - // destory invoker - stopInvokerFactory(); } // ---------------------- admin-client (rpc invoker) ---------------------- private static List adminBizList; - private static Serializer serializer; + private static Serializer serializer = new HessianSerializer(); private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { - serializer = Serializer.SerializeEnum.HESSIAN.getSerializer(); if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { if (address!=null && address.trim().length()>0) { - String addressUrl = address.concat(AdminBiz.MAPPING); - - AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean( - NetEnum.NETTY_HTTP, - serializer, - CallType.SYNC, - LoadBalance.ROUND, - AdminBiz.class, - null, - 3000, - addressUrl, - accessToken, - null, - null - ).getObject(); + AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); if (adminBizList == null) { adminBizList = new ArrayList(); @@ -143,14 +125,6 @@ public class XxlJobExecutor { } } } - private void stopInvokerFactory(){ - // stop invoker factory - try { - XxlRpcInvokerFactory.getInstance().stop(); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } public static List getAdminBizList(){ return adminBizList; } @@ -171,7 +145,16 @@ public class XxlJobExecutor { serviceRegistryParam.put("address", address); xxlRpcProviderFactory = new XxlRpcProviderFactory(); - xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam); + + xxlRpcProviderFactory.setServer(NettyHttpServer.class); + xxlRpcProviderFactory.setSerializer(HessianSerializer.class); + xxlRpcProviderFactory.setCorePoolSize(30); + xxlRpcProviderFactory.setMaxPoolSize(200); + xxlRpcProviderFactory.setIp(ip); + xxlRpcProviderFactory.setPort(port); + xxlRpcProviderFactory.setAccessToken(accessToken); + xxlRpcProviderFactory.setServiceRegistry(ExecutorServiceRegistry.class); + xxlRpcProviderFactory.setServiceRegistryParam(serviceRegistryParam); // add services xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl()); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobRemotingUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobRemotingUtil.java new file mode 100644 index 00000000..258637dc --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobRemotingUtil.java @@ -0,0 +1,122 @@ +package com.xxl.job.core.util; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.registry.client.util.json.BasicJson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Map; + +/** + * @author xuxueli 2018-11-25 00:55:31 + */ +public class XxlJobRemotingUtil { + private static Logger logger = LoggerFactory.getLogger(XxlJobRemotingUtil.class); + public static String XXL_RPC_ACCESS_TOKEN = "XXL_RPC_ACCESS_TOKEN"; + + /** + * post + * + * @param url + * @param accessToken + * @param requestObj + * @return + */ + public static ReturnT postBody(String url, String accessToken, Object requestObj, int timeout) { + HttpURLConnection connection = null; + BufferedReader bufferedReader = null; + try { + // connection + URL realUrl = new URL(url); + connection = (HttpURLConnection) realUrl.openConnection(); + + // connection setting + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setDoInput(true); + connection.setUseCaches(false); + connection.setReadTimeout(timeout * 1000); + connection.setConnectTimeout(3 * 1000); + connection.setRequestProperty("connection", "Keep-Alive"); + connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); + connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); + + if(accessToken!=null && accessToken.trim().length()>0){ + connection.setRequestProperty(XXL_RPC_ACCESS_TOKEN, accessToken); + } + + // do connection + connection.connect(); + + // write requestBody + String requestBody = BasicJson.toJson(requestObj); + + DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); + dataOutputStream.writeBytes(requestBody); + dataOutputStream.flush(); + dataOutputStream.close(); + + /*byte[] requestBodyBytes = requestBody.getBytes("UTF-8"); + connection.setRequestProperty("Content-Length", String.valueOf(requestBodyBytes.length)); + OutputStream outwritestream = connection.getOutputStream(); + outwritestream.write(requestBodyBytes); + outwritestream.flush(); + outwritestream.close();*/ + + // valid StatusCode + int statusCode = connection.getResponseCode(); + if (statusCode != 200) { + return new ReturnT(ReturnT.FAIL_CODE, "xxl-rpc remoting fail, StatusCode("+ statusCode +") invalid. for url : " + url); + } + + // result + bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream())); + StringBuilder result = new StringBuilder(); + String line; + while ((line = bufferedReader.readLine()) != null) { + result.append(line); + } + String resultJson = result.toString(); + + // parse returnT + try { + Map resultMap = BasicJson.parseMap(resultJson); + + ReturnT returnT = new ReturnT(); + if (resultMap==null) { + returnT.setCode(ReturnT.FAIL_CODE); + returnT.setMsg("AdminBizClient Remoting call fail."); + } else { + returnT.setCode(Integer.valueOf(String.valueOf(resultMap.get("code")))); + returnT.setMsg(String.valueOf(resultMap.get("msg"))); + returnT.setContent(String.valueOf(resultMap.get("content"))); + } + return returnT; + } catch (Exception e) { + e.printStackTrace(); + return new ReturnT(ReturnT.FAIL_CODE, "xxl-rpc remoting response content invalid("+ resultJson +"), for url : " + url); + } + + } catch (Exception e) { + logger.error(e.getMessage(), e); + return new ReturnT(ReturnT.FAIL_CODE, "xxl-rpc remoting error("+ e.getMessage() +"), for url : " + url); + } finally { + try { + if (bufferedReader != null) { + bufferedReader.close(); + } + if (connection != null) { + connection.disconnect(); + } + } catch (Exception e2) { + logger.error(e2.getMessage(), e2); + } + } + } + +} diff --git a/xxl-job-core/src/test/java/com/xxl/job/core/biz/impl/ExecutorBizImplTest.java b/xxl-job-core/src/test/java/com/xxl/job/core/biz/impl/ExecutorBizImplTest.java index cde3ddd4..0b3e8c69 100644 --- a/xxl-job-core/src/test/java/com/xxl/job/core/biz/impl/ExecutorBizImplTest.java +++ b/xxl-job-core/src/test/java/com/xxl/job/core/biz/impl/ExecutorBizImplTest.java @@ -10,8 +10,8 @@ import com.xxl.job.core.glue.GlueTypeEnum; import com.xxl.rpc.remoting.invoker.call.CallType; import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean; import com.xxl.rpc.remoting.invoker.route.LoadBalance; -import com.xxl.rpc.remoting.net.NetEnum; -import com.xxl.rpc.serialize.Serializer; +import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient; +import com.xxl.rpc.serialize.impl.HessianSerializer; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -44,18 +44,20 @@ public class ExecutorBizImplTest { TimeUnit.SECONDS.sleep(3); // init executor biz proxy - executorBiz = (ExecutorBiz) new XxlRpcReferenceBean( - NetEnum.NETTY_HTTP, - Serializer.SerializeEnum.HESSIAN.getSerializer(), - CallType.SYNC, - LoadBalance.ROUND, - ExecutorBiz.class, - null, - 3000, - "127.0.0.1:9999", - null, - null, - null).getObject(); + XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(); + referenceBean.setClient(NettyHttpClient.class); + referenceBean.setSerializer(HessianSerializer.class); + referenceBean.setCallType(CallType.SYNC); + referenceBean.setLoadBalance(LoadBalance.ROUND); + referenceBean.setIface(ExecutorBiz.class); + referenceBean.setVersion(null); + referenceBean.setTimeout(3000); + referenceBean.setAddress("127.0.0.1:9999"); + referenceBean.setAccessToken(null); + referenceBean.setInvokeCallback(null); + referenceBean.setInvokerFactory(null); + + executorBiz = (ExecutorBiz) referenceBean.getObject(); } @After