pull/1/head
xueli.xue 8 years ago
parent 86dea7ff7c
commit b590a65c4a

@ -165,7 +165,7 @@ CREATE TABLE XXL_JOB_QRTZ_TRIGGER_INFO (
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE XXL_JOB_QRTZ_TRIGGER_LOG (
CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_LOG` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '任务组',
`job_name` varchar(255) NOT NULL COMMENT '任务名',
@ -173,10 +173,10 @@ CREATE TABLE XXL_JOB_QRTZ_TRIGGER_LOG (
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(255) DEFAULT NULL COMMENT 'executor_param',
`trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
`trigger_status` varchar(255) DEFAULT NULL COMMENT '调度-结果',
`trigger_code` varchar(255) DEFAULT NULL COMMENT '调度-结果',
`trigger_msg` varchar(2048) DEFAULT NULL COMMENT '调度-日志',
`handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
`handle_status` varchar(255) DEFAULT NULL COMMENT '执行-状态',
`handle_code` varchar(255) DEFAULT NULL COMMENT '执行-状态',
`handle_msg` varchar(2048) DEFAULT NULL COMMENT '执行-日志',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

@ -97,7 +97,7 @@ public class JobLogController {
if (log == null) {
return new ReturnT<String>(500, "查看执行日志失败: 参数异常");
}
if (!((ReturnT.SUCCESS_CODE+"").equals(log.getTriggerStatus()) || StringUtils.isNotBlank(log.getHandleStatus()))) {
if (ReturnT.SUCCESS_CODE != log.getTriggerCode()) {
return new ReturnT<String>(500, "查看执行日志失败: 任务发起调度失败,无法查看执行日志");
}
@ -134,7 +134,7 @@ public class JobLogController {
if (log == null || jobInfo==null) {
return new ReturnT<String>(500, "参数异常");
}
if (!(ReturnT.SUCCESS_CODE +"").equals(log.getTriggerStatus())) {
if (ReturnT.SUCCESS_CODE != log.getTriggerCode()) {
return new ReturnT<String>(500, "调度失败,无法终止日志");
}
@ -149,7 +149,7 @@ public class JobLogController {
ReturnT<String> runResult = executorBiz.kill(String.valueOf(log.getJobGroup()), log.getJobName());
if (ReturnT.SUCCESS_CODE == runResult.getCode()) {
log.setHandleStatus(ReturnT.SUCCESS_CODE+"");
log.setHandleCode(ReturnT.SUCCESS_CODE);
log.setHandleMsg("人为操作主动终止");
log.setHandleTime(new Date());
xxlJobLogDao.updateHandleInfo(log);

@ -4,8 +4,8 @@ import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import org.apache.commons.lang.StringUtils;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
@ -21,17 +21,17 @@ public class AdminBizImpl implements AdminBiz {
private static Logger logger = LoggerFactory.getLogger(AdminBizImpl.class);
@Override
public ReturnT<String> callback(TriggerParam triggerParam) {
public ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
// valid log item
XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(triggerParam.getLogId());
XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(handleCallbackParam.getLogId());
if (log == null) {
return new ReturnT(ReturnT.FAIL_CODE, "log item not found.");
}
// trigger success, to trigger child job, and avoid repeat trigger child job
String childTriggerMsg = null;
if ((ReturnT.SUCCESS_CODE+"").equals(triggerParam.getStatus()) && !(ReturnT.SUCCESS_CODE+"").equals(log.getHandleStatus())) {
if (ReturnT.SUCCESS_CODE==handleCallbackParam.getCode() && ReturnT.SUCCESS_CODE!=log.getHandleCode()) {
XxlJobInfo xxlJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName());
if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) {
childTriggerMsg = "<hr>";
@ -65,8 +65,11 @@ public class AdminBizImpl implements AdminBiz {
// handle msg
StringBuffer handleMsg = new StringBuffer();
if (triggerParam.getMsg() != null) {
handleMsg.append("执行备注:").append(triggerParam.getMsg());
if (log.getHandleMsg()!=null) {
handleMsg.append(log.getHandleMsg()).append("<br>");
}
if (handleCallbackParam.getMsg() != null) {
handleMsg.append("执行备注:").append(handleCallbackParam.getMsg());
}
if (childTriggerMsg !=null) {
handleMsg.append("<br>子任务触发备注:").append(childTriggerMsg);
@ -74,7 +77,7 @@ public class AdminBizImpl implements AdminBiz {
// success, save log
log.setHandleTime(new Date());
log.setHandleStatus(triggerParam.getStatus());
log.setHandleCode(handleCallbackParam.getCode());
log.setHandleMsg(handleMsg.toString());
DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log);

@ -80,7 +80,7 @@ public class RemoteHttpJobBean extends QuartzJobBean {
logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, responseModel:{}", jobLog.getId(), responseModel.toString());
// update trigger info 2/2
jobLog.setTriggerStatus(responseModel.getCode()+"");
jobLog.setTriggerCode(responseModel.getCode());
jobLog.setTriggerMsg(responseModel.getMsg());
DynamicSchedulerUtil.xxlJobLogDao.updateTriggerInfo(jobLog);

@ -13,92 +13,115 @@ public class XxlJobLog {
// job info
private int jobGroup;
private String jobName;
private String executorAddress; // 执行器地址,有多个则逗号分隔
private String executorHandler; // 执行器Handler
private String executorParam; // 执行器,任务参数
// execute info
private String executorAddress;
private String executorHandler;
private String executorParam;
// trigger info
private Date triggerTime;
private String triggerStatus;
private int triggerCode;
private String triggerMsg;
// handle info
private Date handleTime;
private String handleStatus;
private int handleCode;
private String handleMsg;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getJobGroup() {
return jobGroup;
}
public void setJobGroup(int jobGroup) {
this.jobGroup = jobGroup;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public String getExecutorAddress() {
return executorAddress;
}
public void setExecutorAddress(String executorAddress) {
this.executorAddress = executorAddress;
}
public String getExecutorHandler() {
return executorHandler;
}
public void setExecutorHandler(String executorHandler) {
this.executorHandler = executorHandler;
}
public String getExecutorParam() {
return executorParam;
}
public void setExecutorParam(String executorParam) {
this.executorParam = executorParam;
}
public Date getTriggerTime() {
return triggerTime;
}
public void setTriggerTime(Date triggerTime) {
this.triggerTime = triggerTime;
}
public String getTriggerStatus() {
return triggerStatus;
public int getTriggerCode() {
return triggerCode;
}
public void setTriggerStatus(String triggerStatus) {
this.triggerStatus = triggerStatus;
public void setTriggerCode(int triggerCode) {
this.triggerCode = triggerCode;
}
public String getTriggerMsg() {
return triggerMsg;
}
public void setTriggerMsg(String triggerMsg) {
this.triggerMsg = triggerMsg;
}
public Date getHandleTime() {
return handleTime;
}
public void setHandleTime(Date handleTime) {
this.handleTime = handleTime;
}
public String getHandleStatus() {
return handleStatus;
public int getHandleCode() {
return handleCode;
}
public void setHandleStatus(String handleStatus) {
this.handleStatus = handleStatus;
public void setHandleCode(int handleCode) {
this.handleCode = handleCode;
}
public String getHandleMsg() {
return handleMsg;
}
public void setHandleMsg(String handleMsg) {
this.handleMsg = handleMsg;
}
}

@ -6,7 +6,6 @@ import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.schedule.DynamicSchedulerUtil;
import com.xxl.job.admin.core.util.MailUtil;
import com.xxl.job.core.biz.model.ReturnT;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,7 +40,8 @@ public class JobMonitorHelper {
logger.info(">>>>>>>>>>> job monitor heat success, JobLogId:{}", jobLogId);
XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(jobLogId);
if (log!=null) {
if ((ReturnT.SUCCESS_CODE+"").equals(log.getTriggerStatus()) && StringUtils.isBlank(log.getHandleStatus())) {
if (ReturnT.SUCCESS_CODE==log.getTriggerCode() && log.getHandleCode()==0) {
// running
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
@ -49,16 +49,16 @@ public class JobMonitorHelper {
}
JobMonitorHelper.monitor(jobLogId);
}
if ((ReturnT.SUCCESS_CODE+"").equals(log.getTriggerStatus()) && (ReturnT.SUCCESS_CODE+"").equals(log.getHandleStatus())) {
if (ReturnT.SUCCESS_CODE==log.getTriggerCode() && ReturnT.SUCCESS_CODE==log.getHandleCode()) {
// pass
}
if ((ReturnT.FAIL+"").equals(log.getTriggerStatus()) || (ReturnT.FAIL+"").equals(log.getHandleStatus())) {
if (ReturnT.FAIL_CODE == log.getTriggerCode()|| ReturnT.FAIL_CODE==log.getHandleCode()) {
XxlJobInfo info = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName());
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
Set<String> emailSet = new HashSet<String>(Arrays.asList(info.getAlarmEmail().split(",")));
for (String email: emailSet) {
String title = "《调度监控报警-任务调度中心XXL-JOB》";
String title = "《调度监控报警》(任务调度中心XXL-JOB)";
XxlJobGroup group = DynamicSchedulerUtil.xxlJobGroupDao.load(Integer.valueOf(info.getJobGroup()));
String content = MessageFormat.format("任务调度失败, 执行器名称:{0}, 任务描述:{1}.", group!=null?group.getTitle():"null", info.getJobDesc());
MailUtil.sendMail(email, title, content, false, null);

@ -14,11 +14,11 @@
<result column="executor_param" property="executorParam" />
<result column="trigger_time" property="triggerTime" />
<result column="trigger_status" property="triggerStatus" />
<result column="trigger_code" property="triggerCode" />
<result column="trigger_msg" property="triggerMsg" />
<result column="handle_time" property="handleTime" />
<result column="handle_status" property="handleStatus" />
<result column="handle_code" property="handleCode" />
<result column="handle_msg" property="handleMsg" />
</resultMap>
@ -31,10 +31,10 @@
t.executor_handler,
t.executor_param,
t.trigger_time,
t.trigger_status,
t.trigger_code,
t.trigger_msg,
t.handle_time,
t.handle_status,
t.handle_code,
t.handle_msg
</sql>
@ -108,7 +108,7 @@
UPDATE XXL_JOB_QRTZ_TRIGGER_LOG
SET
`trigger_time`= #{triggerTime},
`trigger_status`= #{triggerStatus},
`trigger_code`= #{triggerCode},
`trigger_msg`= #{triggerMsg},
`executor_address`= #{executorAddress},
`executor_handler`=#{executorHandler},
@ -120,7 +120,7 @@
UPDATE XXL_JOB_QRTZ_TRIGGER_LOG
SET
`handle_time`= #{handleTime},
`handle_status`= #{handleStatus},
`handle_code`= #{handleCode},
`handle_msg`= #{handleMsg}
WHERE `id`= #{id}
</update>

@ -1,13 +1,13 @@
package com.xxl.job.core.biz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
/**
* Created by xuxueli on 17/3/1.
*/
public interface AdminBiz {
public ReturnT<String> callback(TriggerParam triggerParam);
public ReturnT<String> callback(HandleCallbackParam handleCallbackParam);
}

@ -0,0 +1,57 @@
package com.xxl.job.core.biz.model;
import java.io.Serializable;
import java.util.Set;
/**
* Created by xuxueli on 17/3/2.
*/
public class HandleCallbackParam implements Serializable {
private static final long serialVersionUID = 42L;
private int logId;
private Set<String> logAddress;
private int code;
private String msg;
public HandleCallbackParam(int logId, Set<String> logAddress, int code, String msg) {
this.logId = logId;
this.logAddress = logAddress;
this.code = code;
this.msg = msg;
}
public int getLogId() {
return logId;
}
public void setLogId(int logId) {
this.logId = logId;
}
public Set<String> getLogAddress() {
return logAddress;
}
public void setLogAddress(Set<String> logAddress) {
this.logAddress = logAddress;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}

@ -23,9 +23,6 @@ public class TriggerParam implements Serializable{
private int logId;
private long logDateTim;
private String status;
private String msg;
public String getAction() {
return action;
}
@ -98,22 +95,6 @@ public class TriggerParam implements Serializable{
this.logDateTim = logDateTim;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return "RequestModel{" +
@ -126,8 +107,6 @@ public class TriggerParam implements Serializable{
", logAddress=" + logAddress +
", logId=" + logId +
", logDateTim=" + logDateTim +
", status='" + status + '\'' +
", msg='" + msg + '\'' +
'}';
}
}

@ -1,5 +1,6 @@
package com.xxl.job.core.thread;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.handler.IJobHandler;
@ -96,14 +97,10 @@ public class JobThread extends Thread{
// callback handler info
if (!toStop) {
// commonm
triggerParam.setStatus(_code+"");
triggerParam.setMsg(_msg);
TriggerCallbackThread.pushCallBack(triggerParam);
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), _code, _msg));
} else {
// is killed
triggerParam.setStatus(ReturnT.FAIL_CODE+"");
triggerParam.setMsg(stopReason + " [业务运行中,被强制终止]");
TriggerCallbackThread.pushCallBack(triggerParam);
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]"));
}
}
} catch (Exception e) {
@ -116,9 +113,7 @@ public class JobThread extends Thread{
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) {
// is killed
triggerParam.setStatus(ReturnT.FAIL_CODE+"");
triggerParam.setMsg(stopReason + " [任务尚未执行,在调度队列中被终止]");
TriggerCallbackThread.pushCallBack(triggerParam);
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]"));
}
}

@ -1,8 +1,8 @@
package com.xxl.job.core.thread;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -15,18 +15,17 @@ import java.util.concurrent.LinkedBlockingQueue;
public class TriggerCallbackThread {
private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);
private static LinkedBlockingQueue<TriggerParam> callBackQueue = new LinkedBlockingQueue<TriggerParam>();
private static LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
static {
new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
TriggerParam callback = callBackQueue.take();
HandleCallbackParam callback = callBackQueue.take();
if (callback != null) {
for (String address : callback.getLogAddress()) {
try {
// callback
AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, address).getObject();
ReturnT<String> callbackResult = adminBiz.callback(callback);
@ -47,7 +46,7 @@ public class TriggerCallbackThread {
}
}).start();
}
public static void pushCallBack(TriggerParam callback){
public static void pushCallBack(HandleCallbackParam callback){
callBackQueue.add(callback);
logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
}

Loading…
Cancel
Save