xueli.xue 8 years ago
parent 1e0ba18606
commit 628d421941

@ -52,12 +52,14 @@ public class RemoteHttpJobBean extends QuartzJobBean {
params.put(HandlerParamEnum.JOB_GROUP.name(), jobInfo.getJobGroup()); params.put(HandlerParamEnum.JOB_GROUP.name(), jobInfo.getJobGroup());
params.put(HandlerParamEnum.JOB_NAME.name(), jobInfo.getJobName()); params.put(HandlerParamEnum.JOB_NAME.name(), jobInfo.getJobName());
params.put(HandlerParamEnum.EXECUTOR_HANDLER.name(), jobInfo.getExecutorHandler());
params.put(HandlerParamEnum.EXECUTOR_PARAMS.name(), jobInfo.getExecutorParam()); params.put(HandlerParamEnum.EXECUTOR_PARAMS.name(), jobInfo.getExecutorParam());
params.put(HandlerParamEnum.GLUE_SWITCH.name(), String.valueOf(jobInfo.getGlueSwitch())); params.put(HandlerParamEnum.GLUE_SWITCH.name(), String.valueOf(jobInfo.getGlueSwitch()));
// failover trigger // failover trigger
RemoteCallBack callback = failoverTrigger(jobInfo.getExecutorAddress(), params, jobLog); RemoteCallBack callback = failoverTrigger(jobInfo.getExecutorAddress(), params, jobLog);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam()); jobLog.setExecutorParam(jobInfo.getExecutorParam());
logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, callback:{}", jobLog.getId(), callback); logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, callback:{}", jobLog.getId(), callback);

@ -15,6 +15,7 @@ public class XxlJobLog {
private String jobName; private String jobName;
private String executorAddress; // 执行器地址,有多个则逗号分隔 private String executorAddress; // 执行器地址,有多个则逗号分隔
private String executorHandler; // 执行器Handler
private String executorParam; // 执行器,任务参数 private String executorParam; // 执行器,任务参数
// trigger info // trigger info
@ -51,6 +52,12 @@ public class XxlJobLog {
public void setExecutorAddress(String executorAddress) { public void setExecutorAddress(String executorAddress) {
this.executorAddress = executorAddress; this.executorAddress = executorAddress;
} }
public String getExecutorHandler() {
return executorHandler;
}
public void setExecutorHandler(String executorHandler) {
this.executorHandler = executorHandler;
}
public String getExecutorParam() { public String getExecutorParam() {
return executorParam; return executorParam;
} }

@ -10,6 +10,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
@ -51,8 +54,13 @@ public class JobMonitorHelper {
if (RemoteCallBack.FAIL.equals(log.getTriggerStatus()) || RemoteCallBack.FAIL.equals(log.getHandleStatus())) { if (RemoteCallBack.FAIL.equals(log.getTriggerStatus()) || RemoteCallBack.FAIL.equals(log.getHandleStatus())) {
XxlJobInfo info = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName()); XxlJobInfo info = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName());
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) { if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
MailUtil.sendMail(info.getAlarmEmail(), "《调度监控报警-调度平台平台XXL-JOB》",
MessageFormat.format("任务调度失败, 任务组:{0}, 任务描述:{1}.", info.getJobGroup(), info.getJobDesc()), false, null); Set<String> emailSet = new HashSet<String>(Arrays.asList(info.getAlarmEmail().split(",")));
for (String email: emailSet) {
String title = "《调度监控报警-调度平台平台XXL-JOB》";
String content = MessageFormat.format("任务调度失败, 任务组:{0}, 任务描述:{1}.", info.getJobGroup(), info.getJobDesc());
MailUtil.sendMail(email, title, content, false, null);
}
} }
} }
} }

