Merge pull request #219 from xhanthow/local-job

修改JobThread捕获Error错误不更新JobLog的问题
pull/4/head
许雪里 7 years ago committed by GitHub
commit dd2de9d8f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -91,86 +91,94 @@ public class JobThread extends Thread{
@Override @Override
public void run() { public void run() {
while(!toStop){
running = false; while(!toStop){
idleTimes++; running = false;
try { idleTimes++;
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) // handle job
TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); ReturnT<String> executeResult = null;
if (triggerParam!=null) { TriggerParam triggerParam = null;
running = true; try {
idleTimes = 0; // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
triggerLogIdSet.remove(triggerParam.getLogId()); triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) {
// parse param running = true;
String[] handlerParams = (triggerParam.getExecutorParams()!=null && triggerParam.getExecutorParams().trim().length()>0) idleTimes = 0;
? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(",")).toArray()) : null; triggerLogIdSet.remove(triggerParam.getLogId());
// handle job // parse param
ReturnT<String> executeResult = null; String[] handlerParams = (triggerParam.getExecutorParams()!=null && triggerParam.getExecutorParams().trim().length()>0)
try { ? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(" ")).toArray()) : null;
// log filename: yyyy-MM-dd/9999.log
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
try {
XxlJobFileAppender.contextHolder.set(logFileName); // log filename: yyyy-MM-dd/9999.log
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Params:" + Arrays.toString(handlerParams));
XxlJobFileAppender.contextHolder.set(logFileName);
executeResult = handler.execute(handlerParams); ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
if (executeResult == null) { XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Params:" + Arrays.toString(handlerParams));
executeResult = ReturnT.FAIL;
} executeResult = handler.execute(handlerParams);
if (executeResult == null) {
XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult); executeResult = ReturnT.FAIL;
} catch (Exception e) { }
if (toStop) {
XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason); XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
} } catch (Exception e) {
if (toStop) {
StringWriter stringWriter = new StringWriter(); XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
e.printStackTrace(new PrintWriter(stringWriter)); }
String errorMsg = stringWriter.toString();
executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg); StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------"); String errorMsg = stringWriter.toString();
} executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
// callback handler info XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
if (!toStop) { }
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), executeResult)); } else {
} else { if (idleTimes > 30) {
// is killed XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]"); }
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); }
} } catch (Throwable e) {
} else { if (toStop) {
if (idleTimes > 30) { XxlJobLogger.log("<br>----------- xxl-job toStop, stopReason:" + stopReason);
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); }
}
} StringWriter stringWriter = new StringWriter();
} catch (Throwable e) { e.printStackTrace(new PrintWriter(stringWriter));
if (toStop) { String errorMsg = stringWriter.toString();
XxlJobLogger.log("<br>----------- xxl-job toStop, stopReason:" + stopReason); executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
}
XxlJobLogger.log("----------- xxl-job JobThread Exception:" + errorMsg);
StringWriter stringWriter = new StringWriter(); } finally {
e.printStackTrace(new PrintWriter(stringWriter)); if(triggerParam != null) {
String errorMsg = stringWriter.toString(); // callback handler info
XxlJobLogger.log("----------- xxl-job JobThread Exception:" + errorMsg); if (!toStop) {
} // commonm
} TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), executeResult));
} else {
// callback trigger request in queue // is killed
while(triggerQueue !=null && triggerQueue.size()>0){ ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]");
TriggerParam triggerParam = triggerQueue.poll(); TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
if (triggerParam!=null) { }
// is killed }
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]"); }
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); }
}
} // callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0){
logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); TriggerParam triggerParam = triggerQueue.poll();
} if (triggerParam!=null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
}
}
logger.info(">>>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
} }

Loading…
Cancel
Save