调度中心API服务,Client端调用逻辑优化

v1.8.2
xuxueli 7 years ago
parent 8e07d501e2
commit f83346de2d

@ -1,9 +1,11 @@
package com.xxl.job.core.executor; package com.xxl.job.core.executor;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.impl.ExecutorBizImpl; import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHander; import com.xxl.job.core.handler.annotation.JobHander;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import com.xxl.job.core.rpc.netcom.NetComServerFactory; import com.xxl.job.core.rpc.netcom.NetComServerFactory;
import com.xxl.job.core.thread.ExecutorRegistryThread; import com.xxl.job.core.thread.ExecutorRegistryThread;
import com.xxl.job.core.thread.JobThread; import com.xxl.job.core.thread.JobThread;
@ -17,6 +19,8 @@ import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.ContextClosedEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -29,7 +33,7 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
private String ip; private String ip;
private int port = 9999; private int port = 9999;
private String appName; private String appName;
public static String adminAddresses; private String adminAddresses;
public static String logPath; public static String logPath;
public void setIp(String ip) { public void setIp(String ip) {
@ -48,9 +52,32 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
this.logPath = logPath; this.logPath = logPath;
} }
// ---------------------------------- admin-client ------------------------------------
private static List<AdminBiz> adminBizList;
private static void initAdminBizList(String adminAddresses) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
String addressUrl = address.concat("/api");
AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl).getObject();
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
public static List<AdminBiz> getAdminBizList(){
return adminBizList;
}
// ---------------------------------- job server ------------------------------------ // ---------------------------------- job server ------------------------------------
private NetComServerFactory serverFactory = new NetComServerFactory(); private NetComServerFactory serverFactory = new NetComServerFactory();
public void start() throws Exception { public void start() throws Exception {
// init admin-client
initAdminBizList(adminAddresses);
// executor start // executor start
NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); // rpc-service, base on jetty NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); // rpc-service, base on jetty
serverFactory.start(port, ip, appName); serverFactory.start(port, ip, appName);

@ -5,7 +5,6 @@ import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig; import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import com.xxl.job.core.util.IpUtil; import com.xxl.job.core.util.IpUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -32,7 +31,7 @@ public class ExecutorRegistryThread extends Thread {
logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail, appName is null."); logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail, appName is null.");
return; return;
} }
if (XxlJobExecutor.adminAddresses==null || XxlJobExecutor.adminAddresses.trim().length()==0) { if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null."); logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
return; return;
} }
@ -49,15 +48,10 @@ public class ExecutorRegistryThread extends Thread {
@Override @Override
public void run() { public void run() {
while (!toStop) { while (!toStop) {
try { try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress); RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
for (String addressUrl: XxlJobExecutor.adminAddresses.split(",")) {
String apiUrl = addressUrl.concat("/api");
try { try {
AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, apiUrl).getObject();
ReturnT<String> registryResult = adminBiz.registry(registryParam); ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS; registryResult = ReturnT.SUCCESS;
@ -71,7 +65,6 @@ public class ExecutorRegistryThread extends Thread {
} }
} }
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }

@ -4,7 +4,6 @@ import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -43,17 +42,14 @@ public class TriggerCallbackThread {
callbackParamList.add(callback); callbackParamList.add(callback);
// valid // valid
if (XxlJobExecutor.adminAddresses==null || XxlJobExecutor.adminAddresses.trim().length()==0) { if (XxlJobExecutor.getAdminBizList()==null) {
logger.warn(">>>>>>>>>>>> xxl-job callback fail, adminAddresses is null, callbackParamList{}", callbackParamList); logger.warn(">>>>>>>>>>>> xxl-job callback fail, adminAddresses is null, callbackParamList{}", callbackParamList);
continue; continue;
} }
// callback, will retry if error // callback, will retry if error
for (String addressUrl: XxlJobExecutor.adminAddresses.split(",")) { for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
String apiUrl = addressUrl.concat("/api");
try { try {
AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, apiUrl).getObject();
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList); ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackResult = ReturnT.SUCCESS; callbackResult = ReturnT.SUCCESS;

Loading…
Cancel
Save