调度中心任务注册检测逻辑优化;

v1.8.2
xuxueli 7 years ago
parent 1c556b8917
commit 7b052d7331

@ -1,12 +1,9 @@
package com.xxl.job.admin.controller; package com.xxl.job.admin.controller;
import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import org.springframework.ui.Model; import org.springframework.ui.Model;
@ -14,7 +11,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List; import java.util.List;
/** /**
@ -36,20 +32,6 @@ public class JobGroupController {
// job group (executor) // job group (executor)
List<XxlJobGroup> list = xxlJobGroupDao.findAll(); List<XxlJobGroup> list = xxlJobGroupDao.findAll();
if (CollectionUtils.isNotEmpty(list)) {
for (XxlJobGroup group: list) {
List<String> registryList = null;
if (group.getAddressType() == 0) {
registryList = JobRegistryMonitorHelper.discover(RegistryConfig.RegistType.EXECUTOR.name(), group.getAppName());
} else {
if (StringUtils.isNotBlank(group.getAddressList())) {
registryList = Arrays.asList(group.getAddressList().split(","));
}
}
group.setRegistryList(registryList);
}
}
model.addAttribute("list", list); model.addAttribute("list", list);
return "jobgroup/jobgroup.index"; return "jobgroup/jobgroup.index";
} }

@ -1,5 +1,9 @@
package com.xxl.job.admin.core.model; package com.xxl.job.admin.core.model;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
/** /**
@ -16,6 +20,12 @@ public class XxlJobGroup {
// registry list // registry list
private List<String> registryList; // 执行器地址列表(系统注册) private List<String> registryList; // 执行器地址列表(系统注册)
public List<String> getRegistryList() {
if (StringUtils.isNotBlank(addressList)) {
registryList = new ArrayList<String>(Arrays.asList(addressList.split(",")));
}
return registryList;
}
public int getId() { public int getId() {
return id; return id;
@ -49,14 +59,6 @@ public class XxlJobGroup {
this.order = order; this.order = order;
} }
public List<String> getRegistryList() {
return registryList;
}
public void setRegistryList(List<String> registryList) {
this.registryList = registryList;
}
public int getAddressType() { public int getAddressType() {
return addressType; return addressType;
} }
@ -72,4 +74,5 @@ public class XxlJobGroup {
public void setAddressList(String addressList) { public void setAddressList(String addressList) {
this.addressList = addressList; this.addressList = addressList;
} }
} }

@ -1,14 +1,17 @@
package com.xxl.job.admin.core.thread; package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobRegistry; import com.xxl.job.admin.core.model.XxlJobRegistry;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.core.enums.RegistryConfig; import com.xxl.job.core.enums.RegistryConfig;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -23,8 +26,6 @@ public class JobRegistryMonitorHelper {
return instance; return instance;
} }
private ConcurrentHashMap<String, List<String>> registMap = new ConcurrentHashMap<String, List<String>>();
private Thread registryThread; private Thread registryThread;
private boolean toStop = false; private boolean toStop = false;
public void start(){ public void start(){
@ -33,26 +34,42 @@ public class JobRegistryMonitorHelper {
public void run() { public void run() {
while (!toStop) { while (!toStop) {
try { try {
// remove dead admin/executor // auto registry group
XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT); List<XxlJobGroup> groupList = XxlJobDynamicScheduler.xxlJobGroupDao.findByAddressType(0);
if (CollectionUtils.isNotEmpty(groupList)) {
// fresh registry map // remove dead address (admin/executor)
ConcurrentHashMap<String, List<String>> temp = new ConcurrentHashMap<String, List<String>>(); XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT);
List<XxlJobRegistry> list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);
if (list != null) { // fresh online address (admin/executor)
for (XxlJobRegistry item: list) { HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
String groupKey = makeGroupKey(item.getRegistryGroup(), item.getRegistryKey()); List<XxlJobRegistry> list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);
List<String> registryList = temp.get(groupKey); if (list != null) {
if (registryList == null) { for (XxlJobRegistry item: list) {
registryList = new ArrayList<String>(); if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
} String appName = item.getRegistryKey();
if (!registryList.contains(item.getRegistryValue())) { List<String> registryList = appAddressMap.get(appName);
registryList.add(item.getRegistryValue()); if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appName, registryList);
}
} }
temp.put(groupKey, registryList); }
// fresh group address
for (XxlJobGroup group: groupList) {
List<String> registryList = appAddressMap.get(group.getAppName());
String addressListStr = StringUtils.join(registryList, ",");
group.setAddressList(addressListStr);
XxlJobDynamicScheduler.xxlJobGroupDao.update(group);
} }
} }
registMap = temp;
} catch (Exception e) { } catch (Exception e) {
logger.error("job registry instance error:{}", e); logger.error("job registry instance error:{}", e);
} }
@ -73,13 +90,4 @@ public class JobRegistryMonitorHelper {
//registryThread.interrupt(); //registryThread.interrupt();
} }
private static String makeGroupKey(String registryGroup, String registryKey){
return registryGroup.concat("_").concat(registryKey);
}
public static List<String> discover(String registryGroup, String registryKey){
String groupKey = makeGroupKey(registryGroup, registryKey);
return instance.registMap.get(groupKey);
}
} }

@ -7,18 +7,14 @@ import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.thread.JobFailMonitorHelper; import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.enums.RegistryConfig;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date; import java.util.Date;
/** /**
@ -90,17 +86,10 @@ public class XxlJobTrigger {
StringBuffer triggerSb = new StringBuffer(); StringBuffer triggerSb = new StringBuffer();
// exerutor address list // exerutor address list
ArrayList<String> addressList = null;
XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());
if (group.getAddressType() == 0) { triggerSb.append( (group.getAddressType() == 0)?"注册方式:自动注册":"注册方式:手动录入" );
triggerSb.append("注册方式:自动注册"); ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
addressList = (ArrayList<String>) JobRegistryMonitorHelper.discover(RegistryConfig.RegistType.EXECUTOR.name(), group.getAppName());
} else {
triggerSb.append("注册方式:手动录入");
if (StringUtils.isNotBlank(group.getAddressList())) {
addressList = new ArrayList<String>(Arrays.asList(group.getAddressList().split(",")));
}
}
triggerSb.append("<br>阻塞处理策略:").append(ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION).getTitle()); triggerSb.append("<br>阻塞处理策略:").append(ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION).getTitle());
triggerSb.append("<br>失败处理策略:").append(ExecutorFailStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorFailStrategyEnum.FAIL_ALARM).getTitle()); triggerSb.append("<br>失败处理策略:").append(ExecutorFailStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorFailStrategyEnum.FAIL_ALARM).getTitle());
triggerSb.append("<br>地址列表:").append(addressList!=null?addressList.toString():""); triggerSb.append("<br>地址列表:").append(addressList!=null?addressList.toString():"");

@ -11,6 +11,8 @@ public interface IXxlJobGroupDao {
public List<XxlJobGroup> findAll(); public List<XxlJobGroup> findAll();
public List<XxlJobGroup> findByAddressType(int addressType);
public int save(XxlJobGroup xxlJobGroup); public int save(XxlJobGroup xxlJobGroup);
public int update(XxlJobGroup xxlJobGroup); public int update(XxlJobGroup xxlJobGroup);

@ -22,6 +22,11 @@ public class XxlJobGroupDaoImpl implements IXxlJobGroupDao {
return sqlSessionTemplate.selectList("XxlJobGroupMapper.findAll"); return sqlSessionTemplate.selectList("XxlJobGroupMapper.findAll");
} }
@Override
public List<XxlJobGroup> findByAddressType(int addressType) {
return sqlSessionTemplate.selectList("XxlJobGroupMapper.findByAddressType", addressType);
}
@Override @Override
public int save(XxlJobGroup xxlJobGroup) { public int save(XxlJobGroup xxlJobGroup) {
return sqlSessionTemplate.update("XxlJobGroupMapper.save", xxlJobGroup); return sqlSessionTemplate.update("XxlJobGroupMapper.save", xxlJobGroup);

@ -5,7 +5,6 @@ import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobLogDao; import com.xxl.job.admin.dao.IXxlJobLogDao;
@ -13,7 +12,6 @@ import com.xxl.job.admin.dao.IXxlJobLogGlueDao;
import com.xxl.job.admin.service.IXxlJobService; import com.xxl.job.admin.service.IXxlJobService;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.glue.GlueTypeEnum; import com.xxl.job.core.glue.GlueTypeEnum;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -288,21 +286,15 @@ public class XxlJobServiceImpl implements IXxlJobService {
// executor count // executor count
Set<String> executerAddressSet = new HashSet<String>(); Set<String> executerAddressSet = new HashSet<String>();
List<XxlJobGroup> groupList = xxlJobGroupDao.findAll(); List<XxlJobGroup> groupList = xxlJobGroupDao.findAll();
if (CollectionUtils.isNotEmpty(groupList)) { if (CollectionUtils.isNotEmpty(groupList)) {
for (XxlJobGroup group: groupList) { for (XxlJobGroup group: groupList) {
List<String> registryList = null; if (CollectionUtils.isNotEmpty(group.getRegistryList())) {
if (group.getAddressType() == 0) { executerAddressSet.addAll(group.getRegistryList());
registryList = JobRegistryMonitorHelper.discover(RegistryConfig.RegistType.EXECUTOR.name(), group.getAppName());
} else {
if (StringUtils.isNotBlank(group.getAddressList())) {
registryList = Arrays.asList(group.getAddressList().split(","));
}
}
if (CollectionUtils.isNotEmpty(registryList)) {
executerAddressSet.addAll(registryList);
} }
} }
} }
int executorCount = executerAddressSet.size(); int executorCount = executerAddressSet.size();
Map<String, Object> dashboardMap = new HashMap<String, Object>(); Map<String, Object> dashboardMap = new HashMap<String, Object>();

@ -21,12 +21,19 @@
t.address_list t.address_list
</sql> </sql>
<select id="findAll" parameterType="java.lang.Integer" resultMap="XxlJobGroup"> <select id="findAll" resultMap="XxlJobGroup">
SELECT <include refid="Base_Column_List" /> SELECT <include refid="Base_Column_List" />
FROM XXL_JOB_QRTZ_TRIGGER_GROUP AS t FROM XXL_JOB_QRTZ_TRIGGER_GROUP AS t
ORDER BY t.order ASC ORDER BY t.order ASC
</select> </select>
<select id="findByAddressType" parameterType="java.lang.Integer" resultMap="XxlJobGroup">
SELECT <include refid="Base_Column_List" />
FROM XXL_JOB_QRTZ_TRIGGER_GROUP AS t
WHERE t.address_type = #{addressType}
ORDER BY t.order ASC
</select>
<insert id="save" parameterType="com.xxl.job.admin.core.model.XxlJobGroup" > <insert id="save" parameterType="com.xxl.job.admin.core.model.XxlJobGroup" >
INSERT INTO XXL_JOB_QRTZ_TRIGGER_GROUP ( `app_name`, `title`, `order`, `address_type`, `address_list`) INSERT INTO XXL_JOB_QRTZ_TRIGGER_GROUP ( `app_name`, `title`, `order`, `address_type`, `address_list`)
values ( #{appName}, #{title}, #{order}, #{addressType}, #{addressList}); values ( #{appName}, #{title}, #{order}, #{addressType}, #{addressList});

Loading…
Cancel
Save