任务线程Error异常逻辑逻辑优化

pull/4/head
xuxueli 7 years ago
parent dd2de9d8f7
commit 449691d322

@ -92,69 +92,55 @@ public class JobThread extends Thread{
@Override @Override
public void run() { public void run() {
while(!toStop){ while(!toStop){
running = false; running = false;
idleTimes++; idleTimes++;
// handle job
ReturnT<String> executeResult = null;
TriggerParam triggerParam = null; TriggerParam triggerParam = null;
ReturnT<String> executeResult = null;
try { try {
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) { if (triggerParam!=null) {
running = true; running = true;
idleTimes = 0; idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId()); triggerLogIdSet.remove(triggerParam.getLogId());
// parse param // parse param
String[] handlerParams = (triggerParam.getExecutorParams()!=null && triggerParam.getExecutorParams().trim().length()>0) String[] handlerParams = (triggerParam.getExecutorParams()!=null && triggerParam.getExecutorParams().trim().length()>0)
? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(" ")).toArray()) : null; ? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(",")).toArray()) : null;
try { // log filename: yyyy-MM-dd/9999.log
// log filename: yyyy-MM-dd/9999.log String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); XxlJobFileAppender.contextHolder.set(logFileName);
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
XxlJobFileAppender.contextHolder.set(logFileName);
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); // execute
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Params:" + Arrays.toString(handlerParams)); XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Params:" + Arrays.toString(handlerParams));
executeResult = handler.execute(handlerParams);
executeResult = handler.execute(handlerParams); if (executeResult == null) {
if (executeResult == null) { executeResult = ReturnT.FAIL;
executeResult = ReturnT.FAIL; }
} XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult); } else {
} catch (Exception e) { if (idleTimes > 30) {
if (toStop) { XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason); }
} }
} catch (Throwable e) {
StringWriter stringWriter = new StringWriter(); if (toStop) {
e.printStackTrace(new PrintWriter(stringWriter)); XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
String errorMsg = stringWriter.toString(); }
executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
StringWriter stringWriter = new StringWriter();
XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------"); e.printStackTrace(new PrintWriter(stringWriter));
} String errorMsg = stringWriter.toString();
executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
} else {
if (idleTimes > 30) { XxlJobLogger.log("<br>----------- xxl-job job execute end(error) -----------<br>----------- ReturnT:" + executeResult);
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); } finally {
}
}
} catch (Throwable e) {
if (toStop) {
XxlJobLogger.log("<br>----------- xxl-job toStop, stopReason:" + stopReason);
}
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
XxlJobLogger.log("----------- xxl-job JobThread Exception:" + errorMsg);
} finally {
if(triggerParam != null) { if(triggerParam != null) {
// callback handler info // callback handler info
if (!toStop) { if (!toStop) {
@ -169,16 +155,16 @@ public class JobThread extends Thread{
} }
} }
// callback trigger request in queue // callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0){ while(triggerQueue !=null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll(); TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) { if (triggerParam!=null) {
// is killed // is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]"); ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
} }
} }
logger.info(">>>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
} }
} }

Loading…
Cancel
Save