From f83346de2d1f3d616561946101bcd83f0b6a2d57 Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Fri, 28 Jul 2017 22:01:29 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E5=BA=A6=E4=B8=AD=E5=BF=83API?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=EF=BC=8CClient=E7=AB=AF=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../xxl/job/core/executor/XxlJobExecutor.java | 29 ++++++++++++++++++- .../core/thread/ExecutorRegistryThread.java | 11 ++----- .../core/thread/TriggerCallbackThread.java | 8 ++--- 3 files changed, 32 insertions(+), 16 deletions(-) diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index 89c63de2..4f80b48b 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -1,9 +1,11 @@ package com.xxl.job.core.executor; +import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.impl.ExecutorBizImpl; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.JobHander; +import com.xxl.job.core.rpc.netcom.NetComClientProxy; import com.xxl.job.core.rpc.netcom.NetComServerFactory; import com.xxl.job.core.thread.ExecutorRegistryThread; import com.xxl.job.core.thread.JobThread; @@ -17,6 +19,8 @@ import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextClosedEvent; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -29,7 +33,7 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe private String ip; private int port = 9999; private String appName; - public static String adminAddresses; + private String adminAddresses; public static String logPath; public void setIp(String ip) { @@ -48,9 +52,32 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe this.logPath = logPath; } + // ---------------------------------- admin-client ------------------------------------ + private static List adminBizList; + private static void initAdminBizList(String adminAddresses) throws Exception { + if (adminAddresses!=null && adminAddresses.trim().length()>0) { + for (String address: adminAddresses.trim().split(",")) { + if (address!=null && address.trim().length()>0) { + String addressUrl = address.concat("/api"); + AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl).getObject(); + if (adminBizList == null) { + adminBizList = new ArrayList(); + } + adminBizList.add(adminBiz); + } + } + } + } + public static List getAdminBizList(){ + return adminBizList; + } + // ---------------------------------- job server ------------------------------------ private NetComServerFactory serverFactory = new NetComServerFactory(); public void start() throws Exception { + // init admin-client + initAdminBizList(adminAddresses); + // executor start NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); // rpc-service, base on jetty serverFactory.start(port, ip, appName); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java index 962504cc..c3e7eca2 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java @@ -5,7 +5,6 @@ import com.xxl.job.core.biz.model.RegistryParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.RegistryConfig; import com.xxl.job.core.executor.XxlJobExecutor; -import com.xxl.job.core.rpc.netcom.NetComClientProxy; import com.xxl.job.core.util.IpUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +31,7 @@ public class ExecutorRegistryThread extends Thread { logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail, appName is null."); return; } - if (XxlJobExecutor.adminAddresses==null || XxlJobExecutor.adminAddresses.trim().length()==0) { + if (XxlJobExecutor.getAdminBizList() == null) { logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null."); return; } @@ -49,15 +48,10 @@ public class ExecutorRegistryThread extends Thread { @Override public void run() { while (!toStop) { - try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress); - - for (String addressUrl: XxlJobExecutor.adminAddresses.split(",")) { - String apiUrl = addressUrl.concat("/api"); - + for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { - AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, apiUrl).getObject(); ReturnT registryResult = adminBiz.registry(registryParam); if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { registryResult = ReturnT.SUCCESS; @@ -71,7 +65,6 @@ public class ExecutorRegistryThread extends Thread { } } - } catch (Exception e) { logger.error(e.getMessage(), e); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 55ddcae6..8bb5463e 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -4,7 +4,6 @@ import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.executor.XxlJobExecutor; -import com.xxl.job.core.rpc.netcom.NetComClientProxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,17 +42,14 @@ public class TriggerCallbackThread { callbackParamList.add(callback); // valid - if (XxlJobExecutor.adminAddresses==null || XxlJobExecutor.adminAddresses.trim().length()==0) { + if (XxlJobExecutor.getAdminBizList()==null) { logger.warn(">>>>>>>>>>>> xxl-job callback fail, adminAddresses is null, callbackParamList:{}", callbackParamList); continue; } // callback, will retry if error - for (String addressUrl: XxlJobExecutor.adminAddresses.split(",")) { - String apiUrl = addressUrl.concat("/api"); - + for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { - AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, apiUrl).getObject(); ReturnT callbackResult = adminBiz.callback(callbackParamList); if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { callbackResult = ReturnT.SUCCESS;