From 7ad75c6c0a1403088303a14e4381f51b96074201 Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Sat, 11 Apr 2020 23:55:35 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E5=BA=A6=E4=B8=AD=E5=BF=83=E4=B8=8E?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=99=A8=E9=80=9A=E8=AE=AF=E8=A7=84=E8=8C=83?= =?UTF-8?q?=E4=B8=BA=E5=8F=8C=E5=90=91restful=EF=BC=8C=E6=96=B9=E4=BE=BF?= =?UTF-8?q?=E8=B7=A8=E8=AF=AD=E8=A8=80=EF=BC=8C=E4=BB=A5=E5=8F=8A=E7=AC=AC?= =?UTF-8?q?=E4=B8=89=E6=96=B9=E6=89=A7=E8=A1=8C=E5=99=A8=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=EF=BC=9B=E9=80=9A=E8=AE=AF=E7=BB=84=E4=BB=B6xxl-rpc=E6=96=B9?= =?UTF-8?q?=E6=A1=88=E8=B0=83=E6=95=B4=E4=B8=BAJetty+Gson=E6=96=B9?= =?UTF-8?q?=E6=A1=88=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 2 +- .../admin/controller/JobApiController.java | 26 +- .../admin/controller/JobLogController.java | 5 +- .../admin/core/scheduler/XxlJobScheduler.java | 21 +- .../job/admin/core/trigger/XxlJobTrigger.java | 6 +- .../resources/static/js/jobgroup.index.1.js | 6 +- .../com/xxl/job/executor/ExecutorBizTest.java | 118 ++++++--- xxl-job-core/pom.xml | 27 +- .../com/xxl/job/core/biz/ExecutorBiz.java | 7 +- .../job/core/biz/client/AdminBizClient.java | 8 +- .../core/biz/client/ExecutorBizClient.java | 57 +++++ .../job/core/biz/impl/ExecutorBizImpl.java | 7 +- .../com/xxl/job/core/biz/model/LogParam.java | 45 ++++ .../xxl/job/core/executor/XxlJobExecutor.java | 99 ++----- .../com/xxl/job/core/server/EmbedServer.java | 241 ++++++++++++++++++ .../core/thread/TriggerCallbackThread.java | 9 +- .../java/com/xxl/job/core/util/FileUtil.java | 10 +- .../java/com/xxl/job/core/util/GsonTool.java | 89 +++++++ .../java/com/xxl/job/core/util/IpUtil.java | 203 +++++++++++++++ .../java/com/xxl/job/core/util/NetUtil.java | 70 +++++ .../xxl/job/core/util/XxlJobRemotingUtil.java | 33 +-- .../core/biz/impl/ExecutorBizImplTest.java | 145 ----------- 22 files changed, 893 insertions(+), 341 deletions(-) create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/biz/model/LogParam.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/GsonTool.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/IpUtil.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/NetUtil.java delete mode 100644 xxl-job-core/src/test/java/com/xxl/job/core/biz/impl/ExecutorBizImplTest.java diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index 7f5e1c66..d330108e 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -1758,7 +1758,7 @@ data: post-data - 20、任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; - 21、执行备注消息长度限制,修复数据超长无法存储导致导致回调失败的问题; - 22、一致性哈希路由策略优化:默认虚拟节点数量调整为100,提高路由的均衡性; -- 23、[迭代中]调度中心与执行器通讯规范为双向restful,方便跨语言,以及第三方执行器实现;通讯组件xxl-rpc方案调整为Jetty+Gson方案; +- 23、调度中心与执行器通讯规范为双向restful,方便跨语言,以及第三方执行器实现;通讯组件xxl-rpc方案调整为Jetty+Gson方案; ### TODO LIST diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java index ad1244a8..38b41863 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java @@ -3,11 +3,11 @@ package com.xxl.job.admin.controller; import com.xxl.job.admin.controller.annotation.PermissionLimit; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.exception.XxlJobException; -import com.xxl.job.admin.core.util.JacksonUtil; import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.RegistryParam; import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.util.GsonTool; import com.xxl.job.core.util.XxlJobRemotingUtil; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestBody; @@ -37,7 +37,7 @@ public class JobApiController { private void validAccessToken(HttpServletRequest request){ if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null && XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0 - && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_RPC_ACCESS_TOKEN))) { + && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) { throw new XxlJobException("The access token is wrong."); } } @@ -45,10 +45,24 @@ public class JobApiController { /** * parse Param */ - private Object parseParam(String data, Class parametrized, Class... parameterClasses){ + private Object parseParam(String data, Class parametrized, Class parameterClasses){ Object param = null; try { if (parameterClasses != null) { + param = GsonTool.fromJson(data, parametrized, parameterClasses); + } else { + param = GsonTool.fromJson(data, parametrized); + } + } catch (Exception e) { } + if (param==null) { + throw new XxlJobException("The request data invalid."); + } + return param; + } + /*private Object parseParam(String data, Class parametrized, Class... parameterClasses){ + Object param = null; + try { + if (parameterClasses!=null && parameterClasses.length>0) { param = JacksonUtil.readValue(data, parametrized, parameterClasses); } else { param = JacksonUtil.readValue(data, parametrized); @@ -58,7 +72,7 @@ public class JobApiController { throw new XxlJobException("The request data invalid."); } return param; - } + }*/ // ---------------------- admin biz ---------------------- @@ -98,7 +112,7 @@ public class JobApiController { validAccessToken(request); // param - RegistryParam registryParam = (RegistryParam) parseParam(data, RegistryParam.class); + RegistryParam registryParam = (RegistryParam) parseParam(data, RegistryParam.class, null); // invoke return adminBiz.registry(registryParam); @@ -118,7 +132,7 @@ public class JobApiController { validAccessToken(request); // param - RegistryParam registryParam = (RegistryParam) parseParam(data, RegistryParam.class); + RegistryParam registryParam = (RegistryParam) parseParam(data, RegistryParam.class, null); // invoke return adminBiz.registryRemove(registryParam); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java index d378aac7..c8bffc7d 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java @@ -1,15 +1,16 @@ package com.xxl.job.admin.controller; -import com.xxl.job.admin.core.scheduler.XxlJobScheduler; import com.xxl.job.admin.core.exception.XxlJobException; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.scheduler.XxlJobScheduler; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.dao.XxlJobGroupDao; import com.xxl.job.admin.dao.XxlJobInfoDao; import com.xxl.job.admin.dao.XxlJobLogDao; import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.model.LogParam; import com.xxl.job.core.biz.model.LogResult; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.util.DateUtil; @@ -137,7 +138,7 @@ public class JobLogController { public ReturnT logDetailCat(String executorAddress, long triggerTime, long logId, int fromLineNum){ try { ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(executorAddress); - ReturnT logResult = executorBiz.log(triggerTime, logId, fromLineNum); + ReturnT logResult = executorBiz.log(new LogParam(triggerTime, logId, fromLineNum)); // is end if (logResult.getContent()!=null && logResult.getContent().getFromLineNum() > logResult.getContent().getToLineNum()) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index 2e01fb3b..6dc6b6c2 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -4,12 +4,8 @@ import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.thread.*; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.client.ExecutorBizClient; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; -import com.xxl.rpc.remoting.invoker.call.CallType; -import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean; -import com.xxl.rpc.remoting.invoker.route.LoadBalance; -import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient; -import com.xxl.rpc.serialize.impl.HessianSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,20 +92,7 @@ public class XxlJobScheduler { } // set-cache - XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(); - referenceBean.setClient(NettyHttpClient.class); - referenceBean.setSerializer(HessianSerializer.class); - referenceBean.setCallType(CallType.SYNC); - referenceBean.setLoadBalance(LoadBalance.ROUND); - referenceBean.setIface(ExecutorBiz.class); - referenceBean.setVersion(null); - referenceBean.setTimeout(3000); - referenceBean.setAddress(address); - referenceBean.setAccessToken(XxlJobAdminConfig.getAdminConfig().getAccessToken()); - referenceBean.setInvokeCallback(null); - referenceBean.setInvokerFactory(null); - - executorBiz = (ExecutorBiz) referenceBean.getObject(); + executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken()); executorBizRepository.put(address, executorBiz); return executorBiz; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java index 321ea5d3..6b484059 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java @@ -11,8 +11,8 @@ import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; -import com.xxl.rpc.util.IpUtil; -import com.xxl.rpc.util.ThrowableUtil; +import com.xxl.job.core.util.IpUtil; +import io.netty.util.internal.ThrowableUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -211,7 +211,7 @@ public class XxlJobTrigger { runResult = executorBiz.run(triggerParam); } catch (Exception e) { logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e); - runResult = new ReturnT(ReturnT.FAIL_CODE, ThrowableUtil.toString(e)); + runResult = new ReturnT(ReturnT.FAIL_CODE, ThrowableUtil.stackTraceToString(e)); } StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":"); diff --git a/xxl-job-admin/src/main/resources/static/js/jobgroup.index.1.js b/xxl-job-admin/src/main/resources/static/js/jobgroup.index.1.js index 3f54d334..32f36b55 100644 --- a/xxl-job-admin/src/main/resources/static/js/jobgroup.index.1.js +++ b/xxl-job-admin/src/main/resources/static/js/jobgroup.index.1.js @@ -123,13 +123,13 @@ $(function() { var id = $(this).attr("_id"); var row = tableData['key'+id]; - var html = '
'; + var html = '
'; if (row.registryList) { for (var index in row.registryList) { - html += '' + row.registryList[index] + '
'; + html += (parseInt(index)+1) + '. ' + row.registryList[index] + '
'; } } - html += '
'; + html += ''; layer.open({ title: I18n.jobinfo_opt_registryinfo , diff --git a/xxl-job-admin/src/test/java/com/xxl/job/executor/ExecutorBizTest.java b/xxl-job-admin/src/test/java/com/xxl/job/executor/ExecutorBizTest.java index 93c1381d..5c587e43 100644 --- a/xxl-job-admin/src/test/java/com/xxl/job/executor/ExecutorBizTest.java +++ b/xxl-job-admin/src/test/java/com/xxl/job/executor/ExecutorBizTest.java @@ -1,16 +1,15 @@ package com.xxl.job.executor; import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.client.ExecutorBizClient; +import com.xxl.job.core.biz.model.LogParam; +import com.xxl.job.core.biz.model.LogResult; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; import com.xxl.job.core.glue.GlueTypeEnum; -import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory; -import com.xxl.rpc.remoting.invoker.call.CallType; -import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean; -import com.xxl.rpc.remoting.invoker.route.LoadBalance; -import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient; -import com.xxl.rpc.serialize.impl.HessianSerializer; +import org.junit.Assert; +import org.junit.Test; /** * executor-api client, test @@ -19,27 +18,79 @@ import com.xxl.rpc.serialize.impl.HessianSerializer; */ public class ExecutorBizTest { - public static void main(String[] args) throws Exception { + // admin-client + private static String addressUrl = "http://127.0.0.1:8081/"; + private static String accessToken = null; - // param - String jobHandler = "demoJobHandler"; - String params = ""; + @Test + public void beat() throws Exception { + ExecutorBiz executorBiz = new ExecutorBizClient(addressUrl, accessToken); + // Act + final ReturnT retval = executorBiz.beat(); - runTest(jobHandler, params); + // Assert result + Assert.assertNotNull(retval); + Assert.assertNull(((ReturnT) retval).getContent()); + Assert.assertEquals(200, retval.getCode()); + Assert.assertNull(retval.getMsg()); } - /** - * run jobhandler - * - * @param jobHandler - * @param params - */ - private static void runTest(String jobHandler, String params) throws Exception { + @Test + public void idleBeat(){ + ExecutorBiz executorBiz = new ExecutorBizClient(addressUrl, accessToken); + + final int jobId = 0; + + // Act + final ReturnT retval = executorBiz.idleBeat(jobId); + + // Assert result + Assert.assertNotNull(retval); + Assert.assertNull(((ReturnT) retval).getContent()); + Assert.assertEquals(500, retval.getCode()); + Assert.assertEquals("job thread is running or has trigger queue.", retval.getMsg()); + } + + @Test + public void kill(){ + ExecutorBiz executorBiz = new ExecutorBizClient(addressUrl, accessToken); + + final int jobId = 0; + + // Act + final ReturnT retval = executorBiz.kill(jobId); + + // Assert result + Assert.assertNotNull(retval); + Assert.assertNull(((ReturnT) retval).getContent()); + Assert.assertEquals(200, retval.getCode()); + Assert.assertNull(retval.getMsg()); + } + + @Test + public void log(){ + ExecutorBiz executorBiz = new ExecutorBizClient(addressUrl, accessToken); + + final long logDateTim = 0L; + final long logId = 0; + final int fromLineNum = 0; + + // Act + final ReturnT retval = executorBiz.log(new LogParam(logDateTim, logId, fromLineNum)); + + // Assert result + Assert.assertNotNull(retval); + } + + @Test + public void run(){ + ExecutorBiz executorBiz = new ExecutorBizClient(addressUrl, accessToken); + // trigger data - TriggerParam triggerParam = new TriggerParam(); + final TriggerParam triggerParam = new TriggerParam(); triggerParam.setJobId(1); - triggerParam.setExecutorHandler(jobHandler); - triggerParam.setExecutorParams(params); + triggerParam.setExecutorHandler("demoJobHandler"); + triggerParam.setExecutorParams(null); triggerParam.setExecutorBlockStrategy(ExecutorBlockStrategyEnum.COVER_EARLY.name()); triggerParam.setGlueType(GlueTypeEnum.BEAN.name()); triggerParam.setGlueSource(null); @@ -47,28 +98,11 @@ public class ExecutorBizTest { triggerParam.setLogId(1); triggerParam.setLogDateTime(System.currentTimeMillis()); - // do remote trigger - String accessToken = null; - - XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(); - referenceBean.setClient(NettyHttpClient.class); - referenceBean.setSerializer(HessianSerializer.class); - referenceBean.setCallType(CallType.SYNC); - referenceBean.setLoadBalance(LoadBalance.ROUND); - referenceBean.setIface(ExecutorBiz.class); - referenceBean.setVersion(null); - referenceBean.setTimeout(3000); - referenceBean.setAddress("127.0.0.1:9999"); - referenceBean.setAccessToken(accessToken); - referenceBean.setInvokeCallback(null); - referenceBean.setInvokerFactory(null); - - ExecutorBiz executorBiz = (ExecutorBiz) referenceBean.getObject(); - - ReturnT runResult = executorBiz.run(triggerParam); + // Act + final ReturnT retval = executorBiz.run(triggerParam); - System.out.println(runResult); - XxlRpcInvokerFactory.getInstance().stop(); + // Assert result + Assert.assertNotNull(retval); } } diff --git a/xxl-job-core/pom.xml b/xxl-job-core/pom.xml index 14559baa..765a1fe0 100644 --- a/xxl-job-core/pom.xml +++ b/xxl-job-core/pom.xml @@ -15,13 +15,19 @@ - + - com.xuxueli - xxl-rpc-core - ${xxl-rpc.version} + io.netty + netty-all + 4.1.48.Final + + + com.google.code.gson + gson + 2.8.6 + org.codehaus.groovy @@ -37,6 +43,19 @@ provided + + + + org.slf4j + slf4j-api + ${slf4j-api.version} + + + org.slf4j + slf4j-log4j12 + ${slf4j-api.version} + test + junit diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java index deca9f20..99b9ded5 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java @@ -1,5 +1,6 @@ package com.xxl.job.core.biz; +import com.xxl.job.core.biz.model.LogParam; import com.xxl.job.core.biz.model.LogResult; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; @@ -32,12 +33,10 @@ public interface ExecutorBiz { /** * log - * @param logDateTim - * @param logId - * @param fromLineNum + * @param logParam * @return */ - public ReturnT log(long logDateTim, long logId, int fromLineNum); + public ReturnT log(LogParam logParam); /** * run diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/AdminBizClient.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/AdminBizClient.java index 88ca224d..95fa5608 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/AdminBizClient.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/AdminBizClient.java @@ -29,20 +29,22 @@ public class AdminBizClient implements AdminBiz { private String addressUrl ; private String accessToken; + private int timeout = 3; @Override public ReturnT callback(List callbackParamList) { - return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, callbackParamList, 3); + return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class); } @Override public ReturnT registry(RegistryParam registryParam) { - return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, registryParam, 3); + return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class); } @Override public ReturnT registryRemove(RegistryParam registryParam) { - return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, registryParam, 3); + return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class); } + } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java new file mode 100644 index 00000000..857a8555 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java @@ -0,0 +1,57 @@ +package com.xxl.job.core.biz.client; + +import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.model.LogParam; +import com.xxl.job.core.biz.model.LogResult; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; +import com.xxl.job.core.util.XxlJobRemotingUtil; + +/** + * admin api test + * + * @author xuxueli 2017-07-28 22:14:52 + */ +public class ExecutorBizClient implements ExecutorBiz { + + public ExecutorBizClient() { + } + public ExecutorBizClient(String addressUrl, String accessToken) { + this.addressUrl = addressUrl; + this.accessToken = accessToken; + + // valid + if (!this.addressUrl.endsWith("/")) { + this.addressUrl = this.addressUrl + "/"; + } + } + + private String addressUrl ; + private String accessToken; + private int timeout = 3; + + + @Override + public ReturnT beat() { + return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, null, String.class); + } + + public ReturnT idleBeat(int jobId){ + return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout, jobId, String.class); + } + + @Override + public ReturnT kill(int jobId) { + return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, jobId, String.class); + } + + @Override + public ReturnT log(LogParam logParam) { + return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class); + } + + public ReturnT run(TriggerParam triggerParam) { + return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index 1c809602..96230f7c 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -1,6 +1,7 @@ package com.xxl.job.core.biz.impl; import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.model.LogParam; import com.xxl.job.core.biz.model.LogResult; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; @@ -58,11 +59,11 @@ public class ExecutorBizImpl implements ExecutorBiz { } @Override - public ReturnT log(long logDateTim, long logId, int fromLineNum) { + public ReturnT log(LogParam logParam) { // log filename: logPath/yyyy-MM-dd/9999.log - String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logDateTim), logId); + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logParam.getLogDateTim()), logParam.getLogId()); - LogResult logResult = XxlJobFileAppender.readLog(logFileName, fromLineNum); + LogResult logResult = XxlJobFileAppender.readLog(logFileName, logParam.getFromLineNum()); return new ReturnT(logResult); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/LogParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/LogParam.java new file mode 100644 index 00000000..7bee48eb --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/LogParam.java @@ -0,0 +1,45 @@ +package com.xxl.job.core.biz.model; + +import java.io.Serializable; + +/** + * @author xuxueli 2020-04-11 22:27 + */ +public class LogParam implements Serializable { + private static final long serialVersionUID = 42L; + + public LogParam(long logDateTim, long logId, int fromLineNum) { + this.logDateTim = logDateTim; + this.logId = logId; + this.fromLineNum = fromLineNum; + } + + private long logDateTim; + private long logId; + private int fromLineNum; + + public long getLogDateTim() { + return logDateTim; + } + + public void setLogDateTim(long logDateTim) { + this.logDateTim = logDateTim; + } + + public long getLogId() { + return logId; + } + + public void setLogId(long logId) { + this.logId = logId; + } + + public int getFromLineNum() { + return fromLineNum; + } + + public void setFromLineNum(int fromLineNum) { + this.fromLineNum = fromLineNum; + } + +} \ No newline at end of file diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index 7684a550..bf904064 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -1,26 +1,21 @@ package com.xxl.job.core.executor; import com.xxl.job.core.biz.AdminBiz; -import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.client.AdminBizClient; -import com.xxl.job.core.biz.impl.ExecutorBizImpl; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; -import com.xxl.job.core.thread.ExecutorRegistryThread; +import com.xxl.job.core.server.EmbedServer; import com.xxl.job.core.thread.JobLogFileCleanThread; import com.xxl.job.core.thread.JobThread; import com.xxl.job.core.thread.TriggerCallbackThread; -import com.xxl.rpc.registry.ServiceRegistry; -import com.xxl.rpc.remoting.net.impl.netty_http.server.NettyHttpServer; -import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory; -import com.xxl.rpc.serialize.Serializer; -import com.xxl.rpc.serialize.impl.HessianSerializer; -import com.xxl.rpc.util.IpUtil; -import com.xxl.rpc.util.NetUtil; +import com.xxl.job.core.util.IpUtil; +import com.xxl.job.core.util.NetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -83,13 +78,11 @@ public class XxlJobExecutor { TriggerCallbackThread.getInstance().start(); // init executor-server - port = port>0?port: NetUtil.findAvailablePort(9999); - ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); - initRpcProvider(address, ip, port, appName, accessToken); + initEmbedServer(address, ip, port, appName, accessToken); } public void destroy(){ // destory executor-server - stopRpcProvider(); + stopEmbedServer(); // destory jobThreadRepository if (jobThreadRepository.size() > 0) { @@ -120,7 +113,6 @@ public class XxlJobExecutor { // ---------------------- admin-client (rpc invoker) ---------------------- private static List adminBizList; - private static Serializer serializer = new HessianSerializer(); private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { @@ -139,80 +131,31 @@ public class XxlJobExecutor { public static List getAdminBizList(){ return adminBizList; } - public static Serializer getSerializer() { - return serializer; - } - // ---------------------- executor-server (rpc provider) ---------------------- - private XxlRpcProviderFactory xxlRpcProviderFactory = null; + private EmbedServer embedServer = null; - private void initRpcProvider(String address, String ip, int port, String appName, String accessToken) throws Exception { + private void initEmbedServer(String address, String ip, int port, String appName, String accessToken) throws Exception { - // init, provider factory + // fill ip port + port = port>0?port: NetUtil.findAvailablePort(9999); + ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); + + // generate address if (address==null || address.trim().length()==0) { - address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null + String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null + address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); } - Map serviceRegistryParam = new HashMap(); - serviceRegistryParam.put("appName", appName); - serviceRegistryParam.put("address", address); - - xxlRpcProviderFactory = new XxlRpcProviderFactory(); - - xxlRpcProviderFactory.setServer(NettyHttpServer.class); - xxlRpcProviderFactory.setSerializer(HessianSerializer.class); - xxlRpcProviderFactory.setCorePoolSize(20); - xxlRpcProviderFactory.setMaxPoolSize(200); - xxlRpcProviderFactory.setIp(ip); - xxlRpcProviderFactory.setPort(port); - xxlRpcProviderFactory.setAccessToken(accessToken); - xxlRpcProviderFactory.setServiceRegistry(ExecutorServiceRegistry.class); - xxlRpcProviderFactory.setServiceRegistryParam(serviceRegistryParam); - - // add services - xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl()); // start - xxlRpcProviderFactory.start(); - - } - - public static class ExecutorServiceRegistry extends ServiceRegistry { - - @Override - public void start(Map param) { - // start registry - ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address")); - } - @Override - public void stop() { - // stop registry - ExecutorRegistryThread.getInstance().toStop(); - } - - @Override - public boolean registry(Set keys, String value) { - return false; - } - @Override - public boolean remove(Set keys, String value) { - return false; - } - @Override - public Map> discovery(Set keys) { - return null; - } - @Override - public TreeSet discovery(String key) { - return null; - } - + embedServer = new EmbedServer(); + embedServer.start(address, port, appName, accessToken); } - private void stopRpcProvider() { + private void stopEmbedServer() { // stop provider factory try { - xxlRpcProviderFactory.stop(); + embedServer.stop(); } catch (Exception e) { logger.error(e.getMessage(), e); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java new file mode 100644 index 00000000..05d1f3f0 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java @@ -0,0 +1,241 @@ +package com.xxl.job.core.server; + +import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.impl.ExecutorBizImpl; +import com.xxl.job.core.biz.model.LogParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; +import com.xxl.job.core.thread.ExecutorRegistryThread; +import com.xxl.job.core.util.GsonTool; +import com.xxl.job.core.util.XxlJobRemotingUtil; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.*; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.CharsetUtil; +import io.netty.util.internal.ThrowableUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * Copy from : https://github.com/xuxueli/xxl-rpc + * + * @author xuxueli 2020-04-11 21:25 + */ +public class EmbedServer { + private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class); + + private ExecutorBiz executorBiz; + private Thread thread; + + public void start(final String address, final int port, final String appName, final String accessToken) { + executorBiz = new ExecutorBizImpl(); + thread = new Thread(new Runnable() { + + @Override + public void run() { + + // param + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + // start server + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel channel) throws Exception { + channel.pipeline() + .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle + .addLast(new HttpServerCodec()) + .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL + .addLast(new EmbedHttpServerHandler(executorBiz, accessToken)); + } + }) + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // bind + ChannelFuture future = bootstrap.bind(port).sync(); + + logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); + + // start registry + startRegistry(appName, address); + + // wait util stop + future.channel().closeFuture().sync(); + + } catch (InterruptedException e) { + if (e instanceof InterruptedException) { + logger.info(">>>>>>>>>>> xxl-job remoting server stop."); + } else { + logger.error(">>>>>>>>>>> xxl-job remoting server error.", e); + } + } finally { + // stop + try { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + } + + }); + thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave + thread.start(); + } + + public void stop() throws Exception { + // destroy server thread + if (thread!=null && thread.isAlive()) { + thread.interrupt(); + } + + // stop registry + stopRegistry(); + logger.info(">>>>>>>>>>> xxl-job remoting server destroy success."); + } + + + // ---------------------- registry ---------------------- + + /** + * netty_http + * + * Copy from : https://github.com/xuxueli/xxl-rpc + * + * @author xuxueli 2015-11-24 22:25:15 + */ + public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class); + + private ExecutorBiz executorBiz; + private String accessToken; + public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken) { + this.executorBiz = executorBiz; + this.accessToken = accessToken; + } + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { + + // request parse + //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8); + String requestData = msg.content().toString(CharsetUtil.UTF_8); + String uri = msg.uri(); + HttpMethod httpMethod = msg.method(); + boolean keepAlive = HttpUtil.isKeepAlive(msg); + String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN); + + // do invoke + Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); + + // to json + String responseJson = GsonTool.toJson(responseObj); + + // write response + writeResponse(ctx, keepAlive, responseJson); + } + + private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { + + // valid + if (HttpMethod.POST != httpMethod) { + return new ReturnT(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support."); + } + if (uri==null || uri.trim().length()==0) { + return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty."); + } + if (accessToken!=null + && accessToken.trim().length()>0 + && !accessToken.equals(accessTokenReq)) { + return new ReturnT(ReturnT.FAIL_CODE, "The access token is wrong."); + } + + // services mapping + try { + if ("/beat".equals(uri)) { + return executorBiz.beat(); + } else if ("/idleBeat".equals(uri)) { + int jobId = GsonTool.fromJson(requestData, Integer.class); + return executorBiz.idleBeat(jobId); + } else if ("/kill".equals(uri)) { + int jobId = GsonTool.fromJson(requestData, Integer.class); + return executorBiz.kill(jobId); + } else if ("/log".equals(uri)) { + LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); + return executorBiz.log(logParam); + } else if ("/run".equals(uri)) { + TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); + return executorBiz.run(triggerParam); + } else { + return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found."); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + return new ReturnT(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.stackTraceToString(e)); + } + } + + /** + * write response + */ + private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) { + // write response + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); // Unpooled.wrappedBuffer(responseJson) + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8"); // HttpHeaderValues.TEXT_PLAIN.toString() + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); + if (keepAlive) { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + } + ctx.writeAndFlush(response); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause); + ctx.close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + ctx.channel().close(); // beat 3N, close if idle + logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel."); + } else { + super.userEventTriggered(ctx, evt); + } + } + } + + // ---------------------- registry ---------------------- + + public void startRegistry(final String appName, final String address) { + // start registry + ExecutorRegistryThread.getInstance().start(appName, address); + } + + public void stopRegistry() { + // stop registry + ExecutorRegistryThread.getInstance().toStop(); + } + + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index d4f784e8..03e91c4c 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -8,6 +8,7 @@ import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.util.FileUtil; +import com.xxl.job.core.util.GsonTool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -204,7 +205,7 @@ public class TriggerCallbackThread { } // append file - byte[] callbackParamList_bytes = XxlJobExecutor.getSerializer().serialize(callbackParamList); + String callbackParamList_Str = GsonTool.toJson(callbackParamList); File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()))); if (callbackLogFile.exists()) { @@ -215,7 +216,7 @@ public class TriggerCallbackThread { } } } - FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes); + FileUtil.writeFileContent(callbackLogFile, callbackParamList_Str); } private void retryFailCallbackFile(){ @@ -234,8 +235,8 @@ public class TriggerCallbackThread { // load and clear file, retry for (File callbaclLogFile: callbackLogPath.listFiles()) { - byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile); - List callbackParamList = (List) XxlJobExecutor.getSerializer().deserialize(callbackParamList_bytes, HandleCallbackParam.class); + String callbackParamList_str = FileUtil.readFileContent(callbaclLogFile); + List callbackParamList = GsonTool.fromJsonList(callbackParamList_str, HandleCallbackParam.class); callbaclLogFile.delete(); doCallback(callbackParamList); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java index 9d27af7e..fad5347f 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java @@ -48,7 +48,7 @@ public class FileUtil { } - public static void writeFileContent(File file, byte[] data) { + public static void writeFileContent(File file, String data) { // file if (!file.exists()) { @@ -59,7 +59,7 @@ public class FileUtil { FileOutputStream fos = null; try { fos = new FileOutputStream(file); - fos.write(data); + fos.write(data.getBytes("utf-8")); fos.flush(); } catch (Exception e) { logger.error(e.getMessage(), e); @@ -75,7 +75,7 @@ public class FileUtil { } - public static byte[] readFileContent(File file) { + public static String readFileContent(File file) { Long filelength = file.length(); byte[] filecontent = new byte[filelength.intValue()]; @@ -84,8 +84,11 @@ public class FileUtil { in = new FileInputStream(file); in.read(filecontent); in.close(); + + return new String(filecontent, "utf-8"); } catch (Exception e) { logger.error(e.getMessage(), e); + return null; } finally { if (in != null) { try { @@ -95,7 +98,6 @@ public class FileUtil { } } } - return filecontent; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/GsonTool.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/GsonTool.java new file mode 100644 index 00000000..a6f3ca15 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/GsonTool.java @@ -0,0 +1,89 @@ +package com.xxl.job.core.util; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.List; + +/** + * @author xuxueli 2020-04-11 20:56:31 + */ +public class GsonTool { + + private static Gson gson = null; + static { + gson= new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); + } + + /** + * Object 转成 json + * + * @param src + * @return String + */ + public static String toJson(Object src) { + return gson.toJson(src); + } + + /** + * json 转成 特定的cls的Object + * + * @param json + * @param classOfT + * @return + */ + public static T fromJson(String json, Class classOfT) { + return gson.fromJson(json, classOfT); + } + + /** + * json 转成 特定的 rawClass 的Object + * + * @param json + * @param classOfT + * @param argClassOfT + * @return + */ + public static T fromJson(String json, Class classOfT, Class argClassOfT) { + Type type = new ParameterizedType4ReturnT(classOfT, new Class[]{argClassOfT}); + return gson.fromJson(json, type); + } + public static class ParameterizedType4ReturnT implements ParameterizedType { + private final Class raw; + private final Type[] args; + public ParameterizedType4ReturnT(Class raw, Type[] args) { + this.raw = raw; + this.args = args != null ? args : new Type[0]; + } + @Override + public Type[] getActualTypeArguments() { + return args; + } + @Override + public Type getRawType() { + return raw; + } + @Override + public Type getOwnerType() {return null;} + } + + /** + * json 转成 特定的cls的list + * + * @param json + * @param classOfT + * @return + */ + public static List fromJsonList(String json, Class classOfT) { + return gson.fromJson( + json, + new TypeToken>() { + }.getType() + ); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/IpUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/IpUtil.java new file mode 100644 index 00000000..c97c4fea --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/IpUtil.java @@ -0,0 +1,203 @@ +package com.xxl.job.core.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.UnknownHostException; +import java.util.Enumeration; +import java.util.regex.Pattern; + +/** + * ip tool + * + * @author xuxueli 2016-5-22 11:38:05 + */ +public class IpUtil { + private static final Logger logger = LoggerFactory.getLogger(IpUtil.class); + + private static final String ANYHOST_VALUE = "0.0.0.0"; + private static final String LOCALHOST_VALUE = "127.0.0.1"; + private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$"); + + + + private static volatile InetAddress LOCAL_ADDRESS = null; + + // ---------------------- valid ---------------------- + + private static InetAddress toValidAddress(InetAddress address) { + if (address instanceof Inet6Address) { + Inet6Address v6Address = (Inet6Address) address; + if (isPreferIPV6Address()) { + return normalizeV6Address(v6Address); + } + } + if (isValidV4Address(address)) { + return address; + } + return null; + } + + private static boolean isPreferIPV6Address() { + return Boolean.getBoolean("java.net.preferIPv6Addresses"); + } + + /** + * valid Inet4Address + * + * @param address + * @return + */ + private static boolean isValidV4Address(InetAddress address) { + if (address == null || address.isLoopbackAddress()) { + return false; + } + String name = address.getHostAddress(); + boolean result = (name != null + && IP_PATTERN.matcher(name).matches() + && !ANYHOST_VALUE.equals(name) + && !LOCALHOST_VALUE.equals(name)); + return result; + } + + + /** + * normalize the ipv6 Address, convert scope name to scope id. + * e.g. + * convert + * fe80:0:0:0:894:aeec:f37d:23e1%en0 + * to + * fe80:0:0:0:894:aeec:f37d:23e1%5 + *

