From 86dea7ff7cfb981b6d40e274a8995150cf218ad7 Mon Sep 17 00:00:00 2001 From: "xueli.xue" Date: Thu, 2 Mar 2017 15:13:32 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E8=A6=81=E9=87=8D=E6=9E=84=EF=BC=9A?= =?UTF-8?q?=E5=BA=95=E5=B1=82=E9=80=9A=E8=AE=AF=E6=A8=A1=E5=9D=97=E5=8D=87?= =?UTF-8?q?=E7=BA=A7=E4=BC=98=E5=8C=96=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- xxl-job-admin/pom.xml | 4 +- .../job/admin/controller/IndexController.java | 13 +- .../admin/controller/JobCodeController.java | 2 +- .../admin/controller/JobGroupController.java | 2 +- .../admin/controller/JobInfoController.java | 6 +- .../admin/controller/JobLogController.java | 56 +- .../resolver/WebExceptionResolver.java | 4 +- .../xxl/job/admin/core/biz/AdminBizImpl.java | 84 +++ .../callback/XxlJobLogCallbackServer.java | 62 -- .../XxlJobLogCallbackServerHandler.java | 122 ---- .../admin/core/jobbean/RemoteHttpJobBean.java | 95 +-- .../DynamicSchedulerUtil.java | 596 +++++++++--------- .../admin/core/thread/JobMonitorHelper.java | 10 +- .../admin/core/thread/JobRegistryHelper.java | 2 +- .../xxl/job/admin/core/util/CookieUtil.java | 2 - .../xxl/job/admin}/core/util/JacksonUtil.java | 186 +++--- .../xxl/job/admin/service/IXxlJobService.java | 9 +- .../admin/service/impl/XxlJobServiceImpl.java | 10 +- .../resources/applicationcontext-xxl-job.xml | 2 +- .../com/xxl/job/dao/impl/XxlJobLogTest.java | 61 -- xxl-job-core/pom.xml | 9 +- .../java/com/xxl/job/core/biz/AdminBiz.java | 13 + .../com/xxl/job/core/biz/ExecutorBiz.java | 40 ++ .../job/core/biz/impl/ExecutorBizImpl.java | 112 ++++ .../com/xxl/job/core/biz}/model/ReturnT.java | 105 +-- .../model/TriggerParam.java} | 17 +- .../xxl/job/core/executor/XxlJobExecutor.java | 105 +++ .../core/executor/jetty/XxlJobExecutor.java | 158 ----- .../executor/jetty/XxlJobExecutorHandler.java | 56 -- .../core/executor/servlet/XxlJobServlet.java | 61 -- .../com/xxl/job/core/handler/IJobHandler.java | 4 +- .../xxl/job/core/router/HandlerRouter.java | 100 --- .../java/com/xxl/job/core/router/IAction.java | 13 - .../job/core/router/action/BeatAction.java | 17 - .../job/core/router/action/KillAction.java | 35 - .../xxl/job/core/router/action/LogAction.java | 24 - .../xxl/job/core/router/action/RunAction.java | 76 --- .../job/core/router/model/ResponseModel.java | 45 -- .../xxl/job/core/rpc/codec/RpcRequest.java | 69 ++ .../xxl/job/core/rpc/codec/RpcResponse.java | 41 ++ .../core/rpc/netcom/NetComClientProxy.java | 73 +++ .../core/rpc/netcom/NetComServerFactory.java | 77 +++ .../rpc/netcom/jetty/client/JettyClient.java | 20 + .../rpc/netcom/jetty/server/JettyServer.java | 107 ++++ .../jetty/server/JettyServerHandler.java | 47 ++ .../core/rpc/serialize/HessianSerializer.java | 37 ++ .../core/{router => }/thread/JobThread.java | 62 +- .../thread/TriggerCallbackThread.java | 25 +- .../xxl/job/core/util/ByteHexConverter.java | 90 --- .../xxl/job/core/util/ByteReadFactory.java | 98 --- .../xxl/job/core/util/ByteWriteFactory.java | 67 -- .../com/xxl/job/core/util/HttpClientUtil.java | 92 +++ .../xxl/job/core/util/XxlJobNetCommUtil.java | 170 ----- xxl-job-executor-example/pom.xml | 4 +- .../resources/applicationcontext-xxl-job.xml | 2 +- 56 files changed, 1538 insertions(+), 1863 deletions(-) create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java delete mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java delete mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServerHandler.java rename xxl-job-admin/src/main/java/com/xxl/job/admin/core/{util => schedule}/DynamicSchedulerUtil.java (93%) rename {xxl-job-core/src/main/java/com/xxl/job => xxl-job-admin/src/main/java/com/xxl/job/admin}/core/util/JacksonUtil.java (94%) delete mode 100644 xxl-job-admin/src/test/java/com/xxl/job/dao/impl/XxlJobLogTest.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java rename {xxl-job-admin/src/main/java/com/xxl/job/admin/core => xxl-job-core/src/main/java/com/xxl/job/core/biz}/model/ReturnT.java (72%) rename xxl-job-core/src/main/java/com/xxl/job/core/{router/model/RequestModel.java => biz/model/TriggerParam.java} (90%) create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutorHandler.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/executor/servlet/XxlJobServlet.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/HandlerRouter.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/IAction.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/action/BeatAction.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/action/KillAction.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/action/LogAction.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/action/RunAction.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/router/model/ResponseModel.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcRequest.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcResponse.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComClientProxy.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComServerFactory.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/client/JettyClient.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServerHandler.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/serialize/HessianSerializer.java rename xxl-job-core/src/main/java/com/xxl/job/core/{router => }/thread/JobThread.java (60%) rename xxl-job-core/src/main/java/com/xxl/job/core/{router => }/thread/TriggerCallbackThread.java (58%) delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/ByteHexConverter.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/ByteReadFactory.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/ByteWriteFactory.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/HttpClientUtil.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobNetCommUtil.java diff --git a/pom.xml b/pom.xml index 6f78b9c0..8f944413 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.xuxueli xxl-job - 1.6.0 + 1.6.0-SNAPSHOT pom xxl-job diff --git a/xxl-job-admin/pom.xml b/xxl-job-admin/pom.xml index d5c2d251..4561b88d 100644 --- a/xxl-job-admin/pom.xml +++ b/xxl-job-admin/pom.xml @@ -4,13 +4,13 @@ com.xuxueli xxl-job - 1.6.0 + 1.6.0-SNAPSHOT xxl-job-admin war - 1.6.0 + 1.6.0-SNAPSHOT 3.2.17.RELEASE diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/IndexController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/IndexController.java index 464ff10d..a4923e8f 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/IndexController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/IndexController.java @@ -1,8 +1,9 @@ package com.xxl.job.admin.controller; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - +import com.xxl.job.admin.controller.annotation.PermessionLimit; +import com.xxl.job.admin.controller.interceptor.PermissionInterceptor; +import com.xxl.job.admin.core.util.PropertiesUtil; +import com.xxl.job.core.biz.model.ReturnT; import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; @@ -10,10 +11,8 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; -import com.xxl.job.admin.controller.annotation.PermessionLimit; -import com.xxl.job.admin.controller.interceptor.PermissionInterceptor; -import com.xxl.job.admin.core.model.ReturnT; -import com.xxl.job.admin.core.util.PropertiesUtil; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; /** * index controller diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobCodeController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobCodeController.java index 4f91fb6e..7a1acaff 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobCodeController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobCodeController.java @@ -1,10 +1,10 @@ package com.xxl.job.admin.controller; -import com.xxl.job.admin.core.model.ReturnT; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLogGlue; import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobLogGlueDao; +import com.xxl.job.core.biz.model.ReturnT; import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobGroupController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobGroupController.java index dd0e6c3f..a5c32fed 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobGroupController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobGroupController.java @@ -1,10 +1,10 @@ package com.xxl.job.admin.controller; -import com.xxl.job.admin.core.model.ReturnT; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.thread.JobRegistryHelper; import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobInfoDao; +import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.registry.RegistHelper; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java index aa14586e..586636d2 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java @@ -1,9 +1,9 @@ package com.xxl.job.admin.controller; -import com.xxl.job.admin.core.model.ReturnT; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.service.IXxlJobService; +import com.xxl.job.core.biz.model.ReturnT; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.RequestMapping; @@ -49,8 +49,8 @@ public class JobInfoController { @RequestMapping("/add") @ResponseBody public ReturnT add(int jobGroup, String jobCron, String jobDesc, String author, String alarmEmail, - String executorAppname, String executorAddress, String executorHandler, String executorParam, - int glueSwitch, String glueSource, String glueRemark, String childJobKey) { + String executorAppname, String executorAddress, String executorHandler, String executorParam, + int glueSwitch, String glueSource, String glueRemark, String childJobKey) { return xxlJobService.add(jobGroup, jobCron, jobDesc, author, alarmEmail, executorAddress, executorHandler, executorParam, diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java index 127c36ed..7785655e 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java @@ -1,16 +1,14 @@ package com.xxl.job.admin.controller; -import com.xxl.job.admin.core.model.ReturnT; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobLogDao; -import com.xxl.job.core.router.HandlerRouter.ActionRepository; -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; -import com.xxl.job.core.util.XxlJobNetCommUtil; +import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.rpc.netcom.NetComClientProxy; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; import org.springframework.stereotype.Controller; @@ -99,22 +97,24 @@ public class JobLogController { if (log == null) { return new ReturnT(500, "查看执行日志失败: 参数异常"); } - if (!(ResponseModel.SUCCESS.equals(log.getTriggerStatus()) || StringUtils.isNotBlank(log.getHandleStatus()))) { + if (!((ReturnT.SUCCESS_CODE+"").equals(log.getTriggerStatus()) || StringUtils.isNotBlank(log.getHandleStatus()))) { return new ReturnT(500, "查看执行日志失败: 任务发起调度失败,无法查看执行日志"); } // trigger id, trigger time - RequestModel requestModel = new RequestModel(); - requestModel.setTimestamp(System.currentTimeMillis()); - requestModel.setAction(ActionRepository.LOG.name()); - requestModel.setLogId(id); - requestModel.setLogDateTim(log.getTriggerTime().getTime()); + ExecutorBiz executorBiz = null; + try { + executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, log.getExecutorAddress()).getObject(); + } catch (Exception e) { + e.printStackTrace(); + return new ReturnT(500, e.getMessage()); + } + ReturnT logResult = executorBiz.log(log.getTriggerTime().getTime(), id); - ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(log.getExecutorAddress()), requestModel); - if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) { - return new ReturnT(responseModel.getMsg()); + if (ReturnT.SUCCESS_CODE == logResult.getCode()) { + return new ReturnT(logResult.getMsg()); } else { - return new ReturnT(500, "查看执行日志失败: " + responseModel.getMsg()); + return new ReturnT(500, "查看执行日志失败: " + logResult.getMsg()); } } @@ -134,26 +134,28 @@ public class JobLogController { if (log == null || jobInfo==null) { return new ReturnT(500, "参数异常"); } - if (!ResponseModel.SUCCESS.equals(log.getTriggerStatus())) { + if (!(ReturnT.SUCCESS_CODE +"").equals(log.getTriggerStatus())) { return new ReturnT(500, "调度失败,无法终止日志"); } - + // request of kill - RequestModel requestModel = new RequestModel(); - requestModel.setTimestamp(System.currentTimeMillis()); - requestModel.setAction(ActionRepository.KILL.name()); - requestModel.setJobGroup(String.valueOf(log.getJobGroup())); - requestModel.setJobName(log.getJobName()); + ExecutorBiz executorBiz = null; + try { + executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, log.getExecutorAddress()).getObject(); + } catch (Exception e) { + e.printStackTrace(); + return new ReturnT(500, e.getMessage()); + } + ReturnT runResult = executorBiz.kill(String.valueOf(log.getJobGroup()), log.getJobName()); - ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(log.getExecutorAddress()), requestModel); - if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) { - log.setHandleStatus(ResponseModel.FAIL); + if (ReturnT.SUCCESS_CODE == runResult.getCode()) { + log.setHandleStatus(ReturnT.SUCCESS_CODE+""); log.setHandleMsg("人为操作主动终止"); log.setHandleTime(new Date()); xxlJobLogDao.updateHandleInfo(log); - return new ReturnT(responseModel.getMsg()); + return new ReturnT(runResult.getMsg()); } else { - return new ReturnT(500, responseModel.getMsg()); + return new ReturnT(500, runResult.getMsg()); } } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java index b0e40036..5a15245f 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java @@ -1,7 +1,7 @@ package com.xxl.job.admin.controller.resolver; -import com.xxl.job.admin.core.model.ReturnT; -import com.xxl.job.core.util.JacksonUtil; +import com.xxl.job.admin.core.util.JacksonUtil; +import com.xxl.job.core.biz.model.ReturnT; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.ResponseBody; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java new file mode 100644 index 00000000..f146537a --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java @@ -0,0 +1,84 @@ +package com.xxl.job.admin.core.biz; + +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil; +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; +import org.apache.commons.lang.StringUtils; +import org.quartz.SchedulerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; +import java.util.Date; + +/** + * Created by xuxueli on 17/3/1. + */ +public class AdminBizImpl implements AdminBiz { + private static Logger logger = LoggerFactory.getLogger(AdminBizImpl.class); + + @Override + public ReturnT callback(TriggerParam triggerParam) { + + // valid log item + XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(triggerParam.getLogId()); + if (log == null) { + return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); + } + + // trigger success, to trigger child job, and avoid repeat trigger child job + String childTriggerMsg = null; + if ((ReturnT.SUCCESS_CODE+"").equals(triggerParam.getStatus()) && !(ReturnT.SUCCESS_CODE+"").equals(log.getHandleStatus())) { + XxlJobInfo xxlJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName()); + if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) { + childTriggerMsg = "
"; + String[] childJobKeys = xxlJobInfo.getChildJobKey().split(","); + for (int i = 0; i < childJobKeys.length; i++) { + String[] jobKeyArr = childJobKeys[i].split("_"); + if (jobKeyArr!=null && jobKeyArr.length==2) { + XxlJobInfo childJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(Integer.valueOf(jobKeyArr[0]), jobKeyArr[1]); + if (childJobInfo!=null) { + try { + boolean ret = DynamicSchedulerUtil.triggerJob(childJobInfo.getJobName(), String.valueOf(childJobInfo.getJobGroup())); + + // add msg + childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}", + (i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc()); + } catch (SchedulerException e) { + logger.error("", e); + } + } else { + childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}", + (i+1), childJobKeys.length, childJobKeys[i]); + } + } else { + childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务失败, 子任务Key格式错误, 子任务Key: {2}", + (i+1), childJobKeys.length, childJobKeys[i]); + } + } + + } + } + + // handle msg + StringBuffer handleMsg = new StringBuffer(); + if (triggerParam.getMsg() != null) { + handleMsg.append("执行备注:").append(triggerParam.getMsg()); + } + if (childTriggerMsg !=null) { + handleMsg.append("
子任务触发备注:").append(childTriggerMsg); + } + + // success, save log + log.setHandleTime(new Date()); + log.setHandleStatus(triggerParam.getStatus()); + log.setHandleMsg(handleMsg.toString()); + DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log); + + return new ReturnT(ReturnT.SUCCESS_CODE, null); + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java deleted file mode 100644 index 21c8a227..00000000 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.xxl.job.admin.core.callback; - -import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.server.Handler; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.HandlerCollection; -import org.eclipse.jetty.server.nio.SelectChannelConnector; -import org.eclipse.jetty.util.thread.ExecutorThreadPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by xuxueli on 2016-5-22 11:15:42 - */ -public class XxlJobLogCallbackServer { - private static final Logger logger = LoggerFactory.getLogger(XxlJobLogCallbackServer.class); - - private Server server = null; - public void start(int callBackPort) throws Exception { - - final int port = Integer.valueOf(callBackPort); - new Thread(new Runnable() { - @Override - public void run() { - server = new Server(); - server.setThreadPool(new ExecutorThreadPool(200, 200, 30000)); // 非阻塞 - - // connector - SelectChannelConnector connector = new SelectChannelConnector(); - connector.setPort(port); - connector.setMaxIdleTime(30000); - server.setConnectors(new Connector[] { connector }); - - // handler - HandlerCollection handlerc =new HandlerCollection(); - handlerc.setHandlers(new Handler[]{new XxlJobLogCallbackServerHandler()}); - server.setHandler(handlerc); - - try { - server.start(); - logger.info(">>>>>>>>>>>> xxl-job XxlJobCallbackServer start success at port:{}.", port); - server.join(); // block until server ready - logger.info(">>>>>>>>>>>> xxl-job XxlJobCallbackServer join success at port:{}.", port); - } catch (Exception e) { - e.printStackTrace(); - } - } - }).start(); - - } - - public void destroy() { - if (server!=null) { - try { - server.stop(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - -} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServerHandler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServerHandler.java deleted file mode 100644 index 0f8d9f2d..00000000 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServerHandler.java +++ /dev/null @@ -1,122 +0,0 @@ -package com.xxl.job.admin.core.callback; - -import com.xxl.job.admin.core.model.XxlJobInfo; -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.util.DynamicSchedulerUtil; -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; -import com.xxl.job.core.util.XxlJobNetCommUtil; -import org.apache.commons.lang.StringUtils; -import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.handler.AbstractHandler; -import org.quartz.SchedulerException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.text.MessageFormat; -import java.util.Date; - -/** - * Created by xuxueli on 2016-5-22 11:15:42 - */ -public class XxlJobLogCallbackServerHandler extends AbstractHandler { - private static Logger logger = LoggerFactory.getLogger(XxlJobLogCallbackServerHandler.class); - - @Override - public void handle(String s, Request baseRequest, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException { - - httpServletRequest.setCharacterEncoding("UTF-8"); - httpServletResponse.setCharacterEncoding("UTF-8"); - - // parse hex-json to request model - String requestHex = httpServletRequest.getParameter(XxlJobNetCommUtil.HEX); - - // do biz - ResponseModel responseModel = dobiz(requestHex); - - // format response model to hex-json - String responseHex = XxlJobNetCommUtil.formatObj2HexJson(responseModel); - - // response - httpServletResponse.setContentType("text/html;charset=utf-8"); - httpServletResponse.setStatus(HttpServletResponse.SC_OK); - baseRequest.setHandled(true); - httpServletResponse.getWriter().println(responseHex); - } - - private ResponseModel dobiz(String requestHex){ - - // valid hex - if (requestHex==null || requestHex.trim().length()==0) { - return new ResponseModel(ResponseModel.FAIL, "request hex is null."); - } - - // valid request model - RequestModel requestModel = XxlJobNetCommUtil.parseHexJson2Obj(requestHex, RequestModel.class); - if (requestModel==null) { - return new ResponseModel(ResponseModel.FAIL, "request hex parse fail."); - } - - // valid log item - XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(requestModel.getLogId()); - if (log == null) { - return new ResponseModel(ResponseModel.FAIL, "log item not found."); - } - - // trigger success, to trigger child job, and avoid repeat trigger child job - String childTriggerMsg = null; - if (ResponseModel.SUCCESS.equals(requestModel.getStatus()) && !ResponseModel.SUCCESS.equals(log.getHandleStatus())) { - XxlJobInfo xxlJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName()); - if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) { - childTriggerMsg = "
"; - String[] childJobKeys = xxlJobInfo.getChildJobKey().split(","); - for (int i = 0; i < childJobKeys.length; i++) { - String[] jobKeyArr = childJobKeys[i].split("_"); - if (jobKeyArr!=null && jobKeyArr.length==2) { - XxlJobInfo childJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(Integer.valueOf(jobKeyArr[0]), jobKeyArr[1]); - if (childJobInfo!=null) { - try { - boolean ret = DynamicSchedulerUtil.triggerJob(childJobInfo.getJobName(), String.valueOf(childJobInfo.getJobGroup())); - - // add msg - childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}", - (i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc()); - } catch (SchedulerException e) { - logger.error("", e); - } - } else { - childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}", - (i+1), childJobKeys.length, childJobKeys[i]); - } - } else { - childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务失败, 子任务Key格式错误, 子任务Key: {2}", - (i+1), childJobKeys.length, childJobKeys[i]); - } - } - - } - } - - // handle msg - StringBuffer handleMsg = new StringBuffer(); - if (requestModel.getMsg() != null) { - handleMsg.append("执行备注:").append(requestModel.getMsg()); - } - if (childTriggerMsg !=null) { - handleMsg.append("
子任务触发备注:").append(childTriggerMsg); - } - - // success, save log - log.setHandleTime(new Date()); - log.setHandleStatus(requestModel.getStatus()); - log.setHandleMsg(handleMsg.toString()); - DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log); - - return new ResponseModel(ResponseModel.SUCCESS, null); - } - -} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java index 5e785d5e..4c29301d 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java @@ -5,12 +5,12 @@ import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.thread.JobMonitorHelper; import com.xxl.job.admin.core.thread.JobRegistryHelper; -import com.xxl.job.admin.core.util.DynamicSchedulerUtil; +import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil; +import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.registry.RegistHelper; -import com.xxl.job.core.router.HandlerRouter.ActionRepository; -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; -import com.xxl.job.core.util.XxlJobNetCommUtil; +import com.xxl.job.core.rpc.netcom.NetComClientProxy; import org.apache.commons.lang.StringUtils; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; @@ -56,17 +56,15 @@ public class RemoteHttpJobBean extends QuartzJobBean { jobLog.setTriggerTime(new Date()); // trigger request - RequestModel requestModel = new RequestModel(); - requestModel.setTimestamp(System.currentTimeMillis()); - requestModel.setAction(ActionRepository.RUN.name()); - requestModel.setJobGroup(String.valueOf(jobInfo.getJobGroup())); - requestModel.setJobName(jobInfo.getJobName()); - requestModel.setExecutorHandler(jobInfo.getExecutorHandler()); - requestModel.setExecutorParams(jobInfo.getExecutorParam()); - requestModel.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true); - requestModel.setLogAddress(adminAddressSet); - requestModel.setLogId(jobLog.getId()); - requestModel.setLogDateTim(jobLog.getTriggerTime().getTime()); + TriggerParam triggerParam = new TriggerParam(); + triggerParam.setJobGroup(String.valueOf(jobInfo.getJobGroup())); + triggerParam.setJobName(jobInfo.getJobName()); + triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); + triggerParam.setExecutorParams(jobInfo.getExecutorParam()); + triggerParam.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true); + triggerParam.setLogAddress(adminAddressSet); + triggerParam.setLogId(jobLog.getId()); + triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); // parse address List addressList = new ArrayList(); @@ -76,13 +74,13 @@ public class RemoteHttpJobBean extends QuartzJobBean { } // failover trigger - ResponseModel responseModel = failoverTrigger(addressList, requestModel, jobLog); + ReturnT responseModel = failoverTrigger(addressList, triggerParam, jobLog); jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorParam(jobInfo.getExecutorParam()); logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, responseModel:{}", jobLog.getId(), responseModel.toString()); // update trigger info 2/2 - jobLog.setTriggerStatus(responseModel.getStatus()); + jobLog.setTriggerStatus(responseModel.getCode()+""); jobLog.setTriggerMsg(responseModel.getMsg()); DynamicSchedulerUtil.xxlJobLogDao.updateTriggerInfo(jobLog); @@ -97,21 +95,28 @@ public class RemoteHttpJobBean extends QuartzJobBean { * failover for trigger remote address * @return */ - public ResponseModel failoverTrigger(List addressList, RequestModel requestModel, XxlJobLog jobLog){ + public ReturnT failoverTrigger(List addressList, TriggerParam triggerParam, XxlJobLog jobLog){ if (addressList==null || addressList.size() < 1) { - ResponseModel result = new ResponseModel(); - result.setStatus(ResponseModel.FAIL); - result.setMsg( "Trigger error,
>>>[address] is null

