JobHandler支持自定义回调结果;

v1.6
xueli.xue 8 years ago
parent e287195bf4
commit b7b411ce2a

@ -772,6 +772,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
#### 6.12 版本 V1.6.2 特性Coding #### 6.12 版本 V1.6.2 特性Coding
- 1、任务报表总任务数、总调度数、调度成功比例 - 1、任务报表总任务数、总调度数、调度成功比例
- 2、JobHandler支持自定义回调结果
#### TODO LIST #### TODO LIST
- 1、支持脚本JOB(源码或指定路径), 即shell/python/php等, 日志实时输出并支持在线监控定制JobHandler实现; - 1、支持脚本JOB(源码或指定路径), 即shell/python/php等, 日志实时输出并支持在线监控定制JobHandler实现;
@ -779,7 +780,8 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 3、任务权限管理 - 3、任务权限管理
- 4、执行器server启动注册逻辑调整 - 4、执行器server启动注册逻辑调整
- 5、调度失败重试机制 - 5、调度失败重试机制
- 6、JobHandler支持自定义回调结果
- 7、JobHandler开启多线程时支持记录执行日志
## 七、其他 ## 七、其他

@ -120,12 +120,10 @@ public class JobLogController {
ReturnT<LogResult> logResult = executorBiz.log(triggerTime, logId, fromLineNum); ReturnT<LogResult> logResult = executorBiz.log(triggerTime, logId, fromLineNum);
// is end // is end
if (logResult.getContent()!=null && logResult.getContent().getFromLineNum() > logResult.getContent().getToLineNum()) { /*XxlJobLog jobLog = xxlJobLogDao.load(logId);
XxlJobLog jobLog = xxlJobLogDao.load(logId);
if (jobLog.getHandleCode() > 0) { if (jobLog.getHandleCode() > 0) {
logResult.getContent().setEnd(true); logResult.getContent().setEnd(true);
} }*/
}
return logResult; return logResult;
} catch (Exception e) { } catch (Exception e) {

@ -31,7 +31,7 @@ public class AdminBizImpl implements AdminBiz {
// trigger success, to trigger child job, and avoid repeat trigger child job // trigger success, to trigger child job, and avoid repeat trigger child job
String childTriggerMsg = null; String childTriggerMsg = null;
if (ReturnT.SUCCESS_CODE==handleCallbackParam.getCode() && ReturnT.SUCCESS_CODE!=log.getHandleCode()) { if (ReturnT.SUCCESS_CODE==handleCallbackParam.getExecuteResult().getCode() && ReturnT.SUCCESS_CODE!=log.getHandleCode()) {
XxlJobInfo xxlJobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(log.getJobId()); XxlJobInfo xxlJobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(log.getJobId());
if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) { if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) {
childTriggerMsg = "<hr>"; childTriggerMsg = "<hr>";
@ -68,8 +68,8 @@ public class AdminBizImpl implements AdminBiz {
if (log.getHandleMsg()!=null) { if (log.getHandleMsg()!=null) {
handleMsg.append(log.getHandleMsg()).append("<br>"); handleMsg.append(log.getHandleMsg()).append("<br>");
} }
if (handleCallbackParam.getMsg() != null) { if (handleCallbackParam.getExecuteResult().getMsg() != null) {
handleMsg.append("执行备注:").append(handleCallbackParam.getMsg()); handleMsg.append("执行备注:").append(handleCallbackParam.getExecuteResult().getMsg());
} }
if (childTriggerMsg !=null) { if (childTriggerMsg !=null) {
handleMsg.append("<br>子任务触发备注:").append(childTriggerMsg); handleMsg.append("<br>子任务触发备注:").append(childTriggerMsg);
@ -77,7 +77,7 @@ public class AdminBizImpl implements AdminBiz {
// success, save log // success, save log
log.setHandleTime(new Date()); log.setHandleTime(new Date());
log.setHandleCode(handleCallbackParam.getCode()); log.setHandleCode(handleCallbackParam.getExecuteResult().getCode());
log.setHandleMsg(handleMsg.toString()); log.setHandleMsg(handleMsg.toString());
XxlJobDynamicScheduler.xxlJobLogDao.updateHandleInfo(log); XxlJobDynamicScheduler.xxlJobLogDao.updateHandleInfo(log);

@ -172,14 +172,16 @@ package com.xxl.job.service.handler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
public class DemoGlueJobHandler extends IJobHandler { public class DemoGlueJobHandler extends IJobHandler {
private static transient Logger logger = LoggerFactory.getLogger(DemoGlueJobHandler.class); private static transient Logger logger = LoggerFactory.getLogger(DemoGlueJobHandler.class);
@Override @Override
public void execute(String... params) throws Exception { public ReturnT<String> execute(String... params) throws Exception {
logger.info("XXL-JOB, Hello World."); logger.info("XXL-JOB, Hello World.");
return ReturnT.SUCCESS;
} }
} }

@ -8,7 +8,7 @@ $(function() {
} }
// pull log // pull log
var fromLineNum = 0; // [from, to] var fromLineNum = 1; // [from, to], start as 1
var pullFailCount = 0; var pullFailCount = 0;
function pullLog() { function pullLog() {
// pullFailCount, max=20 // pullFailCount, max=20
@ -44,15 +44,17 @@ $(function() {
} }
if (fromLineNum > data.content.toLineNum ) { if (fromLineNum > data.content.toLineNum ) {
console.log('pullLog already line-end'); console.log('pullLog already line-end');
// valid end // valid end
if (data.content.end) { if (data.content.end) {
logRunStop('<span style="color: green;">[Rolling Log Finish]</span>'); logRunStop('<span style="color: green;">[Rolling Log Finish]</span>');
return; return;
} }
return; return;
} }
// append // append content
fromLineNum = data.content.toLineNum + 1; fromLineNum = data.content.toLineNum + 1;
$('#logConsole').append(data.content.logContent); $('#logConsole').append(data.content.logContent);
pullFailCount = 0; pullFailCount = 0;
@ -72,7 +74,7 @@ $(function() {
// handler already callback, end // handler already callback, end
if (handleCode > 0) { if (handleCode > 0) {
logRunStop('<span style="color: green;">[Load Log Finish]</span>'); logRunStop('<br><span style="color: green;">[Load Log Finish]</span>');
return; return;
} }

@ -12,14 +12,12 @@ public class HandleCallbackParam implements Serializable {
private int logId; private int logId;
private Set<String> logAddress; private Set<String> logAddress;
private int code; private ReturnT<String> executeResult;
private String msg;
public HandleCallbackParam(int logId, Set<String> logAddress, int code, String msg) { public HandleCallbackParam(int logId, Set<String> logAddress, ReturnT<String> executeResult) {
this.logId = logId; this.logId = logId;
this.logAddress = logAddress; this.logAddress = logAddress;
this.code = code; this.executeResult = executeResult;
this.msg = msg;
} }
public int getLogId() { public int getLogId() {
@ -38,20 +36,11 @@ public class HandleCallbackParam implements Serializable {
this.logAddress = logAddress; this.logAddress = logAddress;
} }
public int getCode() { public ReturnT<String> getExecuteResult() {
return code; return executeResult;
} }
public void setCode(int code) { public void setExecuteResult(ReturnT<String> executeResult) {
this.code = code; this.executeResult = executeResult;
} }
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
} }

@ -1,5 +1,7 @@
package com.xxl.job.core.handler; package com.xxl.job.core.handler;
import com.xxl.job.core.biz.model.ReturnT;
/** /**
* remote job handler * remote job handler
* @author xuxueli 2015-12-19 19:06:38 * @author xuxueli 2015-12-19 19:06:38
@ -7,11 +9,11 @@ package com.xxl.job.core.handler;
public abstract class IJobHandler { public abstract class IJobHandler {
/** /**
* job handler <br><br> * job handler
* the return Object will be and stored
* @param params * @param params
* @throws Exception default sussecc, fail if catch exception * @return
* @throws Exception
*/ */
public abstract void execute(String... params) throws Exception; public abstract ReturnT<String> execute(String... params) throws Exception;
} }

@ -1,5 +1,6 @@
package com.xxl.job.core.handler.impl; package com.xxl.job.core.handler.impl;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -22,9 +23,10 @@ public class GlueJobHandler extends IJobHandler {
} }
@Override @Override
public void execute(String... params) throws Exception { public ReturnT<String> execute(String... params) throws Exception {
logger.info("----------- glue.version:{} -----------", glueUpdatetime); logger.info("----------- glue.version:{} -----------", glueUpdatetime);
jobHandler.execute(params); jobHandler.execute(params);
return ReturnT.SUCCESS;
} }
} }

@ -122,12 +122,12 @@ public class XxlJobFileAppender extends AppenderSkeleton {
// valid log file // valid log file
if (logFileName==null || logFileName.trim().length()==0) { if (logFileName==null || logFileName.trim().length()==0) {
return new LogResult(fromLineNum, -1, "readLog fail, logFile not found", true); return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true);
} }
File logFile = new File(filePath, logFileName); File logFile = new File(filePath, logFileName);
if (!logFile.exists()) { if (!logFile.exists()) {
return new LogResult(fromLineNum, -1, "readLog fail, logFile not exists", true); return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true);
} }
// read file // read file
@ -139,8 +139,8 @@ public class XxlJobFileAppender extends AppenderSkeleton {
String line = null; String line = null;
while ((line = reader.readLine())!=null) { while ((line = reader.readLine())!=null) {
toLineNum++; toLineNum = reader.getLineNumber(); // [from, to], start as 1
if (reader.getLineNumber() >= fromLineNum) { if (toLineNum >= fromLineNum) {
logContentBuffer.append(line).append("\n"); logContentBuffer.append(line).append("\n");
} }
} }

@ -74,33 +74,36 @@ public class JobThread extends Thread{
? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(",")).toArray()) : null; ? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(",")).toArray()) : null;
// handle job // handle job
int _code = ReturnT.SUCCESS_CODE; ReturnT<String> executeResult = null;
String _msg = null;
try { try {
// log filename: yyyy-MM-dd/9999.log // log filename: yyyy-MM-dd/9999.log
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
XxlJobFileAppender.contextHolder.set(logFileName); XxlJobFileAppender.contextHolder.set(logFileName);
logger.info("----------- xxl-job job execute start -----------"); logger.info("----------- xxl-job job execute start -----------");
handler.execute(handlerParams);
executeResult = handler.execute(handlerParams);
if (executeResult == null) {
executeResult = ReturnT.FAIL;
}
} catch (Exception e) { } catch (Exception e) {
logger.error("JobThread Exception:", e); logger.error("JobThread Exception:", e);
_code = ReturnT.FAIL_CODE;
StringWriter out = new StringWriter(); StringWriter out = new StringWriter();
e.printStackTrace(new PrintWriter(out)); e.printStackTrace(new PrintWriter(out));
_msg = out.toString();
executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, out.toString());
} }
logger.info("----------- xxl-job job execute end ----------- <br> Look : ExecutorParams:{}, Code:{}, Msg:{}", logger.info("----------- xxl-job job execute end ----------- <br> Look : ExecutorParams:{}, Code:{}, Msg:{}",
new Object[]{handlerParams, _code, _msg}); new Object[]{handlerParams, executeResult.getCode(), executeResult.getMsg()});
// callback handler info // callback handler info
if (!toStop) { if (!toStop) {
// commonm // commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), _code, _msg)); TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), executeResult));
} else { } else {
// is killed // is killed
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]")); ReturnT stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), stopResult));
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -113,7 +116,8 @@ public class JobThread extends Thread{
TriggerParam triggerParam = triggerQueue.poll(); TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) { if (triggerParam!=null) {
// is killed // is killed
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]")); ReturnT stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), stopResult));
} }
} }

@ -1,5 +1,6 @@
package com.xxl.job.executor.service.jobhandler; package com.xxl.job.executor.service.jobhandler;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHander; import com.xxl.job.core.handler.annotation.JobHander;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -25,13 +26,14 @@ public class DemoJobHandler extends IJobHandler {
private static transient Logger logger = LoggerFactory.getLogger(DemoJobHandler.class); private static transient Logger logger = LoggerFactory.getLogger(DemoJobHandler.class);
@Override @Override
public void execute(String... params) throws Exception { public ReturnT<String> execute(String... params) throws Exception {
logger.info("XXL-JOB, Hello World."); logger.info("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
logger.info("beat at:{}", i); logger.info("beat at:{}", i);
TimeUnit.SECONDS.sleep(2); TimeUnit.SECONDS.sleep(2);
} }
return ReturnT.SUCCESS;
} }
} }

Loading…
Cancel
Save