+ * The %5 after ipv6 address is called scope id. + * see java doc of {@link Inet6Address} for more details. + * + * @param address the input address + * @return the normalized address, with scope id converted to int + */ + private static InetAddress normalizeV6Address(Inet6Address address) { + String addr = address.getHostAddress(); + int i = addr.lastIndexOf('%'); + if (i > 0) { + try { + return InetAddress.getByName(addr.substring(0, i) + '%' + address.getScopeId()); + } catch (UnknownHostException e) { + // ignore + logger.debug("Unknown IPV6 address: ", e); + } + } + return address; + } + + // ---------------------- find ip ---------------------- + + + private static InetAddress getLocalAddress0() { + InetAddress localAddress = null; + try { + localAddress = InetAddress.getLocalHost(); + InetAddress addressItem = toValidAddress(localAddress); + if (addressItem != null) { + return addressItem; + } + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + + try { + Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); + if (null == interfaces) { + return localAddress; + } + while (interfaces.hasMoreElements()) { + try { + NetworkInterface network = interfaces.nextElement(); + if (network.isLoopback() || network.isVirtual() || !network.isUp()) { + continue; + } + Enumeration addresses = network.getInetAddresses(); + while (addresses.hasMoreElements()) { + try { + InetAddress addressItem = toValidAddress(addresses.nextElement()); + if (addressItem != null) { + try { + if(addressItem.isReachable(100)){ + return addressItem; + } + } catch (IOException e) { + // ignore + } + } + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + } + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + } + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + return localAddress; + } + + + // ---------------------- tool ---------------------- + + /** + * Find first valid IP from local network card + * + * @return first valid local IP + */ + public static InetAddress getLocalAddress() { + if (LOCAL_ADDRESS != null) { + return LOCAL_ADDRESS; + } + InetAddress localAddress = getLocalAddress0(); + LOCAL_ADDRESS = localAddress; + return localAddress; + } + + /** + * get ip address + * + * @return String + */ + public static String getIp(){ + return getLocalAddress().getHostAddress(); + } + + /** + * get ip:port + * + * @param port + * @return String + */ + public static String getIpPort(int port){ + String ip = getIp(); + return getIpPort(ip, port); + } + + public static String getIpPort(String ip, int port){ + if (ip==null) { + return null; + } + return ip.concat(":").concat(String.valueOf(port)); + } + + public static Object[] parseIpPort(String address){ + String[] array = address.split(":"); + + String host = array[0]; + int port = Integer.parseInt(array[1]); + + return new Object[]{host, port}; + } + + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/NetUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/NetUtil.java new file mode 100644 index 00000000..31270df1 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/NetUtil.java @@ -0,0 +1,70 @@ +package com.xxl.job.core.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.ServerSocket; + +/** + * net util + * + * @author xuxueli 2017-11-29 17:00:25 + */ +public class NetUtil { + private static Logger logger = LoggerFactory.getLogger(NetUtil.class); + + /** + * find avaliable port + * + * @param defaultPort + * @return + */ + public static int findAvailablePort(int defaultPort) { + int portTmp = defaultPort; + while (portTmp < 65535) { + if (!isPortUsed(portTmp)) { + return portTmp; + } else { + portTmp++; + } + } + portTmp = defaultPort--; + while (portTmp > 0) { + if (!isPortUsed(portTmp)) { + return portTmp; + } else { + portTmp--; + } + } + throw new RuntimeException("no available port."); + } + + /** + * check port used + * + * @param port + * @return + */ + public static boolean isPortUsed(int port) { + boolean used = false; + ServerSocket serverSocket = null; + try { + serverSocket = new ServerSocket(port); + used = false; + } catch (IOException e) { + logger.info(">>>>>>>>>>> xxl-rpc, port[{}] is in use.", port); + used = true; + } finally { + if (serverSocket != null) { + try { + serverSocket.close(); + } catch (IOException e) { + logger.info(""); + } + } + } + return used; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobRemotingUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobRemotingUtil.java index f853c947..b5aa196d 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobRemotingUtil.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobRemotingUtil.java @@ -1,7 +1,6 @@ package com.xxl.job.core.util; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.registry.client.util.json.BasicJson; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,7 +19,7 @@ import java.util.Map; */ public class XxlJobRemotingUtil { private static Logger logger = LoggerFactory.getLogger(XxlJobRemotingUtil.class); - public static String XXL_RPC_ACCESS_TOKEN = "XXL-RPC-ACCESS-TOKEN"; + public static final String XXL_JOB_ACCESS_TOKEN = "XXL-JOB-ACCESS-TOKEN"; // trust-https start @@ -57,10 +56,12 @@ public class XxlJobRemotingUtil { * * @param url * @param accessToken + * @param timeout * @param requestObj + * @param returnTargClassOfT * @return */ - public static ReturnT postBody(String url, String accessToken, Object requestObj, int timeout) { + public static ReturnT postBody(String url, String accessToken, int timeout, Object requestObj, Class returnTargClassOfT) { HttpURLConnection connection = null; BufferedReader bufferedReader = null; try { @@ -87,19 +88,21 @@ public class XxlJobRemotingUtil { connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); if(accessToken!=null && accessToken.trim().length()>0){ - connection.setRequestProperty(XXL_RPC_ACCESS_TOKEN, accessToken); + connection.setRequestProperty(XXL_JOB_ACCESS_TOKEN, accessToken); } // do connection connection.connect(); // write requestBody - String requestBody = BasicJson.toJson(requestObj); + if (requestObj != null) { + String requestBody = GsonTool.toJson(requestObj); - DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); - dataOutputStream.write(requestBody.getBytes("UTF-8")); - dataOutputStream.flush(); - dataOutputStream.close(); + DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); + dataOutputStream.write(requestBody.getBytes("UTF-8")); + dataOutputStream.flush(); + dataOutputStream.close(); + } /*byte[] requestBodyBytes = requestBody.getBytes("UTF-8"); connection.setRequestProperty("Content-Length", String.valueOf(requestBodyBytes.length)); @@ -125,17 +128,7 @@ public class XxlJobRemotingUtil { // parse returnT try { - Map resultMap = BasicJson.parseMap(resultJson); - - ReturnT returnT = new ReturnT(); - if (resultMap==null) { - returnT.setCode(ReturnT.FAIL_CODE); - returnT.setMsg("AdminBizClient Remoting call fail."); - } else { - returnT.setCode(Integer.valueOf(String.valueOf(resultMap.get("code")))); - returnT.setMsg(String.valueOf(resultMap.get("msg"))); - returnT.setContent(String.valueOf(resultMap.get("content"))); - } + ReturnT returnT = GsonTool.fromJson(resultJson, ReturnT.class, returnTargClassOfT); return returnT; } catch (Exception e) { logger.error("xxl-rpc remoting (url="+url+") response content invalid("+ resultJson +").", e); diff --git a/xxl-job-core/src/test/java/com/xxl/job/core/biz/impl/ExecutorBizImplTest.java b/xxl-job-core/src/test/java/com/xxl/job/core/biz/impl/ExecutorBizImplTest.java deleted file mode 100644 index b6f9fc97..00000000 --- a/xxl-job-core/src/test/java/com/xxl/job/core/biz/impl/ExecutorBizImplTest.java +++ /dev/null @@ -1,145 +0,0 @@ -package com.xxl.job.core.biz.impl; - -import com.xxl.job.core.biz.ExecutorBiz; -import com.xxl.job.core.biz.model.LogResult; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.biz.model.TriggerParam; -import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; -import com.xxl.job.core.executor.XxlJobExecutor; -import com.xxl.job.core.glue.GlueTypeEnum; -import com.xxl.rpc.remoting.invoker.call.CallType; -import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean; -import com.xxl.rpc.remoting.invoker.route.LoadBalance; -import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient; -import com.xxl.rpc.serialize.impl.HessianSerializer; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.concurrent.TimeUnit; - - -public class ExecutorBizImplTest { - - public XxlJobExecutor xxlJobExecutor = null; - public ExecutorBiz executorBiz = null; - - @Before - public void before() throws Exception { - - // init executor - xxlJobExecutor = new XxlJobExecutor(); - xxlJobExecutor.setAdminAddresses(null); - xxlJobExecutor.setAppName("xxl-job-executor-sample"); - xxlJobExecutor.setIp(null); - xxlJobExecutor.setPort(9999); - xxlJobExecutor.setAccessToken(null); - xxlJobExecutor.setLogPath("/data/applogs/xxl-job/jobhandler"); - xxlJobExecutor.setLogRetentionDays(-1); - - // start executor - xxlJobExecutor.start(); - - TimeUnit.SECONDS.sleep(3); - - // init executor biz proxy - XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(); - referenceBean.setClient(NettyHttpClient.class); - referenceBean.setSerializer(HessianSerializer.class); - referenceBean.setCallType(CallType.SYNC); - referenceBean.setLoadBalance(LoadBalance.ROUND); - referenceBean.setIface(ExecutorBiz.class); - referenceBean.setVersion(null); - referenceBean.setTimeout(3000); - referenceBean.setAddress("127.0.0.1:9999"); - referenceBean.setAccessToken(null); - referenceBean.setInvokeCallback(null); - referenceBean.setInvokerFactory(null); - - executorBiz = (ExecutorBiz) referenceBean.getObject(); - } - - @After - public void after(){ - if (xxlJobExecutor != null) { - xxlJobExecutor.destroy(); - } - } - - - @Test - public void beat() { - // Act - final ReturnT retval = executorBiz.beat(); - - // Assert result - Assert.assertNotNull(retval); - Assert.assertNull(((ReturnT) retval).getContent()); - Assert.assertEquals(200, retval.getCode()); - Assert.assertNull(retval.getMsg()); - } - - @Test - public void idleBeat(){ - final int jobId = 0; - - // Act - final ReturnT retval = executorBiz.idleBeat(jobId); - - // Assert result - Assert.assertNotNull(retval); - Assert.assertNull(((ReturnT) retval).getContent()); - Assert.assertEquals(500, retval.getCode()); - Assert.assertEquals("job thread is running or has trigger queue.", retval.getMsg()); - } - - @Test - public void kill(){ - final int jobId = 0; - - // Act - final ReturnT retval = executorBiz.kill(jobId); - - // Assert result - Assert.assertNotNull(retval); - Assert.assertNull(((ReturnT) retval).getContent()); - Assert.assertEquals(200, retval.getCode()); - Assert.assertNull(retval.getMsg()); - } - - @Test - public void log(){ - final long logDateTim = 0L; - final long logId = 0; - final int fromLineNum = 0; - - // Act - final ReturnT retval = executorBiz.log(logDateTim, logId, fromLineNum); - - // Assert result - Assert.assertNotNull(retval); - } - - @Test - public void run(){ - // trigger data - final TriggerParam triggerParam = new TriggerParam(); - triggerParam.setJobId(1); - triggerParam.setExecutorHandler("demoJobHandler"); - triggerParam.setExecutorParams(null); - triggerParam.setExecutorBlockStrategy(ExecutorBlockStrategyEnum.COVER_EARLY.name()); - triggerParam.setGlueType(GlueTypeEnum.BEAN.name()); - triggerParam.setGlueSource(null); - triggerParam.setGlueUpdatetime(System.currentTimeMillis()); - triggerParam.setLogId(1); - triggerParam.setLogDateTime(System.currentTimeMillis()); - - // Act - final ReturnT retval = executorBiz.run(triggerParam); - - // Assert result - Assert.assertNotNull(retval); - } - -}