diff --git a/pom.xml b/pom.xml
index 6f78b9c0..8f944413 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
com.xuxueli
xxl-job
- 1.6.0
+ 1.6.0-SNAPSHOT
pom
xxl-job
diff --git a/xxl-job-admin/pom.xml b/xxl-job-admin/pom.xml
index d5c2d251..4561b88d 100644
--- a/xxl-job-admin/pom.xml
+++ b/xxl-job-admin/pom.xml
@@ -4,13 +4,13 @@
com.xuxueli
xxl-job
- 1.6.0
+ 1.6.0-SNAPSHOT
xxl-job-admin
war
- 1.6.0
+ 1.6.0-SNAPSHOT
3.2.17.RELEASE
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/IndexController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/IndexController.java
index 464ff10d..a4923e8f 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/IndexController.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/IndexController.java
@@ -1,8 +1,9 @@
package com.xxl.job.admin.controller;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
+import com.xxl.job.admin.controller.annotation.PermessionLimit;
+import com.xxl.job.admin.controller.interceptor.PermissionInterceptor;
+import com.xxl.job.admin.core.util.PropertiesUtil;
+import com.xxl.job.core.biz.model.ReturnT;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
@@ -10,10 +11,8 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
-import com.xxl.job.admin.controller.annotation.PermessionLimit;
-import com.xxl.job.admin.controller.interceptor.PermissionInterceptor;
-import com.xxl.job.admin.core.model.ReturnT;
-import com.xxl.job.admin.core.util.PropertiesUtil;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
/**
* index controller
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobCodeController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobCodeController.java
index 4f91fb6e..7a1acaff 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobCodeController.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobCodeController.java
@@ -1,10 +1,10 @@
package com.xxl.job.admin.controller;
-import com.xxl.job.admin.core.model.ReturnT;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLogGlue;
import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobLogGlueDao;
+import com.xxl.job.core.biz.model.ReturnT;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobGroupController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobGroupController.java
index dd0e6c3f..a5c32fed 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobGroupController.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobGroupController.java
@@ -1,10 +1,10 @@
package com.xxl.job.admin.controller;
-import com.xxl.job.admin.core.model.ReturnT;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.thread.JobRegistryHelper;
import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao;
+import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.registry.RegistHelper;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java
index aa14586e..586636d2 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java
@@ -1,9 +1,9 @@
package com.xxl.job.admin.controller;
-import com.xxl.job.admin.core.model.ReturnT;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.service.IXxlJobService;
+import com.xxl.job.core.biz.model.ReturnT;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -49,8 +49,8 @@ public class JobInfoController {
@RequestMapping("/add")
@ResponseBody
public ReturnT add(int jobGroup, String jobCron, String jobDesc, String author, String alarmEmail,
- String executorAppname, String executorAddress, String executorHandler, String executorParam,
- int glueSwitch, String glueSource, String glueRemark, String childJobKey) {
+ String executorAppname, String executorAddress, String executorHandler, String executorParam,
+ int glueSwitch, String glueSource, String glueRemark, String childJobKey) {
return xxlJobService.add(jobGroup, jobCron, jobDesc, author, alarmEmail,
executorAddress, executorHandler, executorParam,
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java
index 127c36ed..7785655e 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobLogController.java
@@ -1,16 +1,14 @@
package com.xxl.job.admin.controller;
-import com.xxl.job.admin.core.model.ReturnT;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobLogDao;
-import com.xxl.job.core.router.HandlerRouter.ActionRepository;
-import com.xxl.job.core.router.model.RequestModel;
-import com.xxl.job.core.router.model.ResponseModel;
-import com.xxl.job.core.util.XxlJobNetCommUtil;
+import com.xxl.job.core.biz.ExecutorBiz;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.springframework.stereotype.Controller;
@@ -99,22 +97,24 @@ public class JobLogController {
if (log == null) {
return new ReturnT(500, "查看执行日志失败: 参数异常");
}
- if (!(ResponseModel.SUCCESS.equals(log.getTriggerStatus()) || StringUtils.isNotBlank(log.getHandleStatus()))) {
+ if (!((ReturnT.SUCCESS_CODE+"").equals(log.getTriggerStatus()) || StringUtils.isNotBlank(log.getHandleStatus()))) {
return new ReturnT(500, "查看执行日志失败: 任务发起调度失败,无法查看执行日志");
}
// trigger id, trigger time
- RequestModel requestModel = new RequestModel();
- requestModel.setTimestamp(System.currentTimeMillis());
- requestModel.setAction(ActionRepository.LOG.name());
- requestModel.setLogId(id);
- requestModel.setLogDateTim(log.getTriggerTime().getTime());
+ ExecutorBiz executorBiz = null;
+ try {
+ executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, log.getExecutorAddress()).getObject();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return new ReturnT(500, e.getMessage());
+ }
+ ReturnT logResult = executorBiz.log(log.getTriggerTime().getTime(), id);
- ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(log.getExecutorAddress()), requestModel);
- if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) {
- return new ReturnT(responseModel.getMsg());
+ if (ReturnT.SUCCESS_CODE == logResult.getCode()) {
+ return new ReturnT(logResult.getMsg());
} else {
- return new ReturnT(500, "查看执行日志失败: " + responseModel.getMsg());
+ return new ReturnT(500, "查看执行日志失败: " + logResult.getMsg());
}
}
@@ -134,26 +134,28 @@ public class JobLogController {
if (log == null || jobInfo==null) {
return new ReturnT(500, "参数异常");
}
- if (!ResponseModel.SUCCESS.equals(log.getTriggerStatus())) {
+ if (!(ReturnT.SUCCESS_CODE +"").equals(log.getTriggerStatus())) {
return new ReturnT(500, "调度失败,无法终止日志");
}
-
+
// request of kill
- RequestModel requestModel = new RequestModel();
- requestModel.setTimestamp(System.currentTimeMillis());
- requestModel.setAction(ActionRepository.KILL.name());
- requestModel.setJobGroup(String.valueOf(log.getJobGroup()));
- requestModel.setJobName(log.getJobName());
+ ExecutorBiz executorBiz = null;
+ try {
+ executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, log.getExecutorAddress()).getObject();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return new ReturnT(500, e.getMessage());
+ }
+ ReturnT runResult = executorBiz.kill(String.valueOf(log.getJobGroup()), log.getJobName());
- ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(log.getExecutorAddress()), requestModel);
- if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) {
- log.setHandleStatus(ResponseModel.FAIL);
+ if (ReturnT.SUCCESS_CODE == runResult.getCode()) {
+ log.setHandleStatus(ReturnT.SUCCESS_CODE+"");
log.setHandleMsg("人为操作主动终止");
log.setHandleTime(new Date());
xxlJobLogDao.updateHandleInfo(log);
- return new ReturnT(responseModel.getMsg());
+ return new ReturnT(runResult.getMsg());
} else {
- return new ReturnT(500, responseModel.getMsg());
+ return new ReturnT(500, runResult.getMsg());
}
}
}
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java
index b0e40036..5a15245f 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java
@@ -1,7 +1,7 @@
package com.xxl.job.admin.controller.resolver;
-import com.xxl.job.admin.core.model.ReturnT;
-import com.xxl.job.core.util.JacksonUtil;
+import com.xxl.job.admin.core.util.JacksonUtil;
+import com.xxl.job.core.biz.model.ReturnT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.ResponseBody;
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java
new file mode 100644
index 00000000..f146537a
--- /dev/null
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java
@@ -0,0 +1,84 @@
+package com.xxl.job.admin.core.biz;
+
+import com.xxl.job.admin.core.model.XxlJobInfo;
+import com.xxl.job.admin.core.model.XxlJobLog;
+import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil;
+import com.xxl.job.core.biz.AdminBiz;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.biz.model.TriggerParam;
+import org.apache.commons.lang.StringUtils;
+import org.quartz.SchedulerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.Date;
+
+/**
+ * Created by xuxueli on 17/3/1.
+ */
+public class AdminBizImpl implements AdminBiz {
+ private static Logger logger = LoggerFactory.getLogger(AdminBizImpl.class);
+
+ @Override
+ public ReturnT callback(TriggerParam triggerParam) {
+
+ // valid log item
+ XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(triggerParam.getLogId());
+ if (log == null) {
+ return new ReturnT(ReturnT.FAIL_CODE, "log item not found.");
+ }
+
+ // trigger success, to trigger child job, and avoid repeat trigger child job
+ String childTriggerMsg = null;
+ if ((ReturnT.SUCCESS_CODE+"").equals(triggerParam.getStatus()) && !(ReturnT.SUCCESS_CODE+"").equals(log.getHandleStatus())) {
+ XxlJobInfo xxlJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName());
+ if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) {
+ childTriggerMsg = "
";
+ String[] childJobKeys = xxlJobInfo.getChildJobKey().split(",");
+ for (int i = 0; i < childJobKeys.length; i++) {
+ String[] jobKeyArr = childJobKeys[i].split("_");
+ if (jobKeyArr!=null && jobKeyArr.length==2) {
+ XxlJobInfo childJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(Integer.valueOf(jobKeyArr[0]), jobKeyArr[1]);
+ if (childJobInfo!=null) {
+ try {
+ boolean ret = DynamicSchedulerUtil.triggerJob(childJobInfo.getJobName(), String.valueOf(childJobInfo.getJobGroup()));
+
+ // add msg
+ childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}",
+ (i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc());
+ } catch (SchedulerException e) {
+ logger.error("", e);
+ }
+ } else {
+ childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}",
+ (i+1), childJobKeys.length, childJobKeys[i]);
+ }
+ } else {
+ childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务失败, 子任务Key格式错误, 子任务Key: {2}",
+ (i+1), childJobKeys.length, childJobKeys[i]);
+ }
+ }
+
+ }
+ }
+
+ // handle msg
+ StringBuffer handleMsg = new StringBuffer();
+ if (triggerParam.getMsg() != null) {
+ handleMsg.append("执行备注:").append(triggerParam.getMsg());
+ }
+ if (childTriggerMsg !=null) {
+ handleMsg.append("
子任务触发备注:").append(childTriggerMsg);
+ }
+
+ // success, save log
+ log.setHandleTime(new Date());
+ log.setHandleStatus(triggerParam.getStatus());
+ log.setHandleMsg(handleMsg.toString());
+ DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log);
+
+ return new ReturnT(ReturnT.SUCCESS_CODE, null);
+ }
+
+}
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java
deleted file mode 100644
index 21c8a227..00000000
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package com.xxl.job.admin.core.callback;
-
-import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.Handler;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.handler.HandlerCollection;
-import org.eclipse.jetty.server.nio.SelectChannelConnector;
-import org.eclipse.jetty.util.thread.ExecutorThreadPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by xuxueli on 2016-5-22 11:15:42
- */
-public class XxlJobLogCallbackServer {
- private static final Logger logger = LoggerFactory.getLogger(XxlJobLogCallbackServer.class);
-
- private Server server = null;
- public void start(int callBackPort) throws Exception {
-
- final int port = Integer.valueOf(callBackPort);
- new Thread(new Runnable() {
- @Override
- public void run() {
- server = new Server();
- server.setThreadPool(new ExecutorThreadPool(200, 200, 30000)); // 非阻塞
-
- // connector
- SelectChannelConnector connector = new SelectChannelConnector();
- connector.setPort(port);
- connector.setMaxIdleTime(30000);
- server.setConnectors(new Connector[] { connector });
-
- // handler
- HandlerCollection handlerc =new HandlerCollection();
- handlerc.setHandlers(new Handler[]{new XxlJobLogCallbackServerHandler()});
- server.setHandler(handlerc);
-
- try {
- server.start();
- logger.info(">>>>>>>>>>>> xxl-job XxlJobCallbackServer start success at port:{}.", port);
- server.join(); // block until server ready
- logger.info(">>>>>>>>>>>> xxl-job XxlJobCallbackServer join success at port:{}.", port);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
-
- }
-
- public void destroy() {
- if (server!=null) {
- try {
- server.stop();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
-}
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServerHandler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServerHandler.java
deleted file mode 100644
index 0f8d9f2d..00000000
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServerHandler.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package com.xxl.job.admin.core.callback;
-
-import com.xxl.job.admin.core.model.XxlJobInfo;
-import com.xxl.job.admin.core.model.XxlJobLog;
-import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
-import com.xxl.job.core.router.model.RequestModel;
-import com.xxl.job.core.router.model.ResponseModel;
-import com.xxl.job.core.util.XxlJobNetCommUtil;
-import org.apache.commons.lang.StringUtils;
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.quartz.SchedulerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.text.MessageFormat;
-import java.util.Date;
-
-/**
- * Created by xuxueli on 2016-5-22 11:15:42
- */
-public class XxlJobLogCallbackServerHandler extends AbstractHandler {
- private static Logger logger = LoggerFactory.getLogger(XxlJobLogCallbackServerHandler.class);
-
- @Override
- public void handle(String s, Request baseRequest, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException {
-
- httpServletRequest.setCharacterEncoding("UTF-8");
- httpServletResponse.setCharacterEncoding("UTF-8");
-
- // parse hex-json to request model
- String requestHex = httpServletRequest.getParameter(XxlJobNetCommUtil.HEX);
-
- // do biz
- ResponseModel responseModel = dobiz(requestHex);
-
- // format response model to hex-json
- String responseHex = XxlJobNetCommUtil.formatObj2HexJson(responseModel);
-
- // response
- httpServletResponse.setContentType("text/html;charset=utf-8");
- httpServletResponse.setStatus(HttpServletResponse.SC_OK);
- baseRequest.setHandled(true);
- httpServletResponse.getWriter().println(responseHex);
- }
-
- private ResponseModel dobiz(String requestHex){
-
- // valid hex
- if (requestHex==null || requestHex.trim().length()==0) {
- return new ResponseModel(ResponseModel.FAIL, "request hex is null.");
- }
-
- // valid request model
- RequestModel requestModel = XxlJobNetCommUtil.parseHexJson2Obj(requestHex, RequestModel.class);
- if (requestModel==null) {
- return new ResponseModel(ResponseModel.FAIL, "request hex parse fail.");
- }
-
- // valid log item
- XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(requestModel.getLogId());
- if (log == null) {
- return new ResponseModel(ResponseModel.FAIL, "log item not found.");
- }
-
- // trigger success, to trigger child job, and avoid repeat trigger child job
- String childTriggerMsg = null;
- if (ResponseModel.SUCCESS.equals(requestModel.getStatus()) && !ResponseModel.SUCCESS.equals(log.getHandleStatus())) {
- XxlJobInfo xxlJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName());
- if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) {
- childTriggerMsg = "
";
- String[] childJobKeys = xxlJobInfo.getChildJobKey().split(",");
- for (int i = 0; i < childJobKeys.length; i++) {
- String[] jobKeyArr = childJobKeys[i].split("_");
- if (jobKeyArr!=null && jobKeyArr.length==2) {
- XxlJobInfo childJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(Integer.valueOf(jobKeyArr[0]), jobKeyArr[1]);
- if (childJobInfo!=null) {
- try {
- boolean ret = DynamicSchedulerUtil.triggerJob(childJobInfo.getJobName(), String.valueOf(childJobInfo.getJobGroup()));
-
- // add msg
- childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}",
- (i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc());
- } catch (SchedulerException e) {
- logger.error("", e);
- }
- } else {
- childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}",
- (i+1), childJobKeys.length, childJobKeys[i]);
- }
- } else {
- childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务失败, 子任务Key格式错误, 子任务Key: {2}",
- (i+1), childJobKeys.length, childJobKeys[i]);
- }
- }
-
- }
- }
-
- // handle msg
- StringBuffer handleMsg = new StringBuffer();
- if (requestModel.getMsg() != null) {
- handleMsg.append("执行备注:").append(requestModel.getMsg());
- }
- if (childTriggerMsg !=null) {
- handleMsg.append("
子任务触发备注:").append(childTriggerMsg);
- }
-
- // success, save log
- log.setHandleTime(new Date());
- log.setHandleStatus(requestModel.getStatus());
- log.setHandleMsg(handleMsg.toString());
- DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log);
-
- return new ResponseModel(ResponseModel.SUCCESS, null);
- }
-
-}
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java
index 5e785d5e..4c29301d 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java
@@ -5,12 +5,12 @@ import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.thread.JobMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryHelper;
-import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
+import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil;
+import com.xxl.job.core.biz.ExecutorBiz;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.registry.RegistHelper;
-import com.xxl.job.core.router.HandlerRouter.ActionRepository;
-import com.xxl.job.core.router.model.RequestModel;
-import com.xxl.job.core.router.model.ResponseModel;
-import com.xxl.job.core.util.XxlJobNetCommUtil;
+import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import org.apache.commons.lang.StringUtils;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@@ -56,17 +56,15 @@ public class RemoteHttpJobBean extends QuartzJobBean {
jobLog.setTriggerTime(new Date());
// trigger request
- RequestModel requestModel = new RequestModel();
- requestModel.setTimestamp(System.currentTimeMillis());
- requestModel.setAction(ActionRepository.RUN.name());
- requestModel.setJobGroup(String.valueOf(jobInfo.getJobGroup()));
- requestModel.setJobName(jobInfo.getJobName());
- requestModel.setExecutorHandler(jobInfo.getExecutorHandler());
- requestModel.setExecutorParams(jobInfo.getExecutorParam());
- requestModel.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true);
- requestModel.setLogAddress(adminAddressSet);
- requestModel.setLogId(jobLog.getId());
- requestModel.setLogDateTim(jobLog.getTriggerTime().getTime());
+ TriggerParam triggerParam = new TriggerParam();
+ triggerParam.setJobGroup(String.valueOf(jobInfo.getJobGroup()));
+ triggerParam.setJobName(jobInfo.getJobName());
+ triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
+ triggerParam.setExecutorParams(jobInfo.getExecutorParam());
+ triggerParam.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true);
+ triggerParam.setLogAddress(adminAddressSet);
+ triggerParam.setLogId(jobLog.getId());
+ triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
// parse address
List addressList = new ArrayList();
@@ -76,13 +74,13 @@ public class RemoteHttpJobBean extends QuartzJobBean {
}
// failover trigger
- ResponseModel responseModel = failoverTrigger(addressList, requestModel, jobLog);
+ ReturnT responseModel = failoverTrigger(addressList, triggerParam, jobLog);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, responseModel:{}", jobLog.getId(), responseModel.toString());
// update trigger info 2/2
- jobLog.setTriggerStatus(responseModel.getStatus());
+ jobLog.setTriggerStatus(responseModel.getCode()+"");
jobLog.setTriggerMsg(responseModel.getMsg());
DynamicSchedulerUtil.xxlJobLogDao.updateTriggerInfo(jobLog);
@@ -97,21 +95,28 @@ public class RemoteHttpJobBean extends QuartzJobBean {
* failover for trigger remote address
* @return
*/
- public ResponseModel failoverTrigger(List addressList, RequestModel requestModel, XxlJobLog jobLog){
+ public ReturnT failoverTrigger(List addressList, TriggerParam triggerParam, XxlJobLog jobLog){
if (addressList==null || addressList.size() < 1) {
- ResponseModel result = new ResponseModel();
- result.setStatus(ResponseModel.FAIL);
- result.setMsg( "Trigger error,
>>>[address] is null
" );
- return result;
+ return new ReturnT(ReturnT.FAIL_CODE, "Trigger error,
>>>[address] is null
");
} else if (addressList.size() == 1) {
String address = addressList.get(0);
// store real address
jobLog.setExecutorAddress(address);
- ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel);
- String failoverMessage = MessageFormat.format("Trigger running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}
", address, triggerCallback.getStatus(), triggerCallback.getMsg());
- triggerCallback.setMsg(failoverMessage);
- return triggerCallback;
+ // real trigger
+ ExecutorBiz executorBiz = null;
+ try {
+ executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
+ }
+ ReturnT runResult = executorBiz.run(triggerParam);
+
+ String failoverMessage = MessageFormat.format("Trigger running,
>>>[address] : {0},
>>>[code] : {1},
>>>[msg] : {2}
",
+ address, runResult.getCode(), runResult.getMsg());
+ runResult.setMsg(runResult.getMsg() + failoverMessage);
+ return runResult;
} else {
// for ha
@@ -122,32 +127,38 @@ public class RemoteHttpJobBean extends QuartzJobBean {
for (String address : addressList) {
if (StringUtils.isNotBlank(address)) {
- // beat check
- RequestModel beatRequest = new RequestModel();
- beatRequest.setTimestamp(System.currentTimeMillis());
- beatRequest.setAction(ActionRepository.BEAT.name());
- ResponseModel beatResult = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), beatRequest);
- failoverMessage += MessageFormat.format("BEAT running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}
", address, beatResult.getStatus(), beatResult.getMsg());
+
+ ExecutorBiz executorBiz = null;
+ try {
+ executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
+ }
+
+ // beat check
+ ReturnT beatResult = executorBiz.beat();
+ failoverMessage += MessageFormat.format("BEAT running,
>>>[address] : {0},
>>>[code] : {1},
>>>[msg] : {2}
",
+ address, beatResult.getCode(), beatResult.getMsg());
// beat success, trigger do
- if (beatResult.SUCCESS.equals(beatResult.getStatus())) {
+ if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
// store real address
jobLog.setExecutorAddress(address);
// real trigger
- ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel);
- failoverMessage += MessageFormat.format("Trigger running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}
", address, triggerCallback.getStatus(), triggerCallback.getMsg());
- triggerCallback.setMsg(failoverMessage);
- return triggerCallback;
+ ReturnT runResult = executorBiz.run(triggerParam);
+
+ failoverMessage += MessageFormat.format("Trigger running,
>>>[address] : {0},
>>>[status] : {1},
>>>[msg] : {2}
",
+ address, runResult.getCode(), runResult.getMsg());
+ runResult.setMsg( runResult.getMsg() + failoverMessage);
+ return runResult;
}
}
}
- ResponseModel result = new ResponseModel();
- result.setStatus(ResponseModel.FAIL);
- result.setMsg(failoverMessage);
- return result;
+ return new ReturnT(ReturnT.FAIL_CODE, failoverMessage);
}
}
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/DynamicSchedulerUtil.java
similarity index 93%
rename from xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java
rename to xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/DynamicSchedulerUtil.java
index 3e865b8c..f1a3a1d4 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/DynamicSchedulerUtil.java
@@ -1,301 +1,297 @@
-package com.xxl.job.admin.core.util;
-
-import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer;
-import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean;
-import com.xxl.job.admin.core.model.XxlJobInfo;
-import com.xxl.job.admin.core.thread.JobRegistryHelper;
-import com.xxl.job.admin.dao.IXxlJobGroupDao;
-import com.xxl.job.admin.dao.IXxlJobInfoDao;
-import com.xxl.job.admin.dao.IXxlJobLogDao;
-import com.xxl.job.admin.dao.IXxlJobRegistryDao;
-import com.xxl.job.core.util.IpUtil;
-import org.quartz.*;
-import org.quartz.Trigger.TriggerState;
-import org.quartz.impl.matchers.GroupMatcher;
-import org.quartz.impl.triggers.CronTriggerImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.util.Assert;
-
-import java.util.*;
-
-/**
- * base quartz scheduler util
- * @author xuxueli 2015-12-19 16:13:53
- */
-public final class DynamicSchedulerUtil implements ApplicationContextAware, InitializingBean {
- private static final Logger logger = LoggerFactory.getLogger(DynamicSchedulerUtil.class);
-
- // Scheduler
- private static Scheduler scheduler;
- public static void setScheduler(Scheduler scheduler) {
- DynamicSchedulerUtil.scheduler = scheduler;
- }
-
- // trigger callback address
- private String callBackIp;
- private int callBackPort = 8888;
- private static String callbackAddress;
-
- public void setCallBackIp(String callBackIp) {
- this.callBackIp = callBackIp;
- }
- public void setCallBackPort(int callBackPort) {
- this.callBackPort = callBackPort;
- }
- public static String getCallbackAddress(){
- return callbackAddress;
- }
-
- // init
- XxlJobLogCallbackServer xxlJobLogCallbackServer = null;
- public void init(){
- try {
- // start callback server
- xxlJobLogCallbackServer = new XxlJobLogCallbackServer();
- xxlJobLogCallbackServer.start(callBackPort);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- // init callbackAddress
- if (callBackIp!=null && callBackIp.trim().length()>0) {
- callbackAddress = callBackIp.trim().concat(":").concat(String.valueOf(callBackPort));
- } else {
- callbackAddress = IpUtil.getIpPort(callBackPort);;
- }
-
- // init JobRegistryHelper
- JobRegistryHelper.discover("g", "k");
- }
-
- // destroy
- public void destroy(){
- if (xxlJobLogCallbackServer!=null) {
- xxlJobLogCallbackServer.destroy();
- }
- }
-
- // xxlJobLogDao、xxlJobInfoDao
- public static IXxlJobLogDao xxlJobLogDao;
- public static IXxlJobInfoDao xxlJobInfoDao;
- public static IXxlJobRegistryDao xxlJobRegistryDao;
- public static IXxlJobGroupDao xxlJobGroupDao;
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- DynamicSchedulerUtil.xxlJobLogDao = applicationContext.getBean(IXxlJobLogDao.class);
- DynamicSchedulerUtil.xxlJobInfoDao = applicationContext.getBean(IXxlJobInfoDao.class);
- DynamicSchedulerUtil.xxlJobRegistryDao = applicationContext.getBean(IXxlJobRegistryDao.class);
- DynamicSchedulerUtil.xxlJobGroupDao = applicationContext.getBean(IXxlJobGroupDao.class);
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- Assert.notNull(scheduler, "quartz scheduler is null");
- logger.info(">>>>>>>>> init quartz scheduler success.[{}]", scheduler);
-
- }
-
- // getJobKeys
- @Deprecated
- public static List