From 449691d32216bfb44b4915145d40f95a668f1a5c Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Fri, 10 Nov 2017 16:23:06 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E7=BA=BF=E7=A8=8BError?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E9=80=BB=E8=BE=91=E9=80=BB=E8=BE=91=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/xxl/job/core/thread/JobThread.java | 130 ++++++++---------- 1 file changed, 58 insertions(+), 72 deletions(-) diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index 3114d933..20a3ad8e 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -92,69 +92,55 @@ public class JobThread extends Thread{ @Override public void run() { - while(!toStop){ - running = false; - idleTimes++; - // handle job - ReturnT executeResult = null; + while(!toStop){ + running = false; + idleTimes++; + TriggerParam triggerParam = null; + ReturnT executeResult = null; try { - // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) - triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); - if (triggerParam!=null) { - running = true; - idleTimes = 0; - triggerLogIdSet.remove(triggerParam.getLogId()); - - // parse param - String[] handlerParams = (triggerParam.getExecutorParams()!=null && triggerParam.getExecutorParams().trim().length()>0) - ? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(" ")).toArray()) : null; - - - try { - // log filename: yyyy-MM-dd/9999.log - String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); - - XxlJobFileAppender.contextHolder.set(logFileName); - ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); - XxlJobLogger.log("
----------- xxl-job job execute start -----------
----------- Params:" + Arrays.toString(handlerParams)); - - executeResult = handler.execute(handlerParams); - if (executeResult == null) { - executeResult = ReturnT.FAIL; - } - - XxlJobLogger.log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + executeResult); - } catch (Exception e) { - if (toStop) { - XxlJobLogger.log("
----------- JobThread toStop, stopReason:" + stopReason); - } - - StringWriter stringWriter = new StringWriter(); - e.printStackTrace(new PrintWriter(stringWriter)); - String errorMsg = stringWriter.toString(); - executeResult = new ReturnT(ReturnT.FAIL_CODE, errorMsg); - - XxlJobLogger.log("
----------- JobThread Exception:" + errorMsg + "
----------- xxl-job job execute end(error) -----------"); - } - - } else { - if (idleTimes > 30) { - XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); - } - } - } catch (Throwable e) { - if (toStop) { - XxlJobLogger.log("
----------- xxl-job toStop, stopReason:" + stopReason); - } - - StringWriter stringWriter = new StringWriter(); - e.printStackTrace(new PrintWriter(stringWriter)); - String errorMsg = stringWriter.toString(); - executeResult = new ReturnT(ReturnT.FAIL_CODE, errorMsg); - - XxlJobLogger.log("----------- xxl-job JobThread Exception:" + errorMsg); - } finally { + // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) + triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); + if (triggerParam!=null) { + running = true; + idleTimes = 0; + triggerLogIdSet.remove(triggerParam.getLogId()); + + // parse param + String[] handlerParams = (triggerParam.getExecutorParams()!=null && triggerParam.getExecutorParams().trim().length()>0) + ? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(",")).toArray()) : null; + + + // log filename: yyyy-MM-dd/9999.log + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); + XxlJobFileAppender.contextHolder.set(logFileName); + ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); + + // execute + XxlJobLogger.log("
----------- xxl-job job execute start -----------
----------- Params:" + Arrays.toString(handlerParams)); + executeResult = handler.execute(handlerParams); + if (executeResult == null) { + executeResult = ReturnT.FAIL; + } + XxlJobLogger.log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + executeResult); + + } else { + if (idleTimes > 30) { + XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); + } + } + } catch (Throwable e) { + if (toStop) { + XxlJobLogger.log("
----------- JobThread toStop, stopReason:" + stopReason); + } + + StringWriter stringWriter = new StringWriter(); + e.printStackTrace(new PrintWriter(stringWriter)); + String errorMsg = stringWriter.toString(); + executeResult = new ReturnT(ReturnT.FAIL_CODE, errorMsg); + + XxlJobLogger.log("
----------- xxl-job job execute end(error) -----------
----------- ReturnT:" + executeResult); + } finally { if(triggerParam != null) { // callback handler info if (!toStop) { @@ -169,16 +155,16 @@ public class JobThread extends Thread{ } } - // callback trigger request in queue - while(triggerQueue !=null && triggerQueue.size()>0){ - TriggerParam triggerParam = triggerQueue.poll(); - if (triggerParam!=null) { - // is killed - ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]"); - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); - } - } + // callback trigger request in queue + while(triggerQueue !=null && triggerQueue.size()>0){ + TriggerParam triggerParam = triggerQueue.poll(); + if (triggerParam!=null) { + // is killed + ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]"); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); + } + } - logger.info(">>>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); - } + logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); + } }