@ -10,6 +10,7 @@
<result column="job_name" property="jobName" /> <result column="job_name" property="jobName" />
<result column="executor_address" property="executorAddress" /> <result column="executor_address" property="executorAddress" />
<result column="executor_handler" property="executorHandler" />
<result column="executor_param" property="executorParam" /> <result column="executor_param" property="executorParam" />
<result column="trigger_time" property="triggerTime" /> <result column="trigger_time" property="triggerTime" />
@ -27,6 +28,7 @@
t.job_group, t.job_group,
t.job_name, t.job_name,
t.executor_address, t.executor_address,
t.executor_handler,
t.executor_param, t.executor_param,
t.trigger_time, t.trigger_time,
t.trigger_status, t.trigger_status,
@ -94,11 +96,13 @@
`job_group`, `job_group`,
`job_name`, `job_name`,
`executor_address`, `executor_address`,
`executor_handler`,
`executor_param` `executor_param`
) VALUES ( ) VALUES (
#{jobGroup}, #{jobGroup},
#{jobName}, #{jobName},
#{executorAddress}, #{executorAddress},
#{executorHandler},
#{executorParam} #{executorParam}
); );
<selectKey resultType="java.lang.Integer" order="AFTER" keyProperty="id"> <selectKey resultType="java.lang.Integer" order="AFTER" keyProperty="id">
@ -113,6 +117,7 @@
`trigger_status`= #{triggerStatus}, `trigger_status`= #{triggerStatus},
`trigger_msg`= #{triggerMsg}, `trigger_msg`= #{triggerMsg},
`executor_address`= #{executorAddress}, `executor_address`= #{executorAddress},
`executor_handler`=#{executorHandler},
`executor_param`= #{executorParam} `executor_param`= #{executorParam}
WHERE `id`= #{id} WHERE `id`= #{id}
</update> </update>

