From ee07e0b794787b574809856843af706690d3124e Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Thu, 27 Jul 2017 22:56:00 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E5=BA=A6=E4=B8=AD=E5=BF=83API?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E6=94=B9=E4=B8=BA=E8=87=AA=E7=A0=94RPC?= =?UTF-8?q?=E5=BD=A2=E5=BC=8F=EF=BC=8C=E7=BB=9F=E4=B8=80=E5=BA=95=E5=B1=82?= =?UTF-8?q?=E9=80=9A=E8=AE=AF=E6=A8=A1=E5=9E=8B=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 1 + .../admin/controller/JobApiController.java | 146 ++++++------------ .../core/schedule/XxlJobDynamicScheduler.java | 3 + .../job/admin/service/impl/AdminBizImpl.java | 121 +++++++++++++++ .../com/xxl/job/dao/impl/AdminApiTest.java | 25 --- .../java/com/xxl/job/core/biz/AdminBiz.java | 30 ++++ .../xxl/job/core/executor/XxlJobExecutor.java | 6 +- .../rpc/netcom/jetty/client/JettyClient.java | 8 +- .../core/thread/ExecutorRegistryThread.java | 26 +++- .../core/thread/TriggerCallbackThread.java | 26 +++- .../com/xxl/job/core/util/AdminApiUtil.java | 127 --------------- .../java/com/xxl/job/core/util/DBUtil.java | 128 --------------- 12 files changed, 252 insertions(+), 395 deletions(-) create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java delete mode 100644 xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminApiTest.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/AdminApiUtil.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/DBUtil.java diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index cc8c77e2..486a4c54 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -902,6 +902,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 3、执行器JobHandler禁止命名冲突; - 4、执行器集群地址列表进行自然排序; - 5、调度中心,DAO层代码精简优化并且新增测试用例覆盖; +- 6、调度中心API服务改为自研RPC形式,统一底层通讯模型; #### TODO LIST - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java index f50fb9af..8a36c6cf 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java @@ -1,30 +1,22 @@ package com.xxl.job.admin.controller; import com.xxl.job.admin.controller.annotation.PermessionLimit; -import com.xxl.job.admin.core.model.XxlJobInfo; -import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; -import com.xxl.job.admin.dao.XxlJobInfoDao; -import com.xxl.job.admin.dao.XxlJobLogDao; -import com.xxl.job.admin.dao.XxlJobRegistryDao; -import com.xxl.job.core.biz.model.HandleCallbackParam; -import com.xxl.job.core.biz.model.RegistryParam; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.util.AdminApiUtil; -import org.apache.commons.lang.StringUtils; -import org.quartz.SchedulerException; +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.rpc.codec.RpcRequest; +import com.xxl.job.core.rpc.codec.RpcResponse; +import com.xxl.job.core.rpc.netcom.NetComServerFactory; +import com.xxl.job.core.rpc.serialize.HessianSerializer; +import com.xxl.job.core.util.HttpClientUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Controller; -import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; -import org.springframework.web.bind.annotation.ResponseBody; -import javax.annotation.Resource; -import java.text.MessageFormat; -import java.util.Date; -import java.util.List; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.OutputStream; /** * Created by xuxueli on 17/5/10. @@ -33,100 +25,50 @@ import java.util.List; public class JobApiController { private static Logger logger = LoggerFactory.getLogger(JobApiController.class); - @Resource - public XxlJobLogDao xxlJobLogDao; - @Resource - private XxlJobInfoDao xxlJobInfoDao; - @Resource - private XxlJobRegistryDao xxlJobRegistryDao; - - - @RequestMapping(value= AdminApiUtil.CALLBACK, method = RequestMethod.POST, consumes = "application/json") - @ResponseBody - @PermessionLimit(limit=false) - public ReturnT callback(@RequestBody List callbackParamList){ - - for (HandleCallbackParam handleCallbackParam: callbackParamList) { - ReturnT callbackResult = callback(handleCallbackParam); - logger.info("JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", - (callbackResult.getCode()==ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult); - } - - return ReturnT.SUCCESS; + static { + NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz); } - private ReturnT callback(HandleCallbackParam handleCallbackParam) { - // valid log item - XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId()); - if (log == null) { - return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); - } - - // trigger success, to trigger child job, and avoid repeat trigger child job - String childTriggerMsg = null; - if (ReturnT.SUCCESS_CODE==handleCallbackParam.getExecuteResult().getCode() && ReturnT.SUCCESS_CODE!=log.getHandleCode()) { - XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId()); - if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) { - childTriggerMsg = "
"; - String[] childJobKeys = xxlJobInfo.getChildJobKey().split(","); - for (int i = 0; i < childJobKeys.length; i++) { - String[] jobKeyArr = childJobKeys[i].split("_"); - if (jobKeyArr!=null && jobKeyArr.length==2) { - XxlJobInfo childJobInfo = xxlJobInfoDao.loadById(Integer.valueOf(jobKeyArr[1])); - if (childJobInfo!=null) { - try { - boolean ret = XxlJobDynamicScheduler.triggerJob(String.valueOf(childJobInfo.getId()), String.valueOf(childJobInfo.getJobGroup())); + private RpcResponse doInvoke(HttpServletRequest request) { + try { + // deserialize request + byte[] requestBytes = HttpClientUtil.readBytes(request); + if (requestBytes == null || requestBytes.length==0) { + RpcResponse rpcResponse = new RpcResponse(); + rpcResponse.setError("RpcRequest byte[] is null"); + return rpcResponse; + } + RpcRequest rpcRequest = (RpcRequest) HessianSerializer.deserialize(requestBytes, RpcRequest.class); - // add msg - childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}", - (i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc()); - } catch (SchedulerException e) { - logger.error("", e); - } - } else { - childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}", - (i+1), childJobKeys.length, childJobKeys[i]); - } - } else { - childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务失败, 子任务Key格式错误, 子任务Key: {2}", - (i+1), childJobKeys.length, childJobKeys[i]); - } - } + // invoke + RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null); + return rpcResponse; + } catch (Exception e) { + logger.error(e.getMessage(), e); - } + RpcResponse rpcResponse = new RpcResponse(); + rpcResponse.setError("Server-error:" + e.getMessage()); + return rpcResponse; } + } - // handle msg - StringBuffer handleMsg = new StringBuffer(); - if (log.getHandleMsg()!=null) { - handleMsg.append(log.getHandleMsg()).append("
"); - } - if (handleCallbackParam.getExecuteResult().getMsg() != null) { - handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); - } - if (childTriggerMsg !=null) { - handleMsg.append("
子任务触发备注:").append(childTriggerMsg); - } + @RequestMapping("/api") + @PermessionLimit(limit=false) + public void api(HttpServletRequest request, HttpServletResponse response) throws IOException { - // success, save log - log.setHandleTime(new Date()); - log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); - log.setHandleMsg(handleMsg.toString()); - xxlJobLogDao.updateHandleInfo(log); + // invoke + RpcResponse rpcResponse = doInvoke(request); - return ReturnT.SUCCESS; - } + // serialize response + byte[] responseBytes = HessianSerializer.serialize(rpcResponse); + response.setContentType("text/html;charset=utf-8"); + response.setStatus(HttpServletResponse.SC_OK); + //baseRequest.setHandled(true); - @RequestMapping(value=AdminApiUtil.REGISTRY, method = RequestMethod.POST, consumes = "application/json") - @ResponseBody - @PermessionLimit(limit=false) - public ReturnT registry(@RequestBody RegistryParam registryParam){ - int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue()); - if (ret < 1) { - xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue()); - } - return ReturnT.SUCCESS; + OutputStream out = response.getOutputStream(); + out.write(responseBytes); + out.flush(); } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java index 0e9dbac5..3fc57fd0 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java @@ -8,6 +8,7 @@ import com.xxl.job.admin.dao.XxlJobGroupDao; import com.xxl.job.admin.dao.XxlJobInfoDao; import com.xxl.job.admin.dao.XxlJobLogDao; import com.xxl.job.admin.dao.XxlJobRegistryDao; +import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.rpc.netcom.NetComServerFactory; import org.quartz.*; import org.quartz.Trigger.TriggerState; @@ -62,6 +63,7 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In public static XxlJobInfoDao xxlJobInfoDao; public static XxlJobRegistryDao xxlJobRegistryDao; public static XxlJobGroupDao xxlJobGroupDao; + public static AdminBiz adminBiz; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { @@ -69,6 +71,7 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In XxlJobDynamicScheduler.xxlJobInfoDao = applicationContext.getBean(XxlJobInfoDao.class); XxlJobDynamicScheduler.xxlJobRegistryDao = applicationContext.getBean(XxlJobRegistryDao.class); XxlJobDynamicScheduler.xxlJobGroupDao = applicationContext.getBean(XxlJobGroupDao.class); + XxlJobDynamicScheduler.adminBiz = applicationContext.getBean(AdminBiz.class); } @Override diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java new file mode 100644 index 00000000..4c446a31 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java @@ -0,0 +1,121 @@ +package com.xxl.job.admin.service.impl; + +import com.xxl.job.admin.controller.JobApiController; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; +import com.xxl.job.admin.dao.XxlJobInfoDao; +import com.xxl.job.admin.dao.XxlJobLogDao; +import com.xxl.job.admin.dao.XxlJobRegistryDao; +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.RegistryParam; +import com.xxl.job.core.biz.model.ReturnT; +import org.apache.commons.lang.StringUtils; +import org.quartz.SchedulerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.text.MessageFormat; +import java.util.Date; +import java.util.List; + +/** + * @author xuxueli 2017-07-27 21:54:20 + */ +@Service +public class AdminBizImpl implements AdminBiz { + private static Logger logger = LoggerFactory.getLogger(AdminBizImpl.class); + + @Resource + public XxlJobLogDao xxlJobLogDao; + @Resource + private XxlJobInfoDao xxlJobInfoDao; + @Resource + private XxlJobRegistryDao xxlJobRegistryDao; + + @Override + public ReturnT callback(List callbackParamList) { + for (HandleCallbackParam handleCallbackParam: callbackParamList) { + ReturnT callbackResult = callback(handleCallbackParam); + logger.info("JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", + (callbackResult.getCode()==ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult); + } + + return ReturnT.SUCCESS; + } + + private ReturnT callback(HandleCallbackParam handleCallbackParam) { + // valid log item + XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId()); + if (log == null) { + return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); + } + + // trigger success, to trigger child job, and avoid repeat trigger child job + String childTriggerMsg = null; + if (ReturnT.SUCCESS_CODE==handleCallbackParam.getExecuteResult().getCode() && ReturnT.SUCCESS_CODE!=log.getHandleCode()) { + XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId()); + if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) { + childTriggerMsg = "
"; + String[] childJobKeys = xxlJobInfo.getChildJobKey().split(","); + for (int i = 0; i < childJobKeys.length; i++) { + String[] jobKeyArr = childJobKeys[i].split("_"); + if (jobKeyArr!=null && jobKeyArr.length==2) { + XxlJobInfo childJobInfo = xxlJobInfoDao.loadById(Integer.valueOf(jobKeyArr[1])); + if (childJobInfo!=null) { + try { + boolean ret = XxlJobDynamicScheduler.triggerJob(String.valueOf(childJobInfo.getId()), String.valueOf(childJobInfo.getJobGroup())); + + // add msg + childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}", + (i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc()); + } catch (SchedulerException e) { + logger.error("", e); + } + } else { + childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}", + (i+1), childJobKeys.length, childJobKeys[i]); + } + } else { + childTriggerMsg += MessageFormat.format("
{0}/{1} 触发子任务失败, 子任务Key格式错误, 子任务Key: {2}", + (i+1), childJobKeys.length, childJobKeys[i]); + } + } + + } + } + + // handle msg + StringBuffer handleMsg = new StringBuffer(); + if (log.getHandleMsg()!=null) { + handleMsg.append(log.getHandleMsg()).append("
"); + } + if (handleCallbackParam.getExecuteResult().getMsg() != null) { + handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); + } + if (childTriggerMsg !=null) { + handleMsg.append("
子任务触发备注:").append(childTriggerMsg); + } + + // success, save log + log.setHandleTime(new Date()); + log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); + log.setHandleMsg(handleMsg.toString()); + xxlJobLogDao.updateHandleInfo(log); + + return ReturnT.SUCCESS; + } + + @Override + public ReturnT registry(RegistryParam registryParam) { + int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue()); + if (ret < 1) { + xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue()); + } + return ReturnT.SUCCESS; + } + +} diff --git a/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminApiTest.java b/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminApiTest.java deleted file mode 100644 index 2a29a9d5..00000000 --- a/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminApiTest.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.xxl.job.dao.impl; - -import com.xxl.job.core.biz.model.RegistryParam; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.enums.RegistryConfig; -import com.xxl.job.core.util.AdminApiUtil; - -/** - * Created by xuxueli on 17/5/10. - */ -public class AdminApiTest { - - public static void main(String[] args) { - try { - RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "aaa", "112312312312"); - - AdminApiUtil.init("http://localhost:8080/xxl-job-admin"); - ReturnT registryResult = AdminApiUtil.callApiFailover(AdminApiUtil.REGISTRY, registryParam); - System.out.println(registryResult); - } catch (Exception e) { - e.printStackTrace(); - } - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java new file mode 100644 index 00000000..1e4b8313 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java @@ -0,0 +1,30 @@ +package com.xxl.job.core.biz; + +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.RegistryParam; +import com.xxl.job.core.biz.model.ReturnT; + +import java.util.List; + +/** + * @author xuxueli 2017-07-27 21:52:49 + */ +public interface AdminBiz { + + /** + * callback + * + * @param callbackParamList + * @return + */ + public ReturnT callback(List callbackParamList); + + /** + * registry + * + * @param registryParam + * @return + */ + public ReturnT registry(RegistryParam registryParam); + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index f399ed67..e7a1de37 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -8,7 +8,6 @@ import com.xxl.job.core.rpc.netcom.NetComServerFactory; import com.xxl.job.core.thread.ExecutorRegistryThread; import com.xxl.job.core.thread.JobThread; import com.xxl.job.core.thread.TriggerCallbackThread; -import com.xxl.job.core.util.AdminApiUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -30,7 +29,7 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe private String ip; private int port = 9999; private String appName; - private String adminAddresses; + public static String adminAddresses; public static String logPath; public void setIp(String ip) { @@ -52,9 +51,6 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe // ---------------------------------- job server ------------------------------------ private NetComServerFactory serverFactory = new NetComServerFactory(); public void start() throws Exception { - // admin api util init - AdminApiUtil.init(adminAddresses); - // executor start NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); serverFactory.start(port, ip, appName); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/client/JettyClient.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/client/JettyClient.java index a61c53d2..27e25fb0 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/client/JettyClient.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/client/JettyClient.java @@ -19,8 +19,14 @@ public class JettyClient { // serialize request byte[] requestBytes = HessianSerializer.serialize(request); + // reqURL + String reqURL = request.getServerAddress(); + if (reqURL!=null && reqURL.indexOf("http://")==-1) { + reqURL = "http://" + request.getServerAddress() + "/"; + } + // remote invoke - byte[] responseBytes = HttpClientUtil.postRequest("http://" + request.getServerAddress() + "/", requestBytes); + byte[] responseBytes = HttpClientUtil.postRequest(reqURL, requestBytes); if (responseBytes == null || responseBytes.length==0) { RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setError("RpcResponse byte[] is null"); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java index c567b9bb..5e618606 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java @@ -1,9 +1,11 @@ package com.xxl.job.core.thread; +import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.model.RegistryParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.RegistryConfig; -import com.xxl.job.core.util.AdminApiUtil; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.rpc.netcom.NetComClientProxy; import com.xxl.job.core.util.IpUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,8 +28,12 @@ public class ExecutorRegistryThread extends Thread { public void start(final int port, final String ip, final String appName){ // valid - if ( !(AdminApiUtil.allowCallApi() && (appName!=null && appName.trim().length()>0)) ) { - logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail"); + if (appName==null || appName.trim().length()==0) { + logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail, appName is null."); + return; + } + if (XxlJobExecutor.adminAddresses==null || XxlJobExecutor.adminAddresses.trim().length()==0) { + logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null."); return; } @@ -45,7 +51,19 @@ public class ExecutorRegistryThread extends Thread { while (!toStop) { try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress); - ReturnT registryResult = AdminApiUtil.callApiFailover(AdminApiUtil.REGISTRY, registryParam); + ReturnT registryResult = null; + + for (String addressUrl: XxlJobExecutor.adminAddresses.split(",")) { + String apiUrl = addressUrl.concat("/api"); + + AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, apiUrl).getObject(); + registryResult = adminBiz.registry(registryParam); + if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { + registryResult = ReturnT.SUCCESS; + break; + } + } + logger.info(">>>>>>>>>>> xxl-job Executor registry {}, RegistryParam:{}, registryResult:{}", new Object[]{(registryResult.getCode()==ReturnT.SUCCESS_CODE?"success":"fail"), registryParam.toString(), registryResult.toString()}); } catch (Exception e) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 2f4b499d..2d732477 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -1,8 +1,10 @@ package com.xxl.job.core.thread; +import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.util.AdminApiUtil; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.rpc.netcom.NetComClientProxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,14 +37,32 @@ public class TriggerCallbackThread { HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { - // callback list + // valid + if (XxlJobExecutor.adminAddresses==null || XxlJobExecutor.adminAddresses.trim().length()==0) { + logger.warn(">>>>>>>>>>>> xxl-job callback fail, adminAddresses is null."); + continue; + } + + // callback list param List callbackParamList = new ArrayList(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); // callback, will retry if error try { - ReturnT callbackResult = AdminApiUtil.callApiFailover(AdminApiUtil.CALLBACK, callbackParamList); + + ReturnT callbackResult = null; + for (String addressUrl: XxlJobExecutor.adminAddresses.split(",")) { + String apiUrl = addressUrl.concat("/api"); + + AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, apiUrl).getObject(); + callbackResult = adminBiz.callback(callbackParamList); + if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { + callbackResult = ReturnT.SUCCESS; + break; + } + } + logger.info(">>>>>>>>>>> xxl-job callback, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult}); } catch (Exception e) { logger.error(">>>>>>>>>>> xxl-job TriggerCallbackThread Exception:", e); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/AdminApiUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/AdminApiUtil.java deleted file mode 100644 index 5359b4a3..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/AdminApiUtil.java +++ /dev/null @@ -1,127 +0,0 @@ -package com.xxl.job.core.util; - -import com.xxl.job.core.biz.model.ReturnT; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * @author xuxueli 2017-05-10 21:28:15 - */ -public class AdminApiUtil { - private static Logger logger = LoggerFactory.getLogger(AdminApiUtil.class); - - public static final String CALLBACK = "/api/callback"; - public static final String REGISTRY = "/api/registry"; - - private static List adminAddressList = null; - public static void init(String adminAddresses){ - // admin assress list - if (adminAddresses != null) { - Set adminAddressSet = new HashSet(); - for (String adminAddressItem: adminAddresses.split(",")) { - if (adminAddressItem.trim().length()>0) { - adminAddressSet.add(adminAddressItem); - } - } - adminAddressList = new ArrayList(adminAddressSet); - } - } - public static boolean allowCallApi(){ - boolean allowCallApi = (adminAddressList!=null && adminAddressList.size()>0); - return allowCallApi; - } - - public static ReturnT callApiFailover(String subUrl, Object requestObj) throws Exception { - - if (!allowCallApi()) { - return new ReturnT(ReturnT.FAIL_CODE, "allowCallApi fail."); - } - - for (String adminAddress: adminAddressList) { - ReturnT registryResult = null; - try { - String apiUrl = adminAddress.concat(subUrl); - registryResult = callApi(apiUrl, requestObj); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - if (registryResult!=null && registryResult.getCode()==ReturnT.SUCCESS_CODE) { - return ReturnT.SUCCESS; - } - } - return ReturnT.FAIL; - } - - private static ReturnT callApi(String finalUrl, Object requestObj) throws Exception { - HttpPost httpPost = new HttpPost(finalUrl); - CloseableHttpClient httpClient = HttpClients.createDefault(); - try { - - // timeout - RequestConfig requestConfig = RequestConfig.custom() - .setConnectionRequestTimeout(10000) - .setSocketTimeout(10000) - .setConnectTimeout(10000) - .build(); - - httpPost.setConfig(requestConfig); - - // data - if (requestObj != null) { - String json = JacksonUtil.writeValueAsString(requestObj); - - StringEntity entity = new StringEntity(json, "utf-8"); - entity.setContentEncoding("UTF-8"); - entity.setContentType("application/json"); - - httpPost.setEntity(entity); - } - - // do post - HttpResponse response = httpClient.execute(httpPost); - HttpEntity entity = response.getEntity(); - if (null != entity) { - String responseMsg = EntityUtils.toString(entity, "UTF-8"); - if (response.getStatusLine().getStatusCode() != 200) { - EntityUtils.consume(entity); - return new ReturnT(response.getStatusLine().getStatusCode(), - "StatusCode(+"+ response.getStatusLine().getStatusCode() +") Error,response:" + responseMsg); - } - - EntityUtils.consume(entity); - if (responseMsg!=null && responseMsg.startsWith("{")) { - ReturnT result = JacksonUtil.readValue(responseMsg, ReturnT.class); - return result; - } - } - return ReturnT.FAIL; - } catch (Exception e) { - logger.error("", e); - return new ReturnT(ReturnT.FAIL_CODE, e.getMessage()); - } finally { - if (httpPost!=null) { - httpPost.releaseConnection(); - } - try { - httpClient.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/DBUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/DBUtil.java deleted file mode 100644 index 3ac3c7d6..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/DBUtil.java +++ /dev/null @@ -1,128 +0,0 @@ -package com.xxl.job.core.util; - -import javax.sql.DataSource; -import java.sql.*; -import java.util.*; - -/** - * Created by xuxueli on 16/9/30. - */ -public class DBUtil { - - private static Connection getConn(DataSource dataSource) { - try { - return dataSource.getConnection(); - } catch (SQLException e) { - e.printStackTrace(); - } - return null; - } - - /** - * update - * - * @param dataSource - * @param sql - * @param params - */ - public static int update(DataSource dataSource, String sql, Object params[]) { - Connection connection = getConn(dataSource); - PreparedStatement preparedStatement = null; - int ret = 0; - try { - preparedStatement = connection.prepareStatement(sql); - if (params != null) { - for (int i = 0; i < params.length; i++) { - preparedStatement.setObject(i + 1, params[i]); - } - } - ret = preparedStatement.executeUpdate(); - } catch (SQLException e) { - e.printStackTrace(); - } finally { - release(connection, preparedStatement, null); - } - return ret; - } - - /** - * query - * - * @param dataSource - * @param sql - * @param params - * @return - */ - public static List> query(DataSource dataSource, String sql, Object[] params) { - Connection connection = getConn(dataSource); - PreparedStatement preparedStatement = null; - ResultSet resultSet = null; - try { - preparedStatement = connection.prepareStatement(sql); - if (params != null) { - for (int i = 0; i < params.length; i++) { - preparedStatement.setObject(i + 1, params[i]); - } - } - resultSet = preparedStatement.executeQuery(); - - List> ret = resultSetToList(resultSet); - return ret; - } catch (SQLException e) { - e.printStackTrace(); - } finally { - release(connection, preparedStatement, resultSet); - } - return null; - } - - private static List> resultSetToList(ResultSet resultSet) throws SQLException { - if (resultSet == null) { - return new ArrayList>(); - } - - ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); // 得到结果集(rs)的结构信息,比如字段数、字段名等 - int columnCount = resultSetMetaData.getColumnCount(); // 返回此 ResultSet 对象中的列数 - - List> list = new ArrayList>(); - while (resultSet.next()) { - Map rowData = new HashMap(columnCount); - for (int i = 1; i <= columnCount; i++) { - rowData.put(resultSetMetaData.getColumnName(i), resultSet.getObject(i)); - } - list.add(rowData); - } - return list; - } - - /** - * release - * @param connection - * @param preparedStatement - * @param resultSet - */ - public static void release(Connection connection, PreparedStatement preparedStatement, ResultSet resultSet) { - if (resultSet != null) { - try { - resultSet.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - } - if (preparedStatement != null) { - try { - preparedStatement.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - } - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - e.printStackTrace(); - } - } - } - -}