调度中心, 支持根据AppName自动发现执行器地址;

v1.5
xueli.xue 8 years ago
parent 662e129afc
commit 5c16e16d96

@ -3,8 +3,10 @@ package com.xxl.job.admin.core.jobbean;
import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer; import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.model.XxlJobRegistry;
import com.xxl.job.admin.core.thread.JobMonitorHelper; import com.xxl.job.admin.core.thread.JobMonitorHelper;
import com.xxl.job.admin.core.util.DynamicSchedulerUtil; import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.router.HandlerRouter.ActionRepository; import com.xxl.job.core.router.HandlerRouter.ActionRepository;
import com.xxl.job.core.router.model.RequestModel; import com.xxl.job.core.router.model.RequestModel;
import com.xxl.job.core.router.model.ResponseModel; import com.xxl.job.core.router.model.ResponseModel;
@ -18,10 +20,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean; import org.springframework.scheduling.quartz.QuartzJobBean;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.Arrays; import java.util.*;
import java.util.Collections;
import java.util.Date;
import java.util.List;
/** /**
* http job bean * http job bean
@ -57,8 +56,25 @@ public class RemoteHttpJobBean extends QuartzJobBean {
requestModel.setLogAddress(XxlJobLogCallbackServer.getTrigger_log_address()); requestModel.setLogAddress(XxlJobLogCallbackServer.getTrigger_log_address());
requestModel.setLogId(jobLog.getId()); requestModel.setLogId(jobLog.getId());
// parse address
List<String> addressList = new ArrayList<String>();
String parseAddressMsg = null;
if (StringUtils.isNotBlank(jobInfo.getExecutorAppname())) {
List<XxlJobRegistry> xxlJobRegistryList = DynamicSchedulerUtil.xxlJobRegistryDao.findRegistrys(RegistHelper.RegistType.EXECUTOR.name(), jobInfo.getExecutorAppname());
if (xxlJobRegistryList!=null && xxlJobRegistryList.size()>0) {
for (XxlJobRegistry item: xxlJobRegistryList) {
addressList.add(item.getRegistryValue());
}
}
parseAddressMsg = MessageFormat.format("Parse Address (Appname注册方式) <br>>>>[address list] : {0}<br><hr>", addressList.toArray());
} else {
List<String> addressArr = Arrays.asList(jobInfo.getExecutorAddress().split(","));
addressList.addAll(addressArr);
parseAddressMsg = MessageFormat.format("Parse Address (地址配置方式) <br>>>>[address list] : {0}<br><hr>", addressList.toArray());
}
// failover trigger // failover trigger
ResponseModel responseModel = failoverTrigger(jobInfo.getExecutorAddress(), requestModel, jobLog); ResponseModel responseModel = failoverTrigger(addressList, requestModel, jobLog);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam()); jobLog.setExecutorParam(jobInfo.getExecutorParam());
logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, responseModel:{}", jobLog.getId(), responseModel.toString()); logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, responseModel:{}", jobLog.getId(), responseModel.toString());
@ -66,7 +82,7 @@ public class RemoteHttpJobBean extends QuartzJobBean {
// update trigger info // update trigger info
jobLog.setTriggerTime(new Date()); jobLog.setTriggerTime(new Date());
jobLog.setTriggerStatus(responseModel.getStatus()); jobLog.setTriggerStatus(responseModel.getStatus());
jobLog.setTriggerMsg(responseModel.getMsg()); jobLog.setTriggerMsg(parseAddressMsg + responseModel.getMsg());
DynamicSchedulerUtil.xxlJobLogDao.updateTriggerInfo(jobLog); DynamicSchedulerUtil.xxlJobLogDao.updateTriggerInfo(jobLog);
// monitor triger // monitor triger
@ -78,16 +94,14 @@ public class RemoteHttpJobBean extends QuartzJobBean {
/** /**
* failover for trigger remote address * failover for trigger remote address
* @param handler_address
* @return * @return
*/ */
public ResponseModel failoverTrigger(String handler_address, RequestModel requestModel, XxlJobLog jobLog){ public ResponseModel failoverTrigger(List<String> addressList, RequestModel requestModel, XxlJobLog jobLog){
if (handler_address.split(",").length > 1) { if (addressList.size() > 1) {
// for ha // for ha
List<String> addressList = Arrays.asList(handler_address.split(","));
Collections.shuffle(addressList); Collections.shuffle(addressList);
// for failover // for failover
String failoverMessage = ""; String failoverMessage = "";
for (String address : addressList) { for (String address : addressList) {
@ -119,14 +133,20 @@ public class RemoteHttpJobBean extends QuartzJobBean {
result.setStatus(ResponseModel.FAIL); result.setStatus(ResponseModel.FAIL);
result.setMsg(failoverMessage); result.setMsg(failoverMessage);
return result; return result;
} else { } else if (addressList.size() == 1) {
String address = addressList.get(0);
// store real address // store real address
jobLog.setExecutorAddress(handler_address); jobLog.setExecutorAddress(address);
ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(handler_address), requestModel); ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel);
String failoverMessage = MessageFormat.format("Trigger running, <br>>>>[address] : {0}, <br>>>>[status] : {1}, <br>>>>[msg] : {2} <br><hr>", handler_address, triggerCallback.getStatus(), triggerCallback.getMsg()); String failoverMessage = MessageFormat.format("Trigger running, <br>>>>[address] : {0}, <br>>>>[status] : {1}, <br>>>>[msg] : {2} <br><hr>", address, triggerCallback.getStatus(), triggerCallback.getMsg());
triggerCallback.setMsg(failoverMessage); triggerCallback.setMsg(failoverMessage);
return triggerCallback; return triggerCallback;
} else {
ResponseModel result = new ResponseModel();
result.setStatus(ResponseModel.FAIL);
result.setMsg( "Trigger error, <br>>>>address list is null <br><hr>" );
return result;
} }
} }

@ -0,0 +1,55 @@
package com.xxl.job.admin.core.model;
import java.util.Date;
/**
* Created by xuxueli on 16/9/30.
*/
public class XxlJobRegistry {
private int id;
private String registryGroup;
private String registryKey;
private String registryValue;
private Date updateTime;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getRegistryGroup() {
return registryGroup;
}
public void setRegistryGroup(String registryGroup) {
this.registryGroup = registryGroup;
}
public String getRegistryKey() {
return registryKey;
}
public void setRegistryKey(String registryKey) {
this.registryKey = registryKey;
}
public String getRegistryValue() {
return registryValue;
}
public void setRegistryValue(String registryValue) {
this.registryValue = registryValue;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
}

@ -1,26 +1,13 @@
package com.xxl.job.admin.core.util; package com.xxl.job.admin.core.util;
import java.util.ArrayList; import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean; import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean;
import org.quartz.CronScheduleBuilder; import com.xxl.job.admin.core.model.XxlJobInfo;
import org.quartz.CronTrigger; import com.xxl.job.admin.dao.IXxlJobInfoDao;
import org.quartz.Job; import com.xxl.job.admin.dao.IXxlJobLogDao;
import org.quartz.JobBuilder; import com.xxl.job.admin.dao.IXxlJobRegistryDao;
import org.quartz.JobDetail; import org.quartz.*;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.Trigger.TriggerState; import org.quartz.Trigger.TriggerState;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.matchers.GroupMatcher; import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.impl.triggers.CronTriggerImpl; import org.quartz.impl.triggers.CronTriggerImpl;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -31,10 +18,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer; import java.util.*;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobLogDao;
/** /**
* base quartz scheduler util * base quartz scheduler util
@ -77,11 +61,13 @@ public final class DynamicSchedulerUtil implements ApplicationContextAware, Init
// xxlJobLogDao、xxlJobInfoDao // xxlJobLogDao、xxlJobInfoDao
public static IXxlJobLogDao xxlJobLogDao; public static IXxlJobLogDao xxlJobLogDao;
public static IXxlJobInfoDao xxlJobInfoDao; public static IXxlJobInfoDao xxlJobInfoDao;
public static IXxlJobRegistryDao xxlJobRegistryDao;
@Override @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
DynamicSchedulerUtil.xxlJobLogDao = applicationContext.getBean(IXxlJobLogDao.class); DynamicSchedulerUtil.xxlJobLogDao = applicationContext.getBean(IXxlJobLogDao.class);
DynamicSchedulerUtil.xxlJobInfoDao = applicationContext.getBean(IXxlJobInfoDao.class); DynamicSchedulerUtil.xxlJobInfoDao = applicationContext.getBean(IXxlJobInfoDao.class);
DynamicSchedulerUtil.xxlJobRegistryDao = applicationContext.getBean(IXxlJobRegistryDao.class);
} }
@Override @Override

@ -0,0 +1,12 @@
package com.xxl.job.admin.dao;
import com.xxl.job.admin.core.model.XxlJobRegistry;
import java.util.List;
/**
* Created by xuxueli on 16/9/30.
*/
public interface IXxlJobRegistryDao {
List<XxlJobRegistry> findRegistrys(String registryGroup, String registryKey);
}

@ -0,0 +1,30 @@
package com.xxl.job.admin.dao.impl;
import com.xxl.job.admin.core.model.XxlJobRegistry;
import com.xxl.job.admin.dao.IXxlJobRegistryDao;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.stereotype.Repository;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by xuxueli on 16/9/30.
*/
@Repository
public class XxlJobRegistryDaoImpl implements IXxlJobRegistryDao {
@Resource
public SqlSessionTemplate sqlSessionTemplate;
@Override
public List<XxlJobRegistry> findRegistrys(String registryGroup, String registryKey) {
Map<String, String> params = new HashMap<String, String>();
params.put("registryGroup", registryGroup);
params.put("registryKey", registryKey);
return sqlSessionTemplate.selectList("XxlJobRegistryMapper.findRegistrys", params);
}
}

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="XxlJobRegistryMapper">
<resultMap id="XxlJobRegistry" type="com.xxl.job.admin.core.model.XxlJobRegistry" >
<result column="id" property="id" />
<result column="registry_group" property="registryGroup" />
<result column="registry_key" property="registryKey" />
<result column="registry_value" property="registryValue" />
<result column="update_time" property="updateTime" />
</resultMap>
<sql id="Base_Column_List">
t.id,
t.registry_group,
t.registry_key,
t.registry_value,
t.update_time
</sql>
<select id="findRegistrys" parameterType="java.util.HashMap" resultMap="XxlJobRegistry">
SELECT <include refid="Base_Column_List" />
FROM XXL_JOB_QRTZ_TRIGGER_REGISTRY AS t
WHERE t.registry_group = #{registryGroup}
AND t.registry_key = #{registryKey}
AND t.update_time <![CDATA[ > ]]> DATE_ADD(NOW(),INTERVAL -30 SECOND)
</select>
<delete id="refresh" >
delete from XXL_JOB_QRTZ_TRIGGER_REGISTRY
WHERE update_time <![CDATA[ < ]]> DATE_ADD(NOW(),INTERVAL -30 SECOND)
</delete>
</mapper>
Loading…
Cancel
Save