@ -40,7 +40,7 @@
<div class="row"> <div class="row">
<div class="col-xs-4"> <div class="col-xs-4">
<div class="input-group"> <div class="input-group">
<span class="input-group-addon">组</span> <span class="input-group-addon">任务组</span>
<select class="form-control" id="jobGroup" > <select class="form-control" id="jobGroup" >
<#list JobGroupList as group> <#list JobGroupList as group>
<option value="${group}" >${group.desc}</option> <option value="${group}" >${group.desc}</option>
@ -113,7 +113,7 @@
<div class="modal-body"> <div class="modal-body">
<form class="form-horizontal form" role="form" > <form class="form-horizontal form" role="form" >
<div class="form-group"> <div class="form-group">
<label for="firstname" class="col-sm-2 control-label">组<font color="red">*</font></label> <label for="firstname" class="col-sm-2 control-label">任务组<font color="red">*</font></label>
<div class="col-sm-4"> <div class="col-sm-4">
<select class="form-control" name="jobGroup" > <select class="form-control" name="jobGroup" >
<#list JobGroupList as group> <#list JobGroupList as group>
@ -194,7 +194,7 @@ public class DemoJobHandler extends IJobHandler {
<div class="modal-body"> <div class="modal-body">
<form class="form-horizontal form" role="form" > <form class="form-horizontal form" role="form" >
<div class="form-group"> <div class="form-group">
<label for="firstname" class="col-sm-2 control-label">组<font color="red">*</font></label> <label for="firstname" class="col-sm-2 control-label">任务组<font color="red">*</font></label>
<div class="col-sm-4"> <div class="col-sm-4">
<input type="text" class="form-control jobGroupTitle" maxlength="50" readonly > <input type="text" class="form-control jobGroupTitle" maxlength="50" readonly >
</div> </div>

@ -34,7 +34,7 @@
<div class="row"> <div class="row">
<div class="col-xs-3"> <div class="col-xs-3">
<div class="input-group"> <div class="input-group">
<span class="input-group-addon">组</span> <span class="input-group-addon">任务组</span>
<select class="form-control" id="jobGroup" paramVal="${jobGroup}" > <select class="form-control" id="jobGroup" paramVal="${jobGroup}" >
<option value="" selected>请选择</option> <option value="" selected>请选择</option>
<#list JobGroupList as group> <#list JobGroupList as group>
@ -45,7 +45,7 @@
</div> </div>
<div class="col-xs-3"> <div class="col-xs-3">
<div class="input-group"> <div class="input-group">
<span class="input-group-addon">名称</span> <span class="input-group-addon">描述</span>
<select class="form-control" id="jobName" paramVal="${jobName}" > <select class="form-control" id="jobName" paramVal="${jobName}" >
<option value="" >请选择</option> <option value="" >请选择</option>
</select> </select>
@ -79,6 +79,7 @@
<th name="jobGroup" >任务组</th> <th name="jobGroup" >任务组</th>
<th name="jobName" >任务名</th> <th name="jobName" >任务名</th>
<th name="executorAddress" >执行器地址</th> <th name="executorAddress" >执行器地址</th>
<th name="executorHandler" >JobHandler</th>
<th name="executorParam" >任务参数</th> <th name="executorParam" >任务参数</th>
<th name="triggerTime" >调度时间</th> <th name="triggerTime" >调度时间</th>
<th name="triggerStatus" >调度结果</th> <th name="triggerStatus" >调度结果</th>

@ -1,6 +1,6 @@
$(function() { $(function() {
// 组列表选中, 任务列表初始化和选中 // 任务组列表选中, 任务列表初始化和选中
var ifParam = true; var ifParam = true;
$("#jobGroup").on("change", function () { $("#jobGroup").on("change", function () {
var jobGroup = $(this).children('option:selected').val(); var jobGroup = $(this).children('option:selected').val();
@ -97,6 +97,7 @@ $(function() {
}, },
{ "data": 'jobName', "visible" : false}, { "data": 'jobName', "visible" : false},
{ "data": 'executorAddress', "visible" : true}, { "data": 'executorAddress', "visible" : true},
{ "data": 'executorHandler', "visible" : true},
{ "data": 'executorParam', "visible" : true}, { "data": 'executorParam', "visible" : true},
{ {
"data": 'triggerTime', "data": 'triggerTime',

@ -1,7 +1,8 @@
package com.xxl.job.core.executor.jetty; package com.xxl.job.core.executor.jetty;
import java.util.*; import com.xxl.job.core.handler.HandlerRepository;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHander;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -14,9 +15,7 @@ import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import com.xxl.job.core.handler.HandlerRepository; import java.util.Map;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHander;
/** /**
* Created by xuxueli on 2016/3/2 21:14. * Created by xuxueli on 2016/3/2 21:14.
@ -89,14 +88,9 @@ public class XxlJobExecutor implements ApplicationContextAware {
if (serviceBeanMap!=null && serviceBeanMap.size()>0) { if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) { for (Object serviceBean : serviceBeanMap.values()) {
if (serviceBean instanceof IJobHandler){ if (serviceBean instanceof IJobHandler){
String jobKeys = serviceBean.getClass().getAnnotation(JobHander.class).value(); String name = serviceBean.getClass().getAnnotation(JobHander.class).value();
if (jobKeys!=null && jobKeys.trim().length()>0) { IJobHandler handler = (IJobHandler) serviceBean;
Set<String> jobKeySet = new HashSet<String>(Arrays.asList(jobKeys.split(","))); HandlerRepository.registJobHandler(name, handler);
for (String jobKey : jobKeySet) {
IJobHandler handler = (IJobHandler) serviceBean;
HandlerRepository.regist(jobKey, handler);
}
}
} }
} }
} }

@ -38,6 +38,10 @@ public class HandlerRepository {
* job name * job name
*/ */
JOB_NAME, JOB_NAME,
/**
* params of jobhandler
*/
EXECUTOR_HANDLER,
/** /**
* params of jobhandler * params of jobhandler
*/ */
@ -60,21 +64,27 @@ public class HandlerRepository {
LOG_DATE LOG_DATE
} }
public enum ActionEnum{RUN, KILL, LOG, BEAT} public enum ActionEnum{RUN, KILL, LOG, BEAT}
// jobhandler repository
private static ConcurrentHashMap<String, IJobHandler> handlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static void registJobHandler(String name, IJobHandler jobHandler){
handlerRepository.put(name, jobHandler);
logger.info("xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
}
// thread repository of jobhandler
public static ConcurrentHashMap<String, HandlerThread> handlerTreadMap = new ConcurrentHashMap<String, HandlerThread>(); public static ConcurrentHashMap<String, HandlerThread> handlerTreadMap = new ConcurrentHashMap<String, HandlerThread>();
public static HandlerThread registJobHandlerThread(String jobkey, IJobHandler handler){
// regist handler
public static void regist(String handleName, IJobHandler handler){
HandlerThread handlerThread = new HandlerThread(handler); HandlerThread handlerThread = new HandlerThread(handler);
handlerThread.start(); handlerThread.start();
handlerTreadMap.put(handleName, handlerThread); // putIfAbsent logger.info(">>>>>>>>>>> xxl-job regist handler success, jobkey:{}, handler:{}", new Object[]{jobkey, handler});
logger.info(">>>>>>>>>>> xxl-job regist handler success, handleName:{}, handler:{}", new Object[]{handleName, handler}); return handlerTreadMap.put(jobkey, handlerThread); // putIfAbsent
} }
// handler push to queue // handler push to queue
public static String service(Map<String, String> _param) { public static String service(Map<String, String> _param) {
logger.debug(">>>>>>>>>>> xxl-job service start, _param:{}", new Object[]{_param}); logger.debug(">>>>>>>>>>> xxl-job service start, _param:{}", new Object[]{_param});
// callback // callback
RemoteCallBack callback = new RemoteCallBack(); RemoteCallBack callback = new RemoteCallBack();
callback.setStatus(RemoteCallBack.FAIL); callback.setStatus(RemoteCallBack.FAIL);
@ -91,7 +101,7 @@ public class HandlerRepository {
callback.setMsg("Timestamp check failed."); callback.setMsg("Timestamp check failed.");
return JacksonUtil.writeValueAsString(callback); return JacksonUtil.writeValueAsString(callback);
} }
// parse namespace // parse namespace
if (namespace.equals(ActionEnum.RUN.name())) { if (namespace.equals(ActionEnum.RUN.name())) {
@ -110,20 +120,40 @@ public class HandlerRepository {
return JacksonUtil.writeValueAsString(callback); return JacksonUtil.writeValueAsString(callback);
} }
// load old thread
String jobKey = job_group.concat("_").concat(job_name); String jobKey = job_group.concat("_").concat(job_name);
HandlerThread handlerThread = handlerTreadMap.get(jobKey); HandlerThread handlerThread = handlerTreadMap.get(jobKey);
if ("0".equals(handler_glue_switch)) { if ("0".equals(handler_glue_switch)) {
// bean model // bean model
if (handlerThread == null) {
callback.setMsg("handler for jobKey=[" + jobKey + "] not found."); // handler name
String executor_handler = _param.get(HandlerParamEnum.EXECUTOR_HANDLER.name());
if (executor_handler==null || executor_handler.trim().length()==0){
callback.setMsg("EXECUTOR_HANDLER is null.");
return JacksonUtil.writeValueAsString(callback); return JacksonUtil.writeValueAsString(callback);
} }
// handler instance
IJobHandler jobHandler = handlerRepository.get(executor_handler);
if (handlerThread == null) {
// jobhandler match
if (jobHandler==null) {
callback.setMsg("handler for jobKey=[" + jobKey + "] not found.");
return JacksonUtil.writeValueAsString(callback);
}
handlerThread = HandlerRepository.registJobHandlerThread(jobKey, jobHandler);
} else {
if (handlerThread.getHandler() != jobHandler) {
handlerThread = HandlerRepository.registJobHandlerThread(jobKey, jobHandler);
}
}
} else { } else {
// glue // glue
if (handlerThread==null) { if (handlerThread == null) {
HandlerRepository.regist(jobKey, new GlueJobHandler(job_group, job_name)); handlerThread = HandlerRepository.registJobHandlerThread(jobKey, new GlueJobHandler(job_group, job_name));
} }
handlerThread = handlerTreadMap.get(jobKey);
} }
// push data to queue // push data to queue
@ -145,7 +175,7 @@ public class HandlerRepository {
IJobHandler handler = handlerThread.getHandler(); IJobHandler handler = handlerThread.getHandler();
handlerThread.toStop(); handlerThread.toStop();
handlerThread.interrupt(); handlerThread.interrupt();
regist(jobKey, handler); HandlerRepository.registJobHandlerThread(jobKey, handler);
callback.setStatus(RemoteCallBack.SUCCESS); callback.setStatus(RemoteCallBack.SUCCESS);
} else { } else {
callback.setMsg("handler for jobKey=[" + jobKey + "] not found."); callback.setMsg("handler for jobKey=[" + jobKey + "] not found.");
@ -179,11 +209,11 @@ public class HandlerRepository {
callback.setMsg("param[Action] is not valid."); callback.setMsg("param[Action] is not valid.");
return JacksonUtil.writeValueAsString(callback); return JacksonUtil.writeValueAsString(callback);
} }
logger.debug(">>>>>>>>>>> xxl-job service end, triggerData:{}"); logger.debug(">>>>>>>>>>> xxl-job service end, triggerData:{}");
return JacksonUtil.writeValueAsString(callback); return JacksonUtil.writeValueAsString(callback);
} }
// ----------------------- for callback log ----------------------- // ----------------------- for callback log -----------------------
private static LinkedBlockingQueue<HashMap<String, String>> callBackQueue = new LinkedBlockingQueue<HashMap<String, String>>(); private static LinkedBlockingQueue<HashMap<String, String>> callBackQueue = new LinkedBlockingQueue<HashMap<String, String>>();
static { static {

@ -20,7 +20,7 @@ import com.xxl.job.core.handler.annotation.JobHander;
* *
* @author xuxueli 2015-12-19 19:43:36 * @author xuxueli 2015-12-19 19:43:36
*/ */
@JobHander(value="waimai_201607202316260736,waimai_201607202316260736") @JobHander(value="demoJobHandler")
@Service @Service
public class DemoJobHandler extends IJobHandler { public class DemoJobHandler extends IJobHandler {
private static transient Logger logger = LoggerFactory.getLogger(DemoJobHandler.class); private static transient Logger logger = LoggerFactory.getLogger(DemoJobHandler.class);

Loading…
Cancel
Save