" ); - return result; + return new ReturnT(ReturnT.FAIL_CODE, "Trigger error,
>>>[address] is null

"); } else if (addressList.size() == 1) { String address = addressList.get(0); // store real address jobLog.setExecutorAddress(address); - ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel); - String failoverMessage = MessageFormat.format("Trigger running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}

", address, triggerCallback.getStatus(), triggerCallback.getMsg()); - triggerCallback.setMsg(failoverMessage); - return triggerCallback; + // real trigger + ExecutorBiz executorBiz = null; + try { + executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject(); + } catch (Exception e) { + e.printStackTrace(); + return new ReturnT(ReturnT.FAIL_CODE, e.getMessage()); + } + ReturnT runResult = executorBiz.run(triggerParam); + + String failoverMessage = MessageFormat.format("Trigger running,
>>>[address] : {0},
>>>[code] : {1},
>>>[msg] : {2}

", + address, runResult.getCode(), runResult.getMsg()); + runResult.setMsg(runResult.getMsg() + failoverMessage); + return runResult; } else { // for ha @@ -122,32 +127,38 @@ public class RemoteHttpJobBean extends QuartzJobBean { for (String address : addressList) { if (StringUtils.isNotBlank(address)) { - // beat check - RequestModel beatRequest = new RequestModel(); - beatRequest.setTimestamp(System.currentTimeMillis()); - beatRequest.setAction(ActionRepository.BEAT.name()); - ResponseModel beatResult = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), beatRequest); - failoverMessage += MessageFormat.format("BEAT running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}

", address, beatResult.getStatus(), beatResult.getMsg()); + + ExecutorBiz executorBiz = null; + try { + executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject(); + } catch (Exception e) { + e.printStackTrace(); + return new ReturnT(ReturnT.FAIL_CODE, e.getMessage()); + } + + // beat check + ReturnT beatResult = executorBiz.beat(); + failoverMessage += MessageFormat.format("BEAT running,
>>>[address] : {0},
>>>[code] : {1},
>>>[msg] : {2}

", + address, beatResult.getCode(), beatResult.getMsg()); // beat success, trigger do - if (beatResult.SUCCESS.equals(beatResult.getStatus())) { + if (beatResult.getCode() == ReturnT.SUCCESS_CODE) { // store real address jobLog.setExecutorAddress(address); // real trigger - ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel); - failoverMessage += MessageFormat.format("Trigger running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}

", address, triggerCallback.getStatus(), triggerCallback.getMsg()); - triggerCallback.setMsg(failoverMessage); - return triggerCallback; + ReturnT runResult = executorBiz.run(triggerParam); + + failoverMessage += MessageFormat.format("Trigger running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}

", + address, runResult.getCode(), runResult.getMsg()); + runResult.setMsg( runResult.getMsg() + failoverMessage); + return runResult; } } } - ResponseModel result = new ResponseModel(); - result.setStatus(ResponseModel.FAIL); - result.setMsg(failoverMessage); - return result; + return new ReturnT(ReturnT.FAIL_CODE, failoverMessage); } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/DynamicSchedulerUtil.java similarity index 93% rename from xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java rename to xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/DynamicSchedulerUtil.java index 3e865b8c..f1a3a1d4 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/DynamicSchedulerUtil.java @@ -1,301 +1,297 @@ -package com.xxl.job.admin.core.util; - -import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer; -import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean; -import com.xxl.job.admin.core.model.XxlJobInfo; -import com.xxl.job.admin.core.thread.JobRegistryHelper; -import com.xxl.job.admin.dao.IXxlJobGroupDao; -import com.xxl.job.admin.dao.IXxlJobInfoDao; -import com.xxl.job.admin.dao.IXxlJobLogDao; -import com.xxl.job.admin.dao.IXxlJobRegistryDao; -import com.xxl.job.core.util.IpUtil; -import org.quartz.*; -import org.quartz.Trigger.TriggerState; -import org.quartz.impl.matchers.GroupMatcher; -import org.quartz.impl.triggers.CronTriggerImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.util.Assert; - -import java.util.*; - -/** - * base quartz scheduler util - * @author xuxueli 2015-12-19 16:13:53 - */ -public final class DynamicSchedulerUtil implements ApplicationContextAware, InitializingBean { - private static final Logger logger = LoggerFactory.getLogger(DynamicSchedulerUtil.class); - - // Scheduler - private static Scheduler scheduler; - public static void setScheduler(Scheduler scheduler) { - DynamicSchedulerUtil.scheduler = scheduler; - } - - // trigger callback address - private String callBackIp; - private int callBackPort = 8888; - private static String callbackAddress; - - public void setCallBackIp(String callBackIp) { - this.callBackIp = callBackIp; - } - public void setCallBackPort(int callBackPort) { - this.callBackPort = callBackPort; - } - public static String getCallbackAddress(){ - return callbackAddress; - } - - // init - XxlJobLogCallbackServer xxlJobLogCallbackServer = null; - public void init(){ - try { - // start callback server - xxlJobLogCallbackServer = new XxlJobLogCallbackServer(); - xxlJobLogCallbackServer.start(callBackPort); - } catch (Exception e) { - e.printStackTrace(); - } - - // init callbackAddress - if (callBackIp!=null && callBackIp.trim().length()>0) { - callbackAddress = callBackIp.trim().concat(":").concat(String.valueOf(callBackPort)); - } else { - callbackAddress = IpUtil.getIpPort(callBackPort);; - } - - // init JobRegistryHelper - JobRegistryHelper.discover("g", "k"); - } - - // destroy - public void destroy(){ - if (xxlJobLogCallbackServer!=null) { - xxlJobLogCallbackServer.destroy(); - } - } - - // xxlJobLogDao、xxlJobInfoDao - public static IXxlJobLogDao xxlJobLogDao; - public static IXxlJobInfoDao xxlJobInfoDao; - public static IXxlJobRegistryDao xxlJobRegistryDao; - public static IXxlJobGroupDao xxlJobGroupDao; - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - DynamicSchedulerUtil.xxlJobLogDao = applicationContext.getBean(IXxlJobLogDao.class); - DynamicSchedulerUtil.xxlJobInfoDao = applicationContext.getBean(IXxlJobInfoDao.class); - DynamicSchedulerUtil.xxlJobRegistryDao = applicationContext.getBean(IXxlJobRegistryDao.class); - DynamicSchedulerUtil.xxlJobGroupDao = applicationContext.getBean(IXxlJobGroupDao.class); - } - - @Override - public void afterPropertiesSet() throws Exception { - Assert.notNull(scheduler, "quartz scheduler is null"); - logger.info(">>>>>>>>> init quartz scheduler success.[{}]", scheduler); - - } - - // getJobKeys - @Deprecated - public static List> getJobList(){ - List> jobList = new ArrayList>(); - - try { - if (scheduler.getJobGroupNames()==null || scheduler.getJobGroupNames().size()==0) { - return null; - } - String groupName = scheduler.getJobGroupNames().get(0); - Set jobKeys = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName)); - if (jobKeys!=null && jobKeys.size()>0) { - for (JobKey jobKey : jobKeys) { - TriggerKey triggerKey = TriggerKey.triggerKey(jobKey.getName(), Scheduler.DEFAULT_GROUP); - Trigger trigger = scheduler.getTrigger(triggerKey); - JobDetail jobDetail = scheduler.getJobDetail(jobKey); - TriggerState triggerState = scheduler.getTriggerState(triggerKey); - Map jobMap = new HashMap(); - jobMap.put("TriggerKey", triggerKey); - jobMap.put("Trigger", trigger); - jobMap.put("JobDetail", jobDetail); - jobMap.put("TriggerState", triggerState); - jobList.add(jobMap); - } - } - - } catch (SchedulerException e) { - e.printStackTrace(); - return null; - } - return jobList; - } - - // fill job info - public static void fillJobInfo(XxlJobInfo jobInfo) { - // TriggerKey : name + group - TriggerKey triggerKey = TriggerKey.triggerKey(jobInfo.getJobName(), String.valueOf(jobInfo.getJobGroup())); - - try { - Trigger trigger = scheduler.getTrigger(triggerKey); - - TriggerState triggerState = scheduler.getTriggerState(triggerKey); - - // parse params - if (trigger!=null && trigger instanceof CronTriggerImpl) { - String cronExpression = ((CronTriggerImpl) trigger).getCronExpression(); - jobInfo.setJobCron(cronExpression); - } - - //JobKey jobKey = new JobKey(jobInfo.getJobName(), String.valueOf(jobInfo.getJobGroup())); - //JobDetail jobDetail = scheduler.getJobDetail(jobKey); - //String jobClass = jobDetail.getJobClass().getName(); - - if (triggerState!=null) { - jobInfo.setJobStatus(triggerState.name()); - } - - } catch (SchedulerException e) { - e.printStackTrace(); - } - } - - // check if exists - public static boolean checkExists(String jobName, String jobGroup) throws SchedulerException{ - TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); - return scheduler.checkExists(triggerKey); - } - - // addJob 新增 - @SuppressWarnings("unchecked") - public static boolean addJob(String jobGroup, String jobName, String cronExpression) throws SchedulerException { - // TriggerKey : name + group - TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); - JobKey jobKey = new JobKey(jobName, jobGroup); - - // TriggerKey valid if_exists - if (checkExists(jobName, jobGroup)) { - logger.info(">>>>>>>>> addJob fail, job already exist, jobGroup:{}, jobName:{}", jobGroup, jobName); - return false; - } - - // CronTrigger : TriggerKey + cronExpression // withMisfireHandlingInstructionDoNothing 忽略掉调度终止过程中忽略的调度 - CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing(); - CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build(); - - // JobDetail : jobClass - Class jobClass_ = RemoteHttpJobBean.class; // Class.forName(jobInfo.getJobClass()); - - JobDetail jobDetail = JobBuilder.newJob(jobClass_).withIdentity(jobKey).build(); - /*if (jobInfo.getJobData()!=null) { - JobDataMap jobDataMap = jobDetail.getJobDataMap(); - jobDataMap.putAll(JacksonUtil.readValue(jobInfo.getJobData(), Map.class)); - // JobExecutionContext context.getMergedJobDataMap().get("mailGuid"); - }*/ - - // schedule : jobDetail + cronTrigger - Date date = scheduler.scheduleJob(jobDetail, cronTrigger); - - logger.info(">>>>>>>>>>> addJob success, jobDetail:{}, cronTrigger:{}, date:{}", jobDetail, cronTrigger, date); - return true; - } - - // reschedule - public static boolean rescheduleJob(String jobGroup, String jobName, String cronExpression) throws SchedulerException { - - // TriggerKey valid if_exists - if (!checkExists(jobName, jobGroup)) { - logger.info(">>>>>>>>>>> rescheduleJob fail, job not exists, JobGroup:{}, JobName:{}", jobGroup, jobName); - return false; - } - - // TriggerKey : name + group - TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); - JobKey jobKey = new JobKey(jobName, jobGroup); - - // CronTrigger : TriggerKey + cronExpression - CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing(); - CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build(); - - //scheduler.rescheduleJob(triggerKey, cronTrigger); - - // JobDetail-JobDataMap fresh - JobDetail jobDetail = scheduler.getJobDetail(jobKey); - /*JobDataMap jobDataMap = jobDetail.getJobDataMap(); - jobDataMap.clear(); - jobDataMap.putAll(JacksonUtil.readValue(jobInfo.getJobData(), Map.class));*/ - - // Trigger fresh - HashSet triggerSet = new HashSet(); - triggerSet.add(cronTrigger); - - scheduler.scheduleJob(jobDetail, triggerSet, true); - logger.info(">>>>>>>>>>> resumeJob success, JobGroup:{}, JobName:{}", jobGroup, jobName); - return true; - } - - // unscheduleJob - public static boolean removeJob(String jobName, String jobGroup) throws SchedulerException { - // TriggerKey : name + group - TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); - boolean result = false; - if (checkExists(jobName, jobGroup)) { - result = scheduler.unscheduleJob(triggerKey); - logger.info(">>>>>>>>>>> removeJob, triggerKey:{}, result [{}]", triggerKey, result); - } - return true; - } - - // Pause - public static boolean pauseJob(String jobName, String jobGroup) throws SchedulerException { - // TriggerKey : name + group - TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); - - boolean result = false; - if (checkExists(jobName, jobGroup)) { - scheduler.pauseTrigger(triggerKey); - result = true; - logger.info(">>>>>>>>>>> pauseJob success, triggerKey:{}", triggerKey); - } else { - logger.info(">>>>>>>>>>> pauseJob fail, triggerKey:{}", triggerKey); - } - return result; - } - - // resume - public static boolean resumeJob(String jobName, String jobGroup) throws SchedulerException { - // TriggerKey : name + group - TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); - - boolean result = false; - if (checkExists(jobName, jobGroup)) { - scheduler.resumeTrigger(triggerKey); - result = true; - logger.info(">>>>>>>>>>> resumeJob success, triggerKey:{}", triggerKey); - } else { - logger.info(">>>>>>>>>>> resumeJob fail, triggerKey:{}", triggerKey); - } - return result; - } - - // run - public static boolean triggerJob(String jobName, String jobGroup) throws SchedulerException { - // TriggerKey : name + group - JobKey jobKey = new JobKey(jobName, jobGroup); - - boolean result = false; - if (checkExists(jobName, jobGroup)) { - scheduler.triggerJob(jobKey); - result = true; - logger.info(">>>>>>>>>>> runJob success, jobKey:{}", jobKey); - } else { - logger.info(">>>>>>>>>>> runJob fail, jobKey:{}", jobKey); - } - return result; - } - - +package com.xxl.job.admin.core.schedule; + +import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.core.thread.JobRegistryHelper; +import com.xxl.job.admin.dao.IXxlJobGroupDao; +import com.xxl.job.admin.dao.IXxlJobInfoDao; +import com.xxl.job.admin.dao.IXxlJobLogDao; +import com.xxl.job.admin.dao.IXxlJobRegistryDao; +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.admin.core.biz.AdminBizImpl; +import com.xxl.job.core.rpc.netcom.NetComServerFactory; +import com.xxl.job.core.util.IpUtil; +import org.quartz.*; +import org.quartz.Trigger.TriggerState; +import org.quartz.impl.matchers.GroupMatcher; +import org.quartz.impl.triggers.CronTriggerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.util.Assert; + +import java.util.*; + +/** + * base quartz scheduler util + * @author xuxueli 2015-12-19 16:13:53 + */ +public final class DynamicSchedulerUtil implements ApplicationContextAware, InitializingBean { + private static final Logger logger = LoggerFactory.getLogger(DynamicSchedulerUtil.class); + + // Scheduler + private static Scheduler scheduler; + public static void setScheduler(Scheduler scheduler) { + DynamicSchedulerUtil.scheduler = scheduler; + } + + // trigger callback address + private String callBackIp; + private int callBackPort = 8888; + private static String callbackAddress; + + public void setCallBackIp(String callBackIp) { + this.callBackIp = callBackIp; + } + public void setCallBackPort(int callBackPort) { + this.callBackPort = callBackPort; + } + public static String getCallbackAddress(){ + return callbackAddress; + } + + // init + private NetComServerFactory serverFactory = new NetComServerFactory(); + public void init() throws Exception { + // server + NetComServerFactory.putService(AdminBiz.class, new AdminBizImpl()); + serverFactory.start(callBackPort, callBackIp, null, null); + + // init callbackAddress + if (callBackIp!=null && callBackIp.trim().length()>0) { + callbackAddress = callBackIp.trim().concat(":").concat(String.valueOf(callBackPort)); + } else { + callbackAddress = IpUtil.getIpPort(callBackPort);; + } + + // init JobRegistryHelper + JobRegistryHelper.discover("g", "k"); + } + + // destroy + public void destroy(){ + serverFactory.destroy(); + } + + // xxlJobLogDao、xxlJobInfoDao + public static IXxlJobLogDao xxlJobLogDao; + public static IXxlJobInfoDao xxlJobInfoDao; + public static IXxlJobRegistryDao xxlJobRegistryDao; + public static IXxlJobGroupDao xxlJobGroupDao; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + DynamicSchedulerUtil.xxlJobLogDao = applicationContext.getBean(IXxlJobLogDao.class); + DynamicSchedulerUtil.xxlJobInfoDao = applicationContext.getBean(IXxlJobInfoDao.class); + DynamicSchedulerUtil.xxlJobRegistryDao = applicationContext.getBean(IXxlJobRegistryDao.class); + DynamicSchedulerUtil.xxlJobGroupDao = applicationContext.getBean(IXxlJobGroupDao.class); + } + + @Override + public void afterPropertiesSet() throws Exception { + Assert.notNull(scheduler, "quartz scheduler is null"); + logger.info(">>>>>>>>> init quartz scheduler success.[{}]", scheduler); + + } + + // getJobKeys + @Deprecated + public static List> getJobList(){ + List> jobList = new ArrayList>(); + + try { + if (scheduler.getJobGroupNames()==null || scheduler.getJobGroupNames().size()==0) { + return null; + } + String groupName = scheduler.getJobGroupNames().get(0); + Set jobKeys = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName)); + if (jobKeys!=null && jobKeys.size()>0) { + for (JobKey jobKey : jobKeys) { + TriggerKey triggerKey = TriggerKey.triggerKey(jobKey.getName(), Scheduler.DEFAULT_GROUP); + Trigger trigger = scheduler.getTrigger(triggerKey); + JobDetail jobDetail = scheduler.getJobDetail(jobKey); + TriggerState triggerState = scheduler.getTriggerState(triggerKey); + Map jobMap = new HashMap(); + jobMap.put("TriggerKey", triggerKey); + jobMap.put("Trigger", trigger); + jobMap.put("JobDetail", jobDetail); + jobMap.put("TriggerState", triggerState); + jobList.add(jobMap); + } + } + + } catch (SchedulerException e) { + e.printStackTrace(); + return null; + } + return jobList; + } + + // fill job info + public static void fillJobInfo(XxlJobInfo jobInfo) { + // TriggerKey : name + group + TriggerKey triggerKey = TriggerKey.triggerKey(jobInfo.getJobName(), String.valueOf(jobInfo.getJobGroup())); + + try { + Trigger trigger = scheduler.getTrigger(triggerKey); + + TriggerState triggerState = scheduler.getTriggerState(triggerKey); + + // parse params + if (trigger!=null && trigger instanceof CronTriggerImpl) { + String cronExpression = ((CronTriggerImpl) trigger).getCronExpression(); + jobInfo.setJobCron(cronExpression); + } + + //JobKey jobKey = new JobKey(jobInfo.getJobName(), String.valueOf(jobInfo.getJobGroup())); + //JobDetail jobDetail = scheduler.getJobDetail(jobKey); + //String jobClass = jobDetail.getJobClass().getName(); + + if (triggerState!=null) { + jobInfo.setJobStatus(triggerState.name()); + } + + } catch (SchedulerException e) { + e.printStackTrace(); + } + } + + // check if exists + public static boolean checkExists(String jobName, String jobGroup) throws SchedulerException{ + TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); + return scheduler.checkExists(triggerKey); + } + + // addJob 新增 + @SuppressWarnings("unchecked") + public static boolean addJob(String jobGroup, String jobName, String cronExpression) throws SchedulerException { + // TriggerKey : name + group + TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); + JobKey jobKey = new JobKey(jobName, jobGroup); + + // TriggerKey valid if_exists + if (checkExists(jobName, jobGroup)) { + logger.info(">>>>>>>>> addJob fail, job already exist, jobGroup:{}, jobName:{}", jobGroup, jobName); + return false; + } + + // CronTrigger : TriggerKey + cronExpression // withMisfireHandlingInstructionDoNothing 忽略掉调度终止过程中忽略的调度 + CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing(); + CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build(); + + // JobDetail : jobClass + Class jobClass_ = RemoteHttpJobBean.class; // Class.forName(jobInfo.getJobClass()); + + JobDetail jobDetail = JobBuilder.newJob(jobClass_).withIdentity(jobKey).build(); + /*if (jobInfo.getJobData()!=null) { + JobDataMap jobDataMap = jobDetail.getJobDataMap(); + jobDataMap.putAll(JacksonUtil.readValue(jobInfo.getJobData(), Map.class)); + // JobExecutionContext context.getMergedJobDataMap().get("mailGuid"); + }*/ + + // schedule : jobDetail + cronTrigger + Date date = scheduler.scheduleJob(jobDetail, cronTrigger); + + logger.info(">>>>>>>>>>> addJob success, jobDetail:{}, cronTrigger:{}, date:{}", jobDetail, cronTrigger, date); + return true; + } + + // reschedule + public static boolean rescheduleJob(String jobGroup, String jobName, String cronExpression) throws SchedulerException { + + // TriggerKey valid if_exists + if (!checkExists(jobName, jobGroup)) { + logger.info(">>>>>>>>>>> rescheduleJob fail, job not exists, JobGroup:{}, JobName:{}", jobGroup, jobName); + return false; + } + + // TriggerKey : name + group + TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); + JobKey jobKey = new JobKey(jobName, jobGroup); + + // CronTrigger : TriggerKey + cronExpression + CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing(); + CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build(); + + //scheduler.rescheduleJob(triggerKey, cronTrigger); + + // JobDetail-JobDataMap fresh + JobDetail jobDetail = scheduler.getJobDetail(jobKey); + /*JobDataMap jobDataMap = jobDetail.getJobDataMap(); + jobDataMap.clear(); + jobDataMap.putAll(JacksonUtil.readValue(jobInfo.getJobData(), Map.class));*/ + + // Trigger fresh + HashSet triggerSet = new HashSet(); + triggerSet.add(cronTrigger); + + scheduler.scheduleJob(jobDetail, triggerSet, true); + logger.info(">>>>>>>>>>> resumeJob success, JobGroup:{}, JobName:{}", jobGroup, jobName); + return true; + } + + // unscheduleJob + public static boolean removeJob(String jobName, String jobGroup) throws SchedulerException { + // TriggerKey : name + group + TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); + boolean result = false; + if (checkExists(jobName, jobGroup)) { + result = scheduler.unscheduleJob(triggerKey); + logger.info(">>>>>>>>>>> removeJob, triggerKey:{}, result [{}]", triggerKey, result); + } + return true; + } + + // Pause + public static boolean pauseJob(String jobName, String jobGroup) throws SchedulerException { + // TriggerKey : name + group + TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); + + boolean result = false; + if (checkExists(jobName, jobGroup)) { + scheduler.pauseTrigger(triggerKey); + result = true; + logger.info(">>>>>>>>>>> pauseJob success, triggerKey:{}", triggerKey); + } else { + logger.info(">>>>>>>>>>> pauseJob fail, triggerKey:{}", triggerKey); + } + return result; + } + + // resume + public static boolean resumeJob(String jobName, String jobGroup) throws SchedulerException { + // TriggerKey : name + group + TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); + + boolean result = false; + if (checkExists(jobName, jobGroup)) { + scheduler.resumeTrigger(triggerKey); + result = true; + logger.info(">>>>>>>>>>> resumeJob success, triggerKey:{}", triggerKey); + } else { + logger.info(">>>>>>>>>>> resumeJob fail, triggerKey:{}", triggerKey); + } + return result; + } + + // run + public static boolean triggerJob(String jobName, String jobGroup) throws SchedulerException { + // TriggerKey : name + group + JobKey jobKey = new JobKey(jobName, jobGroup); + + boolean result = false; + if (checkExists(jobName, jobGroup)) { + scheduler.triggerJob(jobKey); + result = true; + logger.info(">>>>>>>>>>> runJob success, jobKey:{}", jobKey); + } else { + logger.info(">>>>>>>>>>> runJob fail, jobKey:{}", jobKey); + } + return result; + } + + } \ No newline at end of file diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java index 9d52ab44..0c77e735 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java @@ -3,9 +3,9 @@ package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.util.DynamicSchedulerUtil; +import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil; import com.xxl.job.admin.core.util.MailUtil; -import com.xxl.job.core.router.model.ResponseModel; +import com.xxl.job.core.biz.model.ReturnT; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +41,7 @@ public class JobMonitorHelper { logger.info(">>>>>>>>>>> job monitor heat success, JobLogId:{}", jobLogId); XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(jobLogId); if (log!=null) { - if (ResponseModel.SUCCESS.equals(log.getTriggerStatus()) && StringUtils.isBlank(log.getHandleStatus())) { + if ((ReturnT.SUCCESS_CODE+"").equals(log.getTriggerStatus()) && StringUtils.isBlank(log.getHandleStatus())) { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { @@ -49,10 +49,10 @@ public class JobMonitorHelper { } JobMonitorHelper.monitor(jobLogId); } - if (ResponseModel.SUCCESS.equals(log.getTriggerStatus()) && ResponseModel.SUCCESS.equals(log.getHandleStatus())) { + if ((ReturnT.SUCCESS_CODE+"").equals(log.getTriggerStatus()) && (ReturnT.SUCCESS_CODE+"").equals(log.getHandleStatus())) { // pass } - if (ResponseModel.FAIL.equals(log.getTriggerStatus()) || ResponseModel.FAIL.equals(log.getHandleStatus())) { + if ((ReturnT.FAIL+"").equals(log.getTriggerStatus()) || (ReturnT.FAIL+"").equals(log.getHandleStatus())) { XxlJobInfo info = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName()); if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java index 61a84d75..e641769b 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -1,7 +1,7 @@ package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.model.XxlJobRegistry; -import com.xxl.job.admin.core.util.DynamicSchedulerUtil; +import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil; import com.xxl.job.core.registry.RegistHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/CookieUtil.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/CookieUtil.java index d4724323..19a6751f 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/CookieUtil.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/CookieUtil.java @@ -38,12 +38,10 @@ public class CookieUtil { /** * 保存 - * @param request * @param response * @param key * @param value * @param maxAge - * @param domain */ private static void set(HttpServletResponse response, String key, String value, int maxAge, String path) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/JacksonUtil.java similarity index 94% rename from xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java rename to xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/JacksonUtil.java index e80ec6f2..16ca016e 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/JacksonUtil.java @@ -1,93 +1,93 @@ -package com.xxl.job.core.util; - - -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.JsonParseException; -import org.codehaus.jackson.map.JsonMappingException; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.type.TypeReference; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * Jackson util - * - * 1、obj need private and set/get; - * 2、do not support inner class; - * - * @author xuxueli 2015-9-25 18:02:56 - */ -public class JacksonUtil { - private final static ObjectMapper objectMapper = new ObjectMapper(); - public static ObjectMapper getInstance() { - return objectMapper; - } - - /** - * bean、array、List、Map --> json - * - * @param obj - * @return json string - * @throws Exception - */ - public static String writeValueAsString(Object obj) { - try { - return getInstance().writeValueAsString(obj); - } catch (JsonGenerationException e) { - e.printStackTrace(); - } catch (JsonMappingException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - - /** - * string --> bean、Map、List(array) - * - * @param jsonStr - * @param clazz - * @return obj - * @throws Exception - */ - public static T readValue(String jsonStr, Class clazz) { - try { - return getInstance().readValue(jsonStr, clazz); - } catch (JsonParseException e) { - e.printStackTrace(); - } catch (JsonMappingException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - public static T readValueRefer(String jsonStr, Class clazz) { - try { - return getInstance().readValue(jsonStr, new TypeReference() { }); - } catch (JsonParseException e) { - e.printStackTrace(); - } catch (JsonMappingException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - - public static void main(String[] args) { - try { - Map map = new HashMap(); - map.put("aaa", "111"); - map.put("bbb", "222"); - String json = writeValueAsString(map); - System.out.println(json); - System.out.println(readValue(json, Map.class)); - } catch (Exception e) { - e.printStackTrace(); - } - } -} +package com.xxl.job.admin.core.util; + + +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Jackson util + * + * 1、obj need private and set/get; + * 2、do not support inner class; + * + * @author xuxueli 2015-9-25 18:02:56 + */ +public class JacksonUtil { + private final static ObjectMapper objectMapper = new ObjectMapper(); + public static ObjectMapper getInstance() { + return objectMapper; + } + + /** + * bean、array、List、Map --> json + * + * @param obj + * @return json string + * @throws Exception + */ + public static String writeValueAsString(Object obj) { + try { + return getInstance().writeValueAsString(obj); + } catch (JsonGenerationException e) { + e.printStackTrace(); + } catch (JsonMappingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + /** + * string --> bean、Map、List(array) + * + * @param jsonStr + * @param clazz + * @return obj + * @throws Exception + */ + public static T readValue(String jsonStr, Class clazz) { + try { + return getInstance().readValue(jsonStr, clazz); + } catch (JsonParseException e) { + e.printStackTrace(); + } catch (JsonMappingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + public static T readValueRefer(String jsonStr, Class clazz) { + try { + return getInstance().readValue(jsonStr, new TypeReference() { }); + } catch (JsonParseException e) { + e.printStackTrace(); + } catch (JsonMappingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + public static void main(String[] args) { + try { + Map map = new HashMap(); + map.put("aaa", "111"); + map.put("bbb", "222"); + String json = writeValueAsString(map); + System.out.println(json); + System.out.println(readValue(json, Map.class)); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/IXxlJobService.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/IXxlJobService.java index ac320776..727c906a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/IXxlJobService.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/IXxlJobService.java @@ -1,6 +1,7 @@ package com.xxl.job.admin.service; -import com.xxl.job.admin.core.model.ReturnT; + +import com.xxl.job.core.biz.model.ReturnT; import java.util.Map; @@ -13,9 +14,9 @@ public interface IXxlJobService { public Map pageList(int start, int length, int jobGroup, String executorHandler, String filterTime); - public ReturnT add(int jobGroup, String jobCron, String jobDesc,String author, String alarmEmail, - String executorAddress, String executorHandler, String executorParam, - int glueSwitch, String glueSource, String glueRemark, String childJobKey); + public ReturnT add(int jobGroup, String jobCron, String jobDesc, String author, String alarmEmail, + String executorAddress, String executorHandler, String executorParam, + int glueSwitch, String glueSource, String glueRemark, String childJobKey); public ReturnT reschedule(int jobGroup, String jobName, String jobCron, String jobDesc, String author, String alarmEmail, String executorAddress, String executorHandler, String executorParam, int glueSwitch, String childJobKey); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 197845ab..82f882de 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -1,14 +1,14 @@ package com.xxl.job.admin.service.impl; -import com.xxl.job.admin.core.model.ReturnT; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; -import com.xxl.job.admin.core.util.DynamicSchedulerUtil; +import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil; import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobLogDao; import com.xxl.job.admin.dao.IXxlJobLogGlueDao; import com.xxl.job.admin.service.IXxlJobService; +import com.xxl.job.core.biz.model.ReturnT; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.FastDateFormat; import org.quartz.CronExpression; @@ -64,9 +64,9 @@ public class XxlJobServiceImpl implements IXxlJobService { } @Override - public ReturnT add(int jobGroup, String jobCron, String jobDesc,String author, String alarmEmail, - String executorAddress, String executorHandler, String executorParam, - int glueSwitch, String glueSource, String glueRemark, String childJobKey) { + public ReturnT add(int jobGroup, String jobCron, String jobDesc, String author, String alarmEmail, + String executorAddress, String executorHandler, String executorParam, + int glueSwitch, String glueSource, String glueRemark, String childJobKey) { // valid XxlJobGroup group = xxlJobGroupDao.load(jobGroup); if (group == null) { diff --git a/xxl-job-admin/src/main/resources/applicationcontext-xxl-job.xml b/xxl-job-admin/src/main/resources/applicationcontext-xxl-job.xml index bc990f91..28b4d7de 100644 --- a/xxl-job-admin/src/main/resources/applicationcontext-xxl-job.xml +++ b/xxl-job-admin/src/main/resources/applicationcontext-xxl-job.xml @@ -14,7 +14,7 @@ - + diff --git a/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/XxlJobLogTest.java b/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/XxlJobLogTest.java deleted file mode 100644 index eddd29b6..00000000 --- a/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/XxlJobLogTest.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.xxl.job.dao.impl; - -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.dao.IXxlJobLogDao; -import com.xxl.job.core.router.model.ResponseModel; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import javax.annotation.Resource; -import java.util.Date; -import java.util.List; - -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(locations = "classpath*:applicationcontext-*.xml") -public class XxlJobLogTest { - - @Resource - private IXxlJobLogDao xxlJobLogDao; - - @Test - public void save_load(){ - XxlJobLog xxlJobLog = new XxlJobLog(); - xxlJobLog.setJobName("job_name"); - int count = xxlJobLogDao.save(xxlJobLog); - System.out.println(count); - System.out.println(xxlJobLog.getId()); - - XxlJobLog item = xxlJobLogDao.load(xxlJobLog.getId()); - System.out.println(item); - } - - @Test - public void updateTriggerInfo(){ - XxlJobLog xxlJobLog = xxlJobLogDao.load(29); - xxlJobLog.setTriggerTime(new Date()); - xxlJobLog.setTriggerStatus(ResponseModel.SUCCESS); - xxlJobLog.setTriggerMsg("trigger msg"); - xxlJobLogDao.updateTriggerInfo(xxlJobLog); - } - - @Test - public void updateHandleInfo(){ - XxlJobLog xxlJobLog = xxlJobLogDao.load(29); - xxlJobLog.setHandleTime(new Date()); - xxlJobLog.setHandleStatus(ResponseModel.SUCCESS); - xxlJobLog.setHandleMsg("handle msg"); - xxlJobLogDao.updateHandleInfo(xxlJobLog); - } - - @Test - public void pageList(){ - List list = xxlJobLogDao.pageList(0, 20, 0, null, null, null); - int list_count = xxlJobLogDao.pageListCount(0, 20, 0, null, null, null); - - System.out.println(list); - System.out.println(list_count); - } - -} diff --git a/xxl-job-core/pom.xml b/xxl-job-core/pom.xml index 3bff495b..cb83c6fd 100644 --- a/xxl-job-core/pom.xml +++ b/xxl-job-core/pom.xml @@ -4,7 +4,7 @@ com.xuxueli xxl-job - 1.6.0 + 1.6.0-SNAPSHOT xxl-job-core jar @@ -40,6 +40,13 @@ 1.7.5 + + + com.caucho + hessian + 4.0.38 + + org.codehaus.jackson diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java new file mode 100644 index 00000000..f4ba9ab7 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java @@ -0,0 +1,13 @@ +package com.xxl.job.core.biz; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; + +/** + * Created by xuxueli on 17/3/1. + */ +public interface AdminBiz { + + public ReturnT callback(TriggerParam triggerParam); + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java new file mode 100644 index 00000000..91844a2d --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java @@ -0,0 +1,40 @@ +package com.xxl.job.core.biz; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; + +/** + * Created by xuxueli on 17/3/1. + */ +public interface ExecutorBiz { + + /** + * beat + * @return + */ + public ReturnT beat(); + + /** + * kill + * @param jobGroup + * @param jobName + * @return + */ + public ReturnT kill(String jobGroup, String jobName); + + /** + * log + * @param logDateTim + * @param logId + * @return + */ + public ReturnT log(long logDateTim, int logId); + + /** + * run + * @param triggerParam + * @return + */ + public ReturnT run(TriggerParam triggerParam); + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java new file mode 100644 index 00000000..e3313386 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -0,0 +1,112 @@ +package com.xxl.job.core.biz.impl; + +import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.glue.GlueFactory; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.handler.impl.GlueJobHandler; +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.thread.JobThread; + +import java.util.Date; + +/** + * Created by xuxueli on 17/3/1. + */ +public class ExecutorBizImpl implements ExecutorBiz { + + @Override + public ReturnT beat() { + return ReturnT.SUCCESS; + } + + @Override + public ReturnT kill(String jobGroup, String jobName) { + + // generate jobKey + String jobKey = jobGroup.concat("_").concat(jobName); + + // kill handlerThread, and create new one + JobThread jobThread = XxlJobExecutor.loadJobThread(jobKey); + + if (jobThread != null) { + IJobHandler handler = jobThread.getHandler(); + jobThread.toStop("人工手动终止"); + jobThread.interrupt(); + //XxlJobExecutor.registJobThread(jobKey, handler); + return ReturnT.SUCCESS; + } + + return new ReturnT(ReturnT.FAIL_CODE, "job thread not found."); + } + + @Override + public ReturnT log(long logDateTim, int logId) { + // log filename: yyyy-MM-dd/9999.log + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logDateTim), logId); + + String logConteng = XxlJobFileAppender.readLog(logFileName); + return new ReturnT(ReturnT.SUCCESS_CODE, logConteng); + } + + @Override + public ReturnT run(TriggerParam triggerParam) { + // generate jobKey + String jobKey = triggerParam.getJobGroup().concat("_").concat(triggerParam.getJobName()); + + // load old thread + JobThread jobThread = XxlJobExecutor.loadJobThread(jobKey); + + if (!triggerParam.isGlueSwitch()) { + // bean model + + // valid handler instance + IJobHandler jobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); + if (jobHandler==null) { + return new ReturnT(ReturnT.FAIL_CODE, "job handler for jobKey=[" + jobKey + "] not found."); + } + + if (jobThread == null) { + jobThread = XxlJobExecutor.registJobThread(jobKey, jobHandler); + } else { + // job handler update, kill old job thread + if (jobThread.getHandler() != jobHandler) { + // kill old job thread + jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); + jobThread.interrupt(); + + // new thread, with new job handler + jobThread = XxlJobExecutor.registJobThread(jobKey, jobHandler); + } + } + } else { + // glue model + + // valid glueloader + if (!GlueFactory.isActive()) { + return new ReturnT(ReturnT.FAIL_CODE, "glueLoader for jobKey=[" + jobKey + "] not found."); + } + + if (jobThread == null) { + jobThread = XxlJobExecutor.registJobThread(jobKey, new GlueJobHandler(triggerParam.getJobGroup(), triggerParam.getJobName())); + } else { + // job handler update, kill old job thread + if (!(jobThread.getHandler() instanceof GlueJobHandler)) { + // kill old job thread + jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); + jobThread.interrupt(); + + // new thread, with new job handler + jobThread = XxlJobExecutor.registJobThread(jobKey, new GlueJobHandler(triggerParam.getJobGroup(), triggerParam.getJobName())); + } + } + } + + // push data to queue + jobThread.pushTriggerQueue(triggerParam); + return ReturnT.SUCCESS; + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/ReturnT.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/ReturnT.java similarity index 72% rename from xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/ReturnT.java rename to xxl-job-core/src/main/java/com/xxl/job/core/biz/model/ReturnT.java index 205fc74b..6eca8cba 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/ReturnT.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/ReturnT.java @@ -1,50 +1,55 @@ -package com.xxl.job.admin.core.model; - -/** - * common return - * @author xuxueli 2015-12-4 16:32:31 - * @param - */ -public class ReturnT { - public static final ReturnT SUCCESS = new ReturnT(null); - public static final ReturnT FAIL = new ReturnT(500, null); - - private int code; - private String msg; - private T content; - - public ReturnT(int code, String msg) { - this.code = code; - this.msg = msg; - } - public ReturnT(T content) { - this.code = 200; - this.content = content; - } - - public int getCode() { - return code; - } - public void setCode(int code) { - this.code = code; - } - public String getMsg() { - return msg; - } - public void setMsg(String msg) { - this.msg = msg; - } - public T getContent() { - return content; - } - public void setContent(T content) { - this.content = content; - } - - @Override - public String toString() { - return "ReturnT [code=" + code + ", msg=" + msg + ", content=" - + content + "]"; - } - -} +package com.xxl.job.core.biz.model; + +import java.io.Serializable; + +/** + * common return + * @author xuxueli 2015-12-4 16:32:31 + * @param + */ +public class ReturnT implements Serializable { + public static final long serialVersionUID = 42L; + + public static final int SUCCESS_CODE = 200; + public static final int FAIL_CODE = 500; + public static final ReturnT SUCCESS = new ReturnT(null); + public static final ReturnT FAIL = new ReturnT(FAIL_CODE, null); + + private int code; + private String msg; + private T content; + + public ReturnT(int code, String msg) { + this.code = code; + this.msg = msg; + } + public ReturnT(T content) { + this.code = SUCCESS_CODE; + this.content = content; + } + + public int getCode() { + return code; + } + public void setCode(int code) { + this.code = code; + } + public String getMsg() { + return msg; + } + public void setMsg(String msg) { + this.msg = msg; + } + public T getContent() { + return content; + } + public void setContent(T content) { + this.content = content; + } + + @Override + public String toString() { + return "ReturnT [code=" + code + ", msg=" + msg + ", content=" + content + "]"; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/model/RequestModel.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java similarity index 90% rename from xxl-job-core/src/main/java/com/xxl/job/core/router/model/RequestModel.java rename to xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java index bcd10eae..3140ede8 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/router/model/RequestModel.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java @@ -1,13 +1,14 @@ -package com.xxl.job.core.router.model; +package com.xxl.job.core.biz.model; +import java.io.Serializable; import java.util.Set; /** * Created by xuxueli on 16/7/22. */ -public class RequestModel { +public class TriggerParam implements Serializable{ + private static final long serialVersionUID = 42L; - private long timestamp; private String action; private String jobGroup; @@ -25,15 +26,6 @@ public class RequestModel { private String status; private String msg; - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - public String getAction() { return action; } @@ -125,7 +117,6 @@ public class RequestModel { @Override public String toString() { return "RequestModel{" + - "timestamp=" + timestamp + ", action='" + action + '\'' + ", jobGroup='" + jobGroup + '\'' + ", jobName='" + jobName + '\'' + diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java new file mode 100644 index 00000000..249a351d --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -0,0 +1,105 @@ +package com.xxl.job.core.executor; + +import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.impl.ExecutorBizImpl; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.handler.annotation.JobHander; +import com.xxl.job.core.registry.RegistHelper; +import com.xxl.job.core.rpc.netcom.NetComServerFactory; +import com.xxl.job.core.thread.JobThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Created by xuxueli on 2016/3/2 21:14. + */ +public class XxlJobExecutor implements ApplicationContextAware, ApplicationListener { + private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); + + private String ip; + private int port = 9999; + private String appName; + private RegistHelper registHelper; + + public void setIp(String ip) { + this.ip = ip; + } + public void setPort(int port) { + this.port = port; + } + public void setAppName(String appName) { + this.appName = appName; + } + public void setRegistHelper(RegistHelper registHelper) { + this.registHelper = registHelper; + } + + // ---------------------------------- job server ------------------------------------ + private NetComServerFactory serverFactory = new NetComServerFactory(); + public void start() throws Exception { + NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); + serverFactory.start(port, ip, appName, registHelper); + } + public void destroy(){ + serverFactory.destroy(); + } + + // ---------------------------------- init job handler ------------------------------------ + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + + // init job handler action + Map serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHander.class); + + if (serviceBeanMap!=null && serviceBeanMap.size()>0) { + for (Object serviceBean : serviceBeanMap.values()) { + if (serviceBean instanceof IJobHandler){ + String name = serviceBean.getClass().getAnnotation(JobHander.class).value(); + IJobHandler handler = (IJobHandler) serviceBean; + registJobHandler(name, handler); + } + } + } + } + + // ---------------------------------- destory job executor ------------------------------------ + @Override + public void onApplicationEvent(ApplicationEvent applicationEvent) { + if(applicationEvent instanceof ContextClosedEvent){ + // TODO + } + } + + // ---------------------------------- job handler repository + private static ConcurrentHashMap jobHandlerRepository = new ConcurrentHashMap(); + public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){ + logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); + return jobHandlerRepository.put(name, jobHandler); + } + public static IJobHandler loadJobHandler(String name){ + return jobHandlerRepository.get(name); + } + + // ---------------------------------- job thread repository + private static ConcurrentHashMap JobThreadRepository = new ConcurrentHashMap(); + public static JobThread registJobThread(String jobkey, IJobHandler handler){ + JobThread jobThread = new JobThread(handler); + jobThread.start(); + logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobkey:{}, handler:{}", new Object[]{jobkey, handler}); + JobThreadRepository.put(jobkey, jobThread); // putIfAbsent | oh my god, map's put method return the old value!!! + return jobThread; + } + public static JobThread loadJobThread(String jobKey){ + return JobThreadRepository.get(jobKey); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java deleted file mode 100644 index c46cbf5c..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java +++ /dev/null @@ -1,158 +0,0 @@ -package com.xxl.job.core.executor.jetty; - -import com.xxl.job.core.handler.IJobHandler; -import com.xxl.job.core.handler.annotation.JobHander; -import com.xxl.job.core.registry.RegistHelper; -import com.xxl.job.core.router.HandlerRouter; -import com.xxl.job.core.util.IpUtil; -import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.server.Handler; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.HandlerCollection; -import org.eclipse.jetty.server.nio.SelectChannelConnector; -import org.eclipse.jetty.util.thread.ExecutorThreadPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.BeansException; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.context.ApplicationEvent; -import org.springframework.context.ApplicationListener; -import org.springframework.context.event.ContextClosedEvent; - -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * Created by xuxueli on 2016/3/2 21:14. - */ -public class XxlJobExecutor implements ApplicationContextAware, ApplicationListener { - private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); - - private String ip; - private int port = 9999; - private String appName; - private RegistHelper registHelper; - - public void setIp(String ip) { - this.ip = ip; - } - public void setPort(int port) { - this.port = port; - } - public void setAppName(String appName) { - this.appName = appName; - } - public void setRegistHelper(RegistHelper registHelper) { - this.registHelper = registHelper; - } - - // ---------------------------------- job server ------------------------------------ - private Server server = null; - public void start() throws Exception { - - Thread executorTnread = new Thread(new Runnable() { - @Override - public void run() { - server = new Server(); - server.setThreadPool(new ExecutorThreadPool(200, 200, 30000)); // 非阻塞 - - // connector - SelectChannelConnector connector = new SelectChannelConnector(); - connector.setPort(port); - connector.setMaxIdleTime(30000); - server.setConnectors(new Connector[] { connector }); - - // handler - HandlerCollection handlerc =new HandlerCollection(); - handlerc.setHandlers(new Handler[]{new XxlJobExecutorHandler()}); - server.setHandler(handlerc); - - try { - server.start(); - logger.info(">>>>>>>>>>>> xxl-job jetty server start success at port:{}.", port); - registryBeat(); - server.join(); // block until thread stopped - logger.info(">>>>>>>>>>>> xxl-job jetty server join success at port:{}.", port); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - executorTnread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave - executorTnread.start(); - } - - public void destroy(){ - if (server!=null) { - try { - server.stop(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - private void registryBeat(){ - if (registHelper==null && appName==null || appName.trim().length()==0) { - return; - } - Thread registryThread = new Thread(new Runnable() { - @Override - public void run() { - while (true) { - try { - - // generate addredd = ip:port - String address = null; - if (ip != null && ip.trim().length()>0) { - address = ip.trim().concat(":").concat(String.valueOf(port)); - } else { - address = IpUtil.getIpPort(port); - } - - registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address); - TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - }); - registryThread.setDaemon(true); - registryThread.start(); - } - - // ---------------------------------- init job handler ------------------------------------ - public static ApplicationContext applicationContext; - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - XxlJobExecutor.applicationContext = applicationContext; - initJobHandler(); - } - - /** - * init job handler action - */ - public void initJobHandler(){ - Map serviceBeanMap = XxlJobExecutor.applicationContext.getBeansWithAnnotation(JobHander.class); - if (serviceBeanMap!=null && serviceBeanMap.size()>0) { - for (Object serviceBean : serviceBeanMap.values()) { - if (serviceBean instanceof IJobHandler){ - String name = serviceBean.getClass().getAnnotation(JobHander.class).value(); - IJobHandler handler = (IJobHandler) serviceBean; - HandlerRouter.registJobHandler(name, handler); - } - } - } - } - - // ---------------------------------- destory job executor ------------------------------------ - @Override - public void onApplicationEvent(ApplicationEvent applicationEvent) { - if(applicationEvent instanceof ContextClosedEvent){ - // TODO - } - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutorHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutorHandler.java deleted file mode 100644 index f74cd8d4..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutorHandler.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.xxl.job.core.executor.jetty; - -import com.xxl.job.core.router.HandlerRouter; -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; -import com.xxl.job.core.util.XxlJobNetCommUtil; -import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.handler.AbstractHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; - -/** - * Created by xuxueli on 2016/3/2 21:23. - */ -public class XxlJobExecutorHandler extends AbstractHandler { - private static Logger logger = LoggerFactory.getLogger(XxlJobExecutorHandler.class); - - @Override - public void handle(String s, Request baseRequest, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException { - - httpServletRequest.setCharacterEncoding("UTF-8"); - httpServletResponse.setCharacterEncoding("UTF-8"); - - // parse hex-json to request model - String requestHex = httpServletRequest.getParameter(XxlJobNetCommUtil.HEX); - ResponseModel responseModel = null; - if (requestHex!=null && requestHex.trim().length()>0) { - try { - // route trigger - RequestModel requestModel = XxlJobNetCommUtil.parseHexJson2Obj(requestHex, RequestModel.class); - responseModel = HandlerRouter.route(requestModel); - } catch (Exception e) { - logger.error("", e); - responseModel = new ResponseModel(ResponseModel.SUCCESS, e.getMessage()); - } - } - if (responseModel == null) { - responseModel = new ResponseModel(ResponseModel.SUCCESS, "系统异常"); - } - - // format response model to hex-json - String responseHex = XxlJobNetCommUtil.formatObj2HexJson(responseModel); - - // return - httpServletResponse.setContentType("text/plain;charset=utf-8"); - httpServletResponse.setStatus(HttpServletResponse.SC_OK); - baseRequest.setHandled(true); - httpServletResponse.getWriter().println(responseHex); - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/servlet/XxlJobServlet.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/servlet/XxlJobServlet.java deleted file mode 100644 index 078eee89..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/servlet/XxlJobServlet.java +++ /dev/null @@ -1,61 +0,0 @@ -//package com.xxl.job.client.netcom.servlet; -// -// -//import java.io.IOException; -//import java.util.HashMap; -//import java.util.Map; -// -//import javax.servlet.ServletException; -//import javax.servlet.http.HttpServlet; -//import javax.servlet.http.HttpServletRequest; -//import javax.servlet.http.HttpServletResponse; -// -//import com.xxl.job.client.handler.HandlerRouter; -// -// -///** -// * remote job client on http -// * @author xuxueli 2015-12-19 18:36:47 -// */ -//@Deprecated -//public class XxlJobServlet extends HttpServlet { -// private static final long serialVersionUID = 1L; -// -// /** -// * Default constructor. -// */ -// public XxlJobServlet() { -// // TODO Auto-generated constructor stub -// } -// -// /** -// * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse response) -// */ -// protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { -// request.setCharacterEncoding("UTF-8"); -// response.setCharacterEncoding("UTF-8"); -// -// Map _param = new HashMap(); -// if (request.getParameterMap()!=null && request.getParameterMap().size()>0) { -// for (Object paramKey : request.getParameterMap().keySet()) { -// if (paramKey!=null) { -// String paramKeyStr = paramKey.toString(); -// _param.put(paramKeyStr, request.getParameter(paramKeyStr)); -// } -// } -// } -// -// String resp = HandlerRouter.action(_param); -// response.getWriter().append(resp); -// return; -// } -// -// /** -// * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse response) -// */ -// protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { -// // TODO Auto-generated method stub -// doGet(request, response); -// } -// -//} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java index 112ea04b..3098fa60 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/IJobHandler.java @@ -1,12 +1,10 @@ package com.xxl.job.core.handler; -import com.xxl.job.core.router.HandlerRouter; - /** * remote job handler * @author xuxueli 2015-12-19 19:06:38 */ -public abstract class IJobHandler extends HandlerRouter { +public abstract class IJobHandler { /** * job handler

diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/HandlerRouter.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/HandlerRouter.java deleted file mode 100644 index fcc9e449..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/router/HandlerRouter.java +++ /dev/null @@ -1,100 +0,0 @@ -package com.xxl.job.core.router; - -import com.xxl.job.core.handler.IJobHandler; -import com.xxl.job.core.router.action.BeatAction; -import com.xxl.job.core.router.action.KillAction; -import com.xxl.job.core.router.action.LogAction; -import com.xxl.job.core.router.action.RunAction; -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; -import com.xxl.job.core.router.thread.JobThread; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentHashMap; - -/** - * handler repository - * @author xuxueli 2015-12-19 19:28:44 - */ -public class HandlerRouter { - private static Logger logger = LoggerFactory.getLogger(HandlerRouter.class); - - /** - * job handler repository - */ - private static ConcurrentHashMap jobHandlerRepository = new ConcurrentHashMap(); - public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){ - logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); - return HandlerRouter.jobHandlerRepository.put(name, jobHandler); - } - public static IJobHandler loadJobHandler(String name){ - return HandlerRouter.jobHandlerRepository.get(name); - } - - /** - * job thread repository - */ - private static ConcurrentHashMap JobThreadRepository = new ConcurrentHashMap(); - public static JobThread registJobThread(String jobkey, IJobHandler handler){ - JobThread jobThread = new JobThread(handler); - jobThread.start(); - logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobkey:{}, handler:{}", new Object[]{jobkey, handler}); - HandlerRouter.JobThreadRepository.put(jobkey, jobThread); // putIfAbsent | oh my god, map's put method return the old value!!! - return jobThread; - } - public static JobThread loadJobThread(String jobKey){ - return HandlerRouter.JobThreadRepository.get(jobKey); - } - - /** - * route action repository - */ - public enum ActionRepository { - RUN(new RunAction()), - KILL(new KillAction()), - LOG(new LogAction()), - BEAT(new BeatAction()); - - private IAction action; - private ActionRepository(IAction action){ - this.action = action; - } - - /** - * match Action by enum name - * @param name - * @return action - */ - public static IAction matchAction(String name){ - if (name!=null && name.trim().length()>0) { - for (ActionRepository item : ActionRepository.values()) { - if (item.name().equals(name)) { - return item.action; - } - } - } - return null; - } - - } - - // handler push to queue - public static ResponseModel route(RequestModel requestModel) { - logger.debug(">>>>>>>>>>> xxl-job route, RequestModel:{}", new Object[]{requestModel.toString()}); - - // timestamp check - if (System.currentTimeMillis() - requestModel.getTimestamp() > 60000) { - return new ResponseModel(ResponseModel.FAIL, "Timestamp Timeout."); - } - - // match action - IAction action = ActionRepository.matchAction(requestModel.getAction()); - if (action == null) { - return new ResponseModel(ResponseModel.FAIL, "Action match fail."); - } - - return action.execute(requestModel); - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/IAction.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/IAction.java deleted file mode 100644 index 3de0469b..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/router/IAction.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.xxl.job.core.router; - -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; - -/** - * Created by xuxueli on 16/7/22. - */ -public abstract class IAction { - - public abstract ResponseModel execute(RequestModel requestModel); - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/action/BeatAction.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/action/BeatAction.java deleted file mode 100644 index 3bc33c9b..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/router/action/BeatAction.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.xxl.job.core.router.action; - -import com.xxl.job.core.router.IAction; -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; - -/** - * Created by xuxueli on 16/7/22. - */ -public class BeatAction extends IAction { - - @Override - public ResponseModel execute(RequestModel requestModel) { - return new ResponseModel(ResponseModel.SUCCESS, "i am alive."); - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/action/KillAction.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/action/KillAction.java deleted file mode 100644 index f089ac49..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/router/action/KillAction.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.xxl.job.core.router.action; - -import com.xxl.job.core.handler.IJobHandler; -import com.xxl.job.core.router.HandlerRouter; -import com.xxl.job.core.router.IAction; -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; -import com.xxl.job.core.router.thread.JobThread; - -/** - * Created by xuxueli on 16/7/22. - */ -public class KillAction extends IAction { - - @Override - public ResponseModel execute(RequestModel requestModel) { - - // generate jobKey - String jobKey = requestModel.getJobGroup().concat("_").concat(requestModel.getJobName()); - - // kill handlerThread, and create new one - JobThread jobThread = HandlerRouter.loadJobThread(jobKey); - - if (jobThread != null) { - IJobHandler handler = jobThread.getHandler(); - jobThread.toStop("人工手动终止"); - jobThread.interrupt(); - HandlerRouter.registJobThread(jobKey, handler); - return new ResponseModel(ResponseModel.SUCCESS, "job thread kill success."); - } - - return new ResponseModel(ResponseModel.FAIL, "job thread not found."); - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/action/LogAction.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/action/LogAction.java deleted file mode 100644 index 3cbd54ca..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/router/action/LogAction.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.xxl.job.core.router.action; - -import com.xxl.job.core.log.XxlJobFileAppender; -import com.xxl.job.core.router.IAction; -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; - -import java.util.Date; - -/** - * Created by xuxueli on 16/7/22. - */ -public class LogAction extends IAction { - - @Override - public ResponseModel execute(RequestModel requestModel) { - // log filename: yyyy-MM-dd/9999.log - String logFileName = XxlJobFileAppender.makeLogFileName(new Date(requestModel.getLogDateTim()), requestModel.getLogId()); - - String logConteng = XxlJobFileAppender.readLog(logFileName); - return new ResponseModel(ResponseModel.SUCCESS, logConteng); - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/action/RunAction.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/action/RunAction.java deleted file mode 100644 index d1c44e5d..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/router/action/RunAction.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.xxl.job.core.router.action; - -import com.xxl.job.core.glue.GlueFactory; -import com.xxl.job.core.handler.IJobHandler; -import com.xxl.job.core.handler.impl.GlueJobHandler; -import com.xxl.job.core.router.HandlerRouter; -import com.xxl.job.core.router.IAction; -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; -import com.xxl.job.core.router.thread.JobThread; - -/** - * Created by xuxueli on 16/7/22. - */ -public class RunAction extends IAction { - - @Override - public ResponseModel execute(RequestModel requestModel) { - - // generate jobKey - String jobKey = requestModel.getJobGroup().concat("_").concat(requestModel.getJobName()); - - // load old thread - JobThread jobThread = HandlerRouter.loadJobThread(jobKey); - - if (!requestModel.isGlueSwitch()) { - // bean model - - // valid handler instance - IJobHandler jobHandler = HandlerRouter.loadJobHandler(requestModel.getExecutorHandler()); - if (jobHandler==null) { - return new ResponseModel(ResponseModel.FAIL, "job handler for jobKey=[" + jobKey + "] not found."); - } - - if (jobThread == null) { - jobThread = HandlerRouter.registJobThread(jobKey, jobHandler); - } else { - // job handler update, kill old job thread - if (jobThread.getHandler() != jobHandler) { - // kill old job thread - jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); - jobThread.interrupt(); - - // new thread, with new job handler - jobThread = HandlerRouter.registJobThread(jobKey, jobHandler); - } - } - } else { - // glue model - - // valid glueloader - if (!GlueFactory.isActive()) { - return new ResponseModel(ResponseModel.FAIL, "glueLoader for jobKey=[" + jobKey + "] not found."); - } - - if (jobThread == null) { - jobThread = HandlerRouter.registJobThread(jobKey, new GlueJobHandler(requestModel.getJobGroup(), requestModel.getJobName())); - } else { - // job handler update, kill old job thread - if (!(jobThread.getHandler() instanceof GlueJobHandler)) { - // kill old job thread - jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); - jobThread.interrupt(); - - // new thread, with new job handler - jobThread = HandlerRouter.registJobThread(jobKey, new GlueJobHandler(requestModel.getJobGroup(), requestModel.getJobName())); - } - } - } - - // push data to queue - jobThread.pushTriggerQueue(requestModel); - return new ResponseModel(ResponseModel.SUCCESS, null); - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/model/ResponseModel.java b/xxl-job-core/src/main/java/com/xxl/job/core/router/model/ResponseModel.java deleted file mode 100644 index ab202735..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/router/model/ResponseModel.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.xxl.job.core.router.model; - -/** - * Created by xuxueli on 16/7/22. - */ -public class ResponseModel { - public static final String SUCCESS = "SUCCESS"; - public static final String FAIL = "FAIL"; - - private String status; - private String msg; - - public ResponseModel() { - } - - public ResponseModel(String status, String msg) { - this.status = status; - this.msg = msg; - } - - public String getStatus() { - return status; - } - - public void setStatus(String status) { - this.status = status; - } - - public String getMsg() { - return msg; - } - - public void setMsg(String msg) { - this.msg = msg; - } - - @Override - public String toString() { - return "ResponseModel{" + - "status='" + status + '\'' + - ", msg='" + msg + '\'' + - '}'; - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcRequest.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcRequest.java new file mode 100644 index 00000000..05ce00bb --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcRequest.java @@ -0,0 +1,69 @@ +package com.xxl.job.core.rpc.codec; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * request + * @author xuxueli 2015-10-29 19:39:12 + */ +public class RpcRequest implements Serializable{ + private static final long serialVersionUID = 1L; + + private String serverAddress; + private long createMillisTime; + + private String className; + private String methodName; + private Class[] parameterTypes; + private Object[] parameters; + + public String getServerAddress() { + return serverAddress; + } + + public void setServerAddress(String serverAddress) { + this.serverAddress = serverAddress; + } + + public long getCreateMillisTime() { + return createMillisTime; + } + public void setCreateMillisTime(long createMillisTime) { + this.createMillisTime = createMillisTime; + } + public String getClassName() { + return className; + } + public void setClassName(String className) { + this.className = className; + } + public String getMethodName() { + return methodName; + } + public void setMethodName(String methodName) { + this.methodName = methodName; + } + public Class[] getParameterTypes() { + return parameterTypes; + } + public void setParameterTypes(Class[] parameterTypes) { + this.parameterTypes = parameterTypes; + } + public Object[] getParameters() { + return parameters; + } + public void setParameters(Object[] parameters) { + this.parameters = parameters; + } + + @Override + public String toString() { + return "NettyRequest [serverAddress=" + serverAddress + ", createMillisTime=" + + createMillisTime + ", className=" + className + + ", methodName=" + methodName + ", parameterTypes=" + + Arrays.toString(parameterTypes) + ", parameters=" + + Arrays.toString(parameters) + "]"; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcResponse.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcResponse.java new file mode 100644 index 00000000..bb95be32 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcResponse.java @@ -0,0 +1,41 @@ +package com.xxl.job.core.rpc.codec; + +import java.io.Serializable; + +/** + * response + * @author xuxueli 2015-10-29 19:39:54 + */ +public class RpcResponse implements Serializable{ + private static final long serialVersionUID = 1L; + + private String error; + private Object result; + + public boolean isError() { + return error != null; + } + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + } + + public Object getResult() { + return result; + } + + public void setResult(Object result) { + this.result = result; + } + + @Override + public String toString() { + return "NettyResponse [error=" + error + + ", result=" + result + "]"; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComClientProxy.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComClientProxy.java new file mode 100644 index 00000000..62f9a6f1 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComClientProxy.java @@ -0,0 +1,73 @@ +package com.xxl.job.core.rpc.netcom; + +import com.xxl.job.core.rpc.codec.RpcRequest; +import com.xxl.job.core.rpc.codec.RpcResponse; +import com.xxl.job.core.rpc.netcom.jetty.client.JettyClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.FactoryBean; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +/** + * rpc proxy + * @author xuxueli 2015-10-29 20:18:32 + */ +public class NetComClientProxy implements FactoryBean { + private static final Logger logger = LoggerFactory.getLogger(NetComClientProxy.class); + + // ---------------------- config ---------------------- + private Class iface; + String serverAddress; + JettyClient client = new JettyClient(); + public NetComClientProxy(Class iface, String serverAddress) { + this.iface = iface; + this.serverAddress = serverAddress; + } + + @Override + public Object getObject() throws Exception { + return Proxy.newProxyInstance(Thread.currentThread() + .getContextClassLoader(), new Class[] { iface }, + new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + + // request + RpcRequest request = new RpcRequest(); + request.setServerAddress(serverAddress); + request.setCreateMillisTime(System.currentTimeMillis()); + request.setClassName(method.getDeclaringClass().getName()); + request.setMethodName(method.getName()); + request.setParameterTypes(method.getParameterTypes()); + request.setParameters(args); + + // send + RpcResponse response = client.send(request); + + // valid response + if (response == null) { + logger.error(">>>>>>>>>>> xxl-rpc netty response not found."); + throw new Exception(">>>>>>>>>>> xxl-rpc netty response not found."); + } + if (response.isError()) { + throw new RuntimeException(response.getError()); + } else { + return response.getResult(); + } + + } + }); + } + @Override + public Class getObjectType() { + return iface; + } + @Override + public boolean isSingleton() { + return false; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComServerFactory.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComServerFactory.java new file mode 100644 index 00000000..9f0aa95f --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComServerFactory.java @@ -0,0 +1,77 @@ +package com.xxl.job.core.rpc.netcom; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.registry.RegistHelper; +import com.xxl.job.core.rpc.codec.RpcRequest; +import com.xxl.job.core.rpc.codec.RpcResponse; +import com.xxl.job.core.rpc.netcom.jetty.server.JettyServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cglib.reflect.FastClass; +import org.springframework.cglib.reflect.FastMethod; + +import java.util.HashMap; +import java.util.Map; + +/** + * netcom init + * @author xuxueli 2015-10-31 22:54:27 + */ +public class NetComServerFactory { + private static final Logger logger = LoggerFactory.getLogger(NetComServerFactory.class); + + // ---------------------- server start ---------------------- + JettyServer server = new JettyServer(); + public void start(int port, String ip, String appName, RegistHelper registHelper) throws Exception { + server.start(port, ip, appName, registHelper); + } + + // ---------------------- server destroy ---------------------- + public void destroy(){ + server.destroy(); + } + + // ---------------------- server init ---------------------- + /** + * init local rpc service map + */ + private static Map serviceMap = new HashMap(); + public static void putService(Class iface, Object serviceBean){ + serviceMap.put(iface.getName(), serviceBean); + } + public static RpcResponse invokeService(RpcRequest request, Object serviceBean) { + if (serviceBean==null) { + serviceBean = serviceMap.get(request.getClassName()); + } + if (serviceBean == null) { + // TODO + } + + RpcResponse response = new RpcResponse(); + + if (System.currentTimeMillis() - request.getCreateMillisTime() > 60000) { + response.setResult(new ReturnT(ReturnT.FAIL_CODE, "Timestamp Timeout.")); + return response; + } + + try { + Class serviceClass = serviceBean.getClass(); + String methodName = request.getMethodName(); + Class[] parameterTypes = request.getParameterTypes(); + Object[] parameters = request.getParameters(); + + FastClass serviceFastClass = FastClass.create(serviceClass); + FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes); + + Object result = serviceFastMethod.invoke(serviceBean, parameters); + + response.setResult(result); + } catch (Throwable t) { + t.printStackTrace(); + response.setError(t.getMessage()); + } + + return response; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/client/JettyClient.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/client/JettyClient.java new file mode 100644 index 00000000..b0cecec5 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/client/JettyClient.java @@ -0,0 +1,20 @@ +package com.xxl.job.core.rpc.netcom.jetty.client; + +import com.xxl.job.core.rpc.codec.RpcRequest; +import com.xxl.job.core.rpc.codec.RpcResponse; +import com.xxl.job.core.rpc.serialize.HessianSerializer; +import com.xxl.job.core.util.HttpClientUtil; + +/** + * jetty client + * @author xuxueli 2015-11-24 22:25:15 + */ +public class JettyClient { + + public RpcResponse send(RpcRequest request) throws Exception { + byte[] requestBytes = HessianSerializer.serialize(request); + byte[] responseBytes = HttpClientUtil.postRequest("http://" + request.getServerAddress() + "/", requestBytes); + return (RpcResponse) HessianSerializer.deserialize(responseBytes, RpcResponse.class); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java new file mode 100644 index 00000000..25e1ddee --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java @@ -0,0 +1,107 @@ +package com.xxl.job.core.rpc.netcom.jetty.server; + +import com.xxl.job.core.registry.RegistHelper; +import com.xxl.job.core.util.IpUtil; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.server.nio.SelectChannelConnector; +import org.eclipse.jetty.util.thread.ExecutorThreadPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * rpc jetty server + * @author xuxueli 2015-11-19 22:29:03 + */ +public class JettyServer { + private static final Logger logger = LoggerFactory.getLogger(JettyServer.class); + + private Server server; + + public void start(final int port, final String ip, final String appName, final RegistHelper registHelper) throws Exception { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + server = new Server(); + server.setThreadPool(new ExecutorThreadPool(200, 200, 30000)); // 非阻塞 + + // connector + SelectChannelConnector connector = new SelectChannelConnector(); + connector.setPort(port); + connector.setMaxIdleTime(30000); + server.setConnectors(new Connector[] { connector }); + + // handler + HandlerCollection handlerc =new HandlerCollection(); + handlerc.setHandlers(new Handler[]{new JettyServerHandler()}); + server.setHandler(handlerc); + + try { + server.start(); + logger.info(">>>>>>>>>>>> xxl-job jetty server start success at port:{}.", port); + executorRegistryBeat(port, ip, appName, registHelper); + server.join(); // block until thread stopped + logger.info(">>>>>>>>>>> xxl-rpc server start success, netcon={}, port={}", JettyServer.class.getName(), port); + } catch (Exception e) { + logger.error("", e); + } finally { + server.destroy(); + } + } + }); + thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave + thread.start(); + } + + public void destroy() { + if (server != null) { + try { + server.destroy(); + } catch (Exception e) { + logger.error("", e); + } + } + logger.info(">>>>>>>>>>> xxl-rpc server destroy success, netcon={}", JettyServer.class.getName()); + } + + /** + * registry beat + * @param port + * @param ip + * @param appName + * @param registHelper + */ + private void executorRegistryBeat(final int port, final String ip, final String appName, final RegistHelper registHelper){ + if (registHelper==null && appName==null || appName.trim().length()==0) { + return; + } + Thread registryThread = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + try { + // generate addredd = ip:port + String address = null; + if (ip != null && ip.trim().length()>0) { + address = ip.trim().concat(":").concat(String.valueOf(port)); + } else { + address = IpUtil.getIpPort(port); + } + + registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address); + TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + }); + registryThread.setDaemon(true); + registryThread.start(); + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServerHandler.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServerHandler.java new file mode 100644 index 00000000..2dbc3ba6 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServerHandler.java @@ -0,0 +1,47 @@ +package com.xxl.job.core.rpc.netcom.jetty.server; + +import com.xxl.job.core.rpc.codec.RpcRequest; +import com.xxl.job.core.rpc.codec.RpcResponse; +import com.xxl.job.core.rpc.netcom.NetComServerFactory; +import com.xxl.job.core.rpc.serialize.HessianSerializer; +import com.xxl.job.core.util.HttpClientUtil; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.OutputStream; + +/** + * jetty handler + * @author xuxueli 2015-11-19 22:32:36 + */ +public class JettyServerHandler extends AbstractHandler { + + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + + // deserialize request + byte[] requestBytes = HttpClientUtil.readBytes(request); + RpcRequest rpcRequest = (RpcRequest) HessianSerializer.deserialize(requestBytes, RpcRequest.class); + + // invoke + RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null); + + // serialize response + byte[] responseBytes = HessianSerializer.serialize(rpcResponse); + + response.setContentType("text/html;charset=utf-8"); + response.setStatus(HttpServletResponse.SC_OK); + baseRequest.setHandled(true); + + OutputStream out = response.getOutputStream(); + out.write(responseBytes); + out.flush(); + + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/serialize/HessianSerializer.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/serialize/HessianSerializer.java new file mode 100644 index 00000000..eda07ff4 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/serialize/HessianSerializer.java @@ -0,0 +1,37 @@ +package com.xxl.job.core.rpc.serialize; + +import com.caucho.hessian.io.HessianInput; +import com.caucho.hessian.io.HessianOutput; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +/** + * hessian serialize + * @author xuxueli 2015-9-26 02:53:29 + */ +public class HessianSerializer { + + public static byte[] serialize(T obj){ + ByteArrayOutputStream os = new ByteArrayOutputStream(); + HessianOutput ho = new HessianOutput(os); + try { + ho.writeObject(obj); + } catch (IOException e) { + throw new IllegalStateException(e.getMessage(), e); + } + return os.toByteArray(); + } + + public static Object deserialize(byte[] bytes, Class clazz) { + ByteArrayInputStream is = new ByteArrayInputStream(bytes); + HessianInput hi = new HessianInput(is); + try { + return hi.readObject(); + } catch (IOException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java similarity index 60% rename from xxl-job-core/src/main/java/com/xxl/job/core/router/thread/JobThread.java rename to xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index 419854c1..4fda157d 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/router/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -1,9 +1,9 @@ -package com.xxl.job.core.router.thread; +package com.xxl.job.core.thread; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,7 +23,7 @@ public class JobThread extends Thread{ private static Logger logger = LoggerFactory.getLogger(JobThread.class); private IJobHandler handler; - private LinkedBlockingQueue triggerQueue; + private LinkedBlockingQueue triggerQueue; private ConcurrentHashSet triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID private boolean toStop = false; @@ -31,21 +31,21 @@ public class JobThread extends Thread{ public JobThread(IJobHandler handler) { this.handler = handler; - triggerQueue = new LinkedBlockingQueue(); + triggerQueue = new LinkedBlockingQueue(); triggerLogIdSet = new ConcurrentHashSet(); } public IJobHandler getHandler() { return handler; } - public void pushTriggerQueue(RequestModel requestModel) { - if (triggerLogIdSet.contains(requestModel.getLogId())) { - logger.debug("repeate trigger job, logId:{}", requestModel.getLogId()); + public void pushTriggerQueue(TriggerParam triggerParam) { + if (triggerLogIdSet.contains(triggerParam.getLogId())) { + logger.debug("repeate trigger job, logId:{}", triggerParam.getLogId()); return; } - triggerLogIdSet.add(requestModel.getLogId()); - triggerQueue.add(requestModel); + triggerLogIdSet.add(triggerParam.getLogId()); + triggerQueue.add(triggerParam); } public void toStop(String stopReason) { @@ -64,46 +64,46 @@ public class JobThread extends Thread{ while(!toStop){ try { // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) - RequestModel triggerDate = triggerQueue.poll(3L, TimeUnit.SECONDS); - if (triggerDate!=null) { - triggerLogIdSet.remove(triggerDate.getLogId()); + TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); + if (triggerParam!=null) { + triggerLogIdSet.remove(triggerParam.getLogId()); // parse param - String[] handlerParams = (triggerDate.getExecutorParams()!=null && triggerDate.getExecutorParams().trim().length()>0) - ? (String[])(Arrays.asList(triggerDate.getExecutorParams().split(",")).toArray()) : null; + String[] handlerParams = (triggerParam.getExecutorParams()!=null && triggerParam.getExecutorParams().trim().length()>0) + ? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(",")).toArray()) : null; // handle job - String _status = ResponseModel.SUCCESS; + int _code = ReturnT.SUCCESS_CODE; String _msg = null; try { // log filename: yyyy-MM-dd/9999.log - String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerDate.getLogDateTim()), triggerDate.getLogId()); + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); XxlJobFileAppender.contextHolder.set(logFileName); logger.info("----------- xxl-job job execute start -----------"); handler.execute(handlerParams); } catch (Exception e) { logger.error("JobThread Exception:", e); - _status = ResponseModel.FAIL; + _code = ReturnT.FAIL_CODE; StringWriter out = new StringWriter(); e.printStackTrace(new PrintWriter(out)); _msg = out.toString(); } - logger.info("----------- xxl-job job execute end -----------
Look : ExecutorParams:{}, Status:{}, Msg:{}", - new Object[]{handlerParams, _status, _msg}); + logger.info("----------- xxl-job job execute end -----------
Look : ExecutorParams:{}, Code:{}, Msg:{}", + new Object[]{handlerParams, _code, _msg}); // callback handler info if (!toStop) { // commonm - triggerDate.setStatus(_status); - triggerDate.setMsg(_msg); - TriggerCallbackThread.pushCallBack(triggerDate); + triggerParam.setStatus(_code+""); + triggerParam.setMsg(_msg); + TriggerCallbackThread.pushCallBack(triggerParam); } else { // is killed - triggerDate.setStatus(ResponseModel.FAIL); - triggerDate.setMsg(stopReason + " [业务运行中,被强制终止]"); - TriggerCallbackThread.pushCallBack(triggerDate); + triggerParam.setStatus(ReturnT.FAIL_CODE+""); + triggerParam.setMsg(stopReason + " [业务运行中,被强制终止]"); + TriggerCallbackThread.pushCallBack(triggerParam); } } } catch (Exception e) { @@ -113,12 +113,12 @@ public class JobThread extends Thread{ // callback trigger request in queue while(triggerQueue !=null && triggerQueue.size()>0){ - RequestModel triggerDate = triggerQueue.poll(); - if (triggerDate!=null) { + TriggerParam triggerParam = triggerQueue.poll(); + if (triggerParam!=null) { // is killed - triggerDate.setStatus(ResponseModel.FAIL); - triggerDate.setMsg(stopReason + " [任务尚未执行,在调度队列中被终止]"); - TriggerCallbackThread.pushCallBack(triggerDate); + triggerParam.setStatus(ReturnT.FAIL_CODE+""); + triggerParam.setMsg(stopReason + " [任务尚未执行,在调度队列中被终止]"); + TriggerCallbackThread.pushCallBack(triggerParam); } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/router/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java similarity index 58% rename from xxl-job-core/src/main/java/com/xxl/job/core/router/thread/TriggerCallbackThread.java rename to xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 1bf5396d..8a37f939 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/router/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -1,8 +1,9 @@ -package com.xxl.job.core.router.thread; +package com.xxl.job.core.thread; -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; -import com.xxl.job.core.util.XxlJobNetCommUtil; +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; +import com.xxl.job.core.rpc.netcom.NetComClientProxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,20 +15,24 @@ import java.util.concurrent.LinkedBlockingQueue; public class TriggerCallbackThread { private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); - private static LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue(); + private static LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue(); static { new Thread(new Runnable() { @Override public void run() { while(true){ try { - RequestModel callback = callBackQueue.take(); + TriggerParam callback = callBackQueue.take(); if (callback != null) { for (String address : callback.getLogAddress()) { try { - ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), callback); - logger.info(">>>>>>>>>>> xxl-job callback , RequestModel:{}, ResponseModel:{}", new Object[]{callback.toString(), responseModel.toString()}); - if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) { + + // callback + AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, address).getObject(); + ReturnT callbackResult = adminBiz.callback(callback); + + logger.info(">>>>>>>>>>> xxl-job callback , CallbackParam:{}, callbackResult:{}", new Object[]{callback.toString(), callbackResult.toString()}); + if (ReturnT.SUCCESS_CODE == callbackResult.getCode()) { break; } } catch (Exception e) { @@ -42,7 +47,7 @@ public class TriggerCallbackThread { } }).start(); } - public static void pushCallBack(RequestModel callback){ + public static void pushCallBack(TriggerParam callback){ callBackQueue.add(callback); logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/ByteHexConverter.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/ByteHexConverter.java deleted file mode 100644 index 02581330..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/ByteHexConverter.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.xxl.job.core.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.UnsupportedEncodingException; -import java.math.BigInteger; - -/** - * hex/byte util - * @author xuxueli 2015-11-14 22:47:28 - */ -public class ByteHexConverter { - private static Logger logger = LoggerFactory.getLogger(ByteHexConverter.class); - - /** - * byte - to - radix, use BigInteger - */ - private static final String hex_tables = "0123456789ABCDEF"; - public static String byte2hex (byte[] iBytes) { - StringBuilder hex = new StringBuilder(iBytes.length * 2); - for (int index = 0; index < iBytes.length; index++) { - hex.append(hex_tables.charAt((iBytes[index] & 0xf0) >> 4)); - hex.append(hex_tables.charAt((iBytes[index] & 0x0f) >> 0)); - } - return hex.toString(); - } - public static byte[] hex2Byte(String hexString) { - if (hexString == null || hexString.equals("")) { - return null; - } - byte[] res = new byte[hexString.length() / 2]; - char[] chs = hexString.toCharArray(); - for (int i = 0, c = 0; i < chs.length; i += 2, c++) { - res[c] = (byte) (Integer.parseInt(new String(chs, i, 2), 16)); - } - return res; - } - - /** - * byte - to - radix, use BigInteger - */ - public static final int HEX = 16; - public static String byte2radix(byte[] iBytes, int radix){ - return new BigInteger(1, iBytes).toString(radix); - } - public static byte[] radix2byte(String val, int radix){ - return new BigInteger(val, radix).toByteArray(); - } - - /** - * get length of string - * @param str - * @return len of string byte - */ - public static int getByteLen(String str){ - if (str==null || str.length()==0) { - return 0; - } - // because java base on unicode, and one china code's length is one, but it's cost 2 bytes. - //int len = str.getBytes().length * 2; - int len = 0; - try { - len = str.getBytes("UTF-8").length; - } catch (UnsupportedEncodingException e) { - logger.error("", e); - len = str.getBytes().length * 2; - } - - if (len % 4 != 0) { - // Length is best in multiples of four - len = (len/4 + 1) * 4; - } - return len; - } - - public static void main(String[] args) { - // hex - byte[] 方案A:位移 - String temp = "1111111111113d1f3a51sd3f1a32sd1f32as1df2a13sd21f3a2s1df32a13sd2f123s2a3d13fa13sd9999999999"; - System.out.println("明文:" + new String(temp.getBytes())); - System.out.println("编码:" + byte2hex(temp.getBytes())); - System.out.println("解码:" + new String(hex2Byte(byte2hex(temp.getBytes())))); - - // hex - byte[] 方案B:BigInteger - System.out.println("编码:" + byte2radix(temp.getBytes(), HEX)); - System.out.println("解码:" + new String(radix2byte(byte2radix(temp.getBytes(), HEX), HEX))); - - } - -} \ No newline at end of file diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/ByteReadFactory.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/ByteReadFactory.java deleted file mode 100644 index f136246b..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/ByteReadFactory.java +++ /dev/null @@ -1,98 +0,0 @@ -package com.xxl.job.core.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.UnsupportedEncodingException; - -/** - * byte read util - * @author xuxueli 2015-11-15 03:50:10 - */ -public class ByteReadFactory { - private static transient Logger logger = LoggerFactory.getLogger(ByteReadFactory.class); - private int m_iPos; - private int m_iReqLen; - private byte[] m_byte = null; - - public ByteReadFactory(byte[] hexBytes){ - m_iPos = 0; - m_byte = hexBytes; - m_iReqLen = m_byte.length; - } - - public int readInt() { - if (m_iPos + 4 > m_iReqLen) { - return 0; - } - int iInt = (m_byte[m_iPos] & 0xff) - | ((m_byte[m_iPos + 1] & 0xff) << 8) - | ((m_byte[m_iPos + 2] & 0xff) << 16) - | ((m_byte[m_iPos + 3] & 0xff) << 24); - m_iPos += 4; - return iInt; - } - - public long readLong() { - if (m_iPos + 8 > m_iReqLen) { - return 0; - } - long iLong = (m_byte[m_iPos] & 0xff) - | ((m_byte[m_iPos + 1] & 0xff) << 8) - | ((m_byte[m_iPos + 2] & 0xff) << 16) - | ((m_byte[m_iPos + 3] & 0xff) << 24) - | ((m_byte[m_iPos + 4] & 0xff) << 32) - | ((m_byte[m_iPos + 5] & 0xff) << 40) - | ((m_byte[m_iPos + 6] & 0xff) << 48) - | ((m_byte[m_iPos + 7] & 0xff) << 56); - m_iPos += 8; - return iLong; - } - - public String readString(int length) { - if (m_iPos + length > m_iReqLen) { - logger.error("[byte stream factory read string length error.]"); - return ""; - } - - int index = 0; - for (index = 0; index < length; index++) { - if (m_byte[m_iPos + index] == 0) { - break; - } - } - String msg = ""; - try { - msg = new String(m_byte, m_iPos, index, "UTF-8"); - } catch (UnsupportedEncodingException e) { - logger.error("[byte stream factory read string exception.]", e); - } - m_iPos += length; - - return msg; - } - - public byte[] read(int length) { - if (m_iPos + length > m_iReqLen || length<=0) { - logger.error("[byte stream factory read string length error.]"); - return null; - } - for (int i = 0; i < length; i++) { - if (m_byte[m_iPos + i] == 0) { - break; - } - } - - byte[] result = new byte[length]; - for (int i = 0; i < length; i++) { - result[i] = m_byte[m_iPos + i]; - } - m_iPos += length; - return result; - } - - public byte[] readByteAll() { - return read(m_iReqLen - m_iPos); - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/ByteWriteFactory.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/ByteWriteFactory.java deleted file mode 100644 index 36051294..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/ByteWriteFactory.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.xxl.job.core.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; - -/** - * byte write util - * @author xuxueli 2015-11-15 03:49:36 - */ -public class ByteWriteFactory { - private static transient Logger logger = LoggerFactory.getLogger(ByteWriteFactory.class); - private ByteBuffer m_byteBuf = null; - public ByteWriteFactory() { - m_byteBuf = ByteBuffer.allocate(1024 * 4); - } - public ByteWriteFactory(int capacity) { - m_byteBuf = ByteBuffer.allocate(capacity); - } - - public void writeInt(int intValue) { - byte[] intBytes = new byte[4]; - for (int index = 0; index < 4; index++) { - intBytes[index] = (byte) (intValue >>> (index * 8)); - } - m_byteBuf.put(intBytes); - } - - public void write(int[] intArr) { - for (int index = 0; index < intArr.length; index++) { - writeInt(intArr[index]); - } - } - - public void write(byte[] byteArr) { - m_byteBuf.put(byteArr); - } - - public void writeString(String value, int length) { - byte[] bytes = new byte[length]; - if (value != null && value.trim().length() > 0) { - try { - byte[] infoBytes = value.getBytes("UTF-8"); - int len = infoBytes.length < length ? infoBytes.length : length; - System.arraycopy(infoBytes, 0, bytes, 0, len); - } catch (UnsupportedEncodingException e) { - logger.error("[response stream factory encoding exception.]", e); - } - } - m_byteBuf.put(bytes); - } - - public byte[] getBytes() { - m_byteBuf.flip(); - if (m_byteBuf.limit() == 0) { - return null; - } - - byte[] bytes = new byte[m_byteBuf.limit()]; - m_byteBuf.get(bytes); - - return bytes; - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/HttpClientUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/HttpClientUtil.java new file mode 100644 index 00000000..1c8e3608 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/HttpClientUtil.java @@ -0,0 +1,92 @@ +package com.xxl.job.core.util; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.io.InputStream; + +/** + * httpclient util + * @author xuxueli 2015-10-31 19:50:41 + */ +public class HttpClientUtil { + + /** + * post request + */ + public static byte[] postRequest(String reqURL, byte[] date) { + byte[] responseBytes = null; + + HttpPost httpPost = new HttpPost(reqURL); + CloseableHttpClient httpClient = HttpClients.createDefault(); + try { + // init post + /*if (params != null && !params.isEmpty()) { + List formParams = new ArrayList(); + for (Map.Entry entry : params.entrySet()) { + formParams.add(new BasicNameValuePair(entry.getKey(), entry.getValue())); + } + httpPost.setEntity(new UrlEncodedFormEntity(formParams, "UTF-8")); + }*/ + if (date != null) { + httpPost.setEntity(new ByteArrayEntity(date, ContentType.DEFAULT_BINARY)); + } + // do post + HttpResponse response = httpClient.execute(httpPost); + HttpEntity entity = response.getEntity(); + if (null != entity) { + responseBytes = EntityUtils.toByteArray(entity); + EntityUtils.consume(entity); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + httpPost.releaseConnection(); + try { + httpClient.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + return responseBytes; + } + + /** + * read bytes from http request + * @param request + * @return + * @throws IOException + */ + public static final byte[] readBytes(HttpServletRequest request) throws IOException { + request.setCharacterEncoding("UTF-8"); + int contentLen = request.getContentLength(); + InputStream is = request.getInputStream(); + if (contentLen > 0) { + int readLen = 0; + int readLengthThisTime = 0; + byte[] message = new byte[contentLen]; + try { + while (readLen != contentLen) { + readLengthThisTime = is.read(message, readLen, contentLen - readLen); + if (readLengthThisTime == -1) { + break; + } + readLen += readLengthThisTime; + } + return message; + } catch (IOException e) { + e.printStackTrace(); + } + } + return new byte[] {}; + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobNetCommUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobNetCommUtil.java deleted file mode 100644 index ee390f7e..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/XxlJobNetCommUtil.java +++ /dev/null @@ -1,170 +0,0 @@ -package com.xxl.job.core.util; - -import com.xxl.job.core.router.model.RequestModel; -import com.xxl.job.core.router.model.ResponseModel; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.NameValuePair; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.entity.UrlEncodedFormEntity; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.message.BasicNameValuePair; -import org.apache.http.util.EntityUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * http util to send data - * @author xuxueli - * @version 2015-11-28 15:30:59 - */ -public class XxlJobNetCommUtil { - private static Logger logger = LoggerFactory.getLogger(XxlJobNetCommUtil.class); - - // hex param key - public static final String HEX = "hex"; - - - /** - * format object to hex-json - * @param obj - * @return result - */ - public static String formatObj2HexJson(Object obj){ - // obj to json - String json = JacksonUtil.writeValueAsString(obj); - int len = ByteHexConverter.getByteLen(json); - - // json to byte[] - ByteWriteFactory byteWriteFactory = new ByteWriteFactory(4 + len); - byteWriteFactory.writeInt(len); - byteWriteFactory.writeString(json, len); - byte[] bytes = byteWriteFactory.getBytes(); - - // byte to hex - String hex = ByteHexConverter.byte2hex(bytes); - return hex; - } - - /** - * parse hex-json to object - * @param hex - * @param clazz - * @return result - */ - public static T parseHexJson2Obj(String hex, Class clazz){ - // hex to byte[] - byte[] bytes = ByteHexConverter.hex2Byte(hex); - - // byte[] to json - ByteReadFactory byteReadFactory = new ByteReadFactory(bytes); - String json = byteReadFactory.readString(byteReadFactory.readInt()); - - // json to obj - T obj = JacksonUtil.readValue(json, clazz); - return obj; - } - - public static void main(String[] args) { - RequestModel requestModel = new RequestModel(); - requestModel.setJobGroup("group"); - - String hex = formatObj2HexJson(requestModel); - System.out.println(hex); - System.out.println(parseHexJson2Obj(hex, RequestModel.class)); - } - - /** - * http post request - * @param reqURL - */ - public static ResponseModel postHex(String reqURL, RequestModel requestModel){ - - // parse RequestModel to hex-json - String requestHex = XxlJobNetCommUtil.formatObj2HexJson(requestModel); - - // msg - String failMsg = null; - - // do post - HttpPost httpPost = null; - CloseableHttpClient httpClient = null; - try{ - httpPost = new HttpPost(reqURL); - List formParams = new ArrayList(); - formParams.add(new BasicNameValuePair(XxlJobNetCommUtil.HEX, requestHex)); - httpPost.setEntity(new UrlEncodedFormEntity(formParams, "UTF-8")); - - - RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).build(); - httpPost.setConfig(requestConfig); - - //httpClient = HttpClients.createDefault(); // default retry 3 times - httpClient = HttpClients.custom().disableAutomaticRetries().build(); - - HttpResponse response = httpClient.execute(httpPost); - HttpEntity entity = response.getEntity(); - if (response.getStatusLine().getStatusCode() == 200 && null != entity) { - String responseHex = EntityUtils.toString(entity, "UTF-8"); - logger.debug("xxl-job, net comm success, requestHex:{}, responseHex:{}", requestHex, responseHex); - EntityUtils.consume(entity); - - // i do not know why - //responseHex = responseHex.replace("\n", ""); - //responseHex = responseHex.replace("\r", ""); - - if (responseHex!=null) { - responseHex = responseHex.trim(); - } - - // parse hex-json to ResponseModel - ResponseModel responseModel = XxlJobNetCommUtil.parseHexJson2Obj(responseHex, ResponseModel.class); - - if (responseModel!=null) { - return responseModel; - } - } else { - failMsg = "http statusCode error, statusCode:" + response.getStatusLine().getStatusCode(); - } - } catch (Exception e) { - logger.error("", e); - /*StringWriter out = new StringWriter(); - e.printStackTrace(new PrintWriter(out)); - callback.setMsg(out.toString());*/ - failMsg = e.getMessage(); - } finally{ - if (httpPost!=null) { - httpPost.releaseConnection(); - } - if (httpClient!=null) { - try { - httpClient.close(); - } catch (IOException e) { - logger.error("", e); - } - } - } - - // other, default fail - ResponseModel callback = new ResponseModel(); - callback.setStatus(ResponseModel.FAIL); - callback.setMsg(failMsg); - return callback; - } - - /** - * parse address ip:port to url http://.../ - * @param address - * @return result - */ - public static String addressToUrl(String address){ - return "http://" + address + "/"; - } - -} diff --git a/xxl-job-executor-example/pom.xml b/xxl-job-executor-example/pom.xml index 55a62673..1aa3200a 100644 --- a/xxl-job-executor-example/pom.xml +++ b/xxl-job-executor-example/pom.xml @@ -4,13 +4,13 @@ com.xuxueli xxl-job - 1.6.0 + 1.6.0-SNAPSHOT xxl-job-executor-example war - 1.6.0 + 1.6.0-SNAPSHOT 3.2.17.RELEASE diff --git a/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml b/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml index eaf8887b..4ac7b58c 100644 --- a/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml +++ b/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml @@ -13,7 +13,7 @@ - +