From abb4474fbb700d78e4bdcc9ebc93f76fb3d44aee Mon Sep 17 00:00:00 2001 From: anthow Date: Fri, 10 Nov 2017 11:43:36 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9JobThread=E6=8D=95=E8=8E=B7Er?= =?UTF-8?q?ror=E9=94=99=E8=AF=AF=E4=B8=8D=E6=9B=B4=E6=96=B0JobLog?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/xxl/job/core/thread/JobThread.java | 172 +++++++++--------- 1 file changed, 90 insertions(+), 82 deletions(-) diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index ae385f3f..3114d933 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -91,86 +91,94 @@ public class JobThread extends Thread{ @Override public void run() { - while(!toStop){ - running = false; - idleTimes++; - try { - // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) - TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); - if (triggerParam!=null) { - running = true; - idleTimes = 0; - triggerLogIdSet.remove(triggerParam.getLogId()); - - // parse param - String[] handlerParams = (triggerParam.getExecutorParams()!=null && triggerParam.getExecutorParams().trim().length()>0) - ? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(",")).toArray()) : null; - - // handle job - ReturnT executeResult = null; - try { - // log filename: yyyy-MM-dd/9999.log - String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); - - XxlJobFileAppender.contextHolder.set(logFileName); - ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); - XxlJobLogger.log("
----------- xxl-job job execute start -----------
----------- Params:" + Arrays.toString(handlerParams)); - - executeResult = handler.execute(handlerParams); - if (executeResult == null) { - executeResult = ReturnT.FAIL; - } - - XxlJobLogger.log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + executeResult); - } catch (Exception e) { - if (toStop) { - XxlJobLogger.log("
----------- JobThread toStop, stopReason:" + stopReason); - } - - StringWriter stringWriter = new StringWriter(); - e.printStackTrace(new PrintWriter(stringWriter)); - String errorMsg = stringWriter.toString(); - executeResult = new ReturnT(ReturnT.FAIL_CODE, errorMsg); - - XxlJobLogger.log("
----------- JobThread Exception:" + errorMsg + "
----------- xxl-job job execute end(error) -----------"); - } - - // callback handler info - if (!toStop) { - // commonm - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), executeResult)); - } else { - // is killed - ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]"); - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); - } - } else { - if (idleTimes > 30) { - XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); - } - } - } catch (Throwable e) { - if (toStop) { - XxlJobLogger.log("
----------- xxl-job toStop, stopReason:" + stopReason); - } - - StringWriter stringWriter = new StringWriter(); - e.printStackTrace(new PrintWriter(stringWriter)); - String errorMsg = stringWriter.toString(); - XxlJobLogger.log("----------- xxl-job JobThread Exception:" + errorMsg); - } - } - - // callback trigger request in queue - while(triggerQueue !=null && triggerQueue.size()>0){ - TriggerParam triggerParam = triggerQueue.poll(); - if (triggerParam!=null) { - // is killed - ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]"); - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); - } - } - - logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); - } + + while(!toStop){ + running = false; + idleTimes++; + // handle job + ReturnT executeResult = null; + TriggerParam triggerParam = null; + try { + // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) + triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); + if (triggerParam!=null) { + running = true; + idleTimes = 0; + triggerLogIdSet.remove(triggerParam.getLogId()); + + // parse param + String[] handlerParams = (triggerParam.getExecutorParams()!=null && triggerParam.getExecutorParams().trim().length()>0) + ? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(" ")).toArray()) : null; + + + try { + // log filename: yyyy-MM-dd/9999.log + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); + + XxlJobFileAppender.contextHolder.set(logFileName); + ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); + XxlJobLogger.log("
----------- xxl-job job execute start -----------
----------- Params:" + Arrays.toString(handlerParams)); + + executeResult = handler.execute(handlerParams); + if (executeResult == null) { + executeResult = ReturnT.FAIL; + } + + XxlJobLogger.log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + executeResult); + } catch (Exception e) { + if (toStop) { + XxlJobLogger.log("
----------- JobThread toStop, stopReason:" + stopReason); + } + + StringWriter stringWriter = new StringWriter(); + e.printStackTrace(new PrintWriter(stringWriter)); + String errorMsg = stringWriter.toString(); + executeResult = new ReturnT(ReturnT.FAIL_CODE, errorMsg); + + XxlJobLogger.log("
----------- JobThread Exception:" + errorMsg + "
----------- xxl-job job execute end(error) -----------"); + } + + } else { + if (idleTimes > 30) { + XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); + } + } + } catch (Throwable e) { + if (toStop) { + XxlJobLogger.log("
----------- xxl-job toStop, stopReason:" + stopReason); + } + + StringWriter stringWriter = new StringWriter(); + e.printStackTrace(new PrintWriter(stringWriter)); + String errorMsg = stringWriter.toString(); + executeResult = new ReturnT(ReturnT.FAIL_CODE, errorMsg); + + XxlJobLogger.log("----------- xxl-job JobThread Exception:" + errorMsg); + } finally { + if(triggerParam != null) { + // callback handler info + if (!toStop) { + // commonm + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), executeResult)); + } else { + // is killed + ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]"); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); + } + } + } + } + + // callback trigger request in queue + while(triggerQueue !=null && triggerQueue.size()>0){ + TriggerParam triggerParam = triggerQueue.poll(); + if (triggerParam!=null) { + // is killed + ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]"); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); + } + } + + logger.info(">>>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); + } }