From 7dc7c1f2d020e16fb8627f07d72203385a13520d Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Tue, 26 Jun 2018 21:51:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=99=A8=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=BB=93=E6=9E=9C=E6=8C=81=E4=B9=85=E5=8C=96=EF=BC=9A=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E5=99=A8=E5=9B=9E=E8=B0=83=E5=A4=B1=E8=B4=A5=E6=97=B6?= =?UTF-8?q?=E5=B0=86=E4=BB=BB=E5=8A=A1=E7=BB=93=E6=9E=9C=E5=86=99=E7=A3=81?= =?UTF-8?q?=E7=9B=98=EF=BC=8C=E5=BE=85=E9=87=8D=E5=90=AF=E6=88=96=E7=BD=91?= =?UTF-8?q?=E7=BB=9C=E6=81=A2=E5=A4=8D=E6=97=B6=E9=87=8D=E8=AF=95=E5=9B=9E?= =?UTF-8?q?=E8=B0=83=E4=BB=BB=E5=8A=A1=E7=BB=93=E6=9E=9C=EF=BC=8C=E9=98=B2?= =?UTF-8?q?=E6=AD=A2=E4=BB=BB=E5=8A=A1=E6=89=A7=E8=A1=8C=E7=BB=93=E6=9E=9C?= =?UTF-8?q?=E4=B8=A2=E5=A4=B1=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 41 ++++---- .../core/thread/TriggerCallbackThread.java | 92 ++++++++++++++++- .../java/com/xxl/job/core/util/FileUtil.java | 99 ++++++++++++++++++- .../com/xxl/job/core/util/JacksonUtil.java | 30 +++++- 4 files changed, 235 insertions(+), 27 deletions(-) diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index fd9af56b..ec9258a0 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -1230,33 +1230,32 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 14、脚本任务Log文件流关闭优化; - 15、任务报表成功、失败和进行中统计问题修复; - 16、自研Log组件参数占位符改为"{}",并修复打印有参日志时参数不匹配导致报错的问题; +- 17、执行器任务结果持久化:执行器回调失败时将任务结果写磁盘,待重启或网络恢复时重试回调任务结果,防止任务执行结果丢失; ### TODO LIST - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限; - 2、任务分片路由:分片采用一致性Hash算法计算出尽量稳定的分片顺序,即使注册机器存在波动也不会引起分批分片顺序大的波动;目前采用IP自然排序,可以满足需求,待定; - 3、任务单机多线程:提升任务单机并行处理能力; -- 4、回调失败丢包问题:执行器回调失败写文件,重启或周期性回调重试;调度中心周期性请求并同步未回调的执行结果; -- 5、任务依赖,流程图,子任务+会签任务,各节点日志; -- 6、调度任务优先级; -- 7、移除quartz依赖,重写调度模块:新增或恢复任务时将下次执行记录插入delayqueue,调度中心集群竞争分布式锁,成功节点批量加载到期delayqueue数据,批量执行。 -- 8、springboot 和 docker镜像,并且推送docker镜像到中央仓库,更进一步实现产品开箱即用; -- 9、多数据库支持; -- 10、执行器Log清理功能:调度中心Log删除时同步删除执行器中的Log文件; -- 11、Bean模式任务,JobHandler自动从执行器中查询展示为下拉框,选择后自动填充任务名称等属性; -- 12、API事件触发类型任务(更类似MQ消息)支持"动态传参、延时消费";该类型任务不走Quartz,单独建立MQ消息表,调度中心竞争触发; -- 13、任务依赖增强,新增任务类型 "流程任务",流程节点可挂载普通类型任务,承担任务依赖功能。现有子任务模型取消;需要考虑任务依赖死循环问题; -- 14、分片任务某一分片失败,支持分片转移; -- 15、调度中心触发任务后,先推送触发队列,异步触发,然后立即返回。降低quartz线程占用时长。 -- 16、任务告警逻辑调整:任务调度,以及任务回调失败时,均推送监控队列。后期考虑通过任务Log字段控制告警状态; -- 17、新增任务默认运行状态,任务更新时运行状态保持不变; -- 18、提供多版本执行器:不依赖容器版本、不内嵌Jetty版本(通过配置executoraddress替换jetty通讯)等; -- 19、注册中心支持扩展,除默认基于DB之外,支持扩展接入第三方注册中心如zk、eureka等; -- 20、依赖Core内部国际化处理; -- 21、流程任务,支持参数传递; -- 22、SimpleTrigger 支持; -- 23、springboot热部署支持; -- 24、支持通过API服务操作任务信息; +- 4、任务依赖,流程图,子任务+会签任务,各节点日志; +- 5、调度任务优先级; +- 6、移除quartz依赖,重写调度模块:新增或恢复任务时将下次执行记录插入delayqueue,调度中心集群竞争分布式锁,成功节点批量加载到期delayqueue数据,批量执行。 +- 7、springboot 和 docker镜像,并且推送docker镜像到中央仓库,更进一步实现产品开箱即用; +- 8、多数据库支持; +- 9、执行器Log清理功能:调度中心Log删除时同步删除执行器中的Log文件; +- 10、Bean模式任务,JobHandler自动从执行器中查询展示为下拉框,选择后自动填充任务名称等属性; +- 11、API事件触发类型任务(更类似MQ消息)支持"动态传参、延时消费";该类型任务不走Quartz,单独建立MQ消息表,调度中心竞争触发;待定,该功能与 XXL-MQ 冲突,该场景建议用后者; +- 12、任务依赖增强,新增任务类型 "流程任务",流程节点可挂载普通类型任务,承担任务依赖功能。现有子任务模型取消;需要考虑任务依赖死循环问题; +- 13、分片任务某一分片失败,支持分片转移; +- 14、调度中心触发任务后,先推送触发队列,异步触发,然后立即返回。降低quartz线程占用时长。 +- 15、任务告警逻辑调整:任务调度,以及任务回调失败时,均推送监控队列。后期考虑通过任务Log字段控制告警状态; +- 16、新增任务默认运行状态,任务更新时运行状态保持不变; +- 17、提供多版本执行器:不依赖容器版本、不内嵌Jetty版本(通过配置executoraddress替换jetty通讯)等; +- 18、注册中心支持扩展,除默认基于DB之外,支持扩展接入第三方注册中心如zk、eureka等; +- 19、依赖Core内部国际化处理; +- 20、流程任务,支持参数传递; +- 21、SimpleTrigger 支持; +- 22、支持通过API服务操作任务信息; ## 七、其他 diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 97c3ae4d..491b9aab 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -3,16 +3,21 @@ package com.xxl.job.core.thread; import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.enums.RegistryConfig; import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobLogger; +import com.xxl.job.core.util.FileUtil; +import com.xxl.job.core.util.JacksonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * Created by xuxueli on 16/7/22. @@ -38,6 +43,7 @@ public class TriggerCallbackThread { * callback thread */ private Thread triggerCallbackThread; + private Thread triggerRetryCallbackThread; private volatile boolean toStop = false; public void start() { @@ -47,6 +53,7 @@ public class TriggerCallbackThread { return; } + // callback triggerCallbackThread = new Thread(new Runnable() { @Override @@ -89,16 +96,48 @@ public class TriggerCallbackThread { }); triggerCallbackThread.setDaemon(true); triggerCallbackThread.start(); + + + // retry + triggerRetryCallbackThread = new Thread(new Runnable() { + @Override + public void run() { + while(!toStop){ + try { + retryFailCallbackFile(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + try { + TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory."); + } + }); + triggerRetryCallbackThread.setDaemon(true); + triggerRetryCallbackThread.start(); + } public void toStop(){ toStop = true; - // interrupt and wait + // stop callback, interrupt and wait triggerCallbackThread.interrupt(); try { triggerCallbackThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } + + // stop retry, interrupt and wait + triggerRetryCallbackThread.interrupt(); + try { + triggerRetryCallbackThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } } /** @@ -106,21 +145,25 @@ public class TriggerCallbackThread { * @param callbackParamList */ private void doCallback(List callbackParamList){ + boolean callbackRet = false; // callback, will retry if error for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { ReturnT callbackResult = adminBiz.callback(callbackParamList); if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { callbackLog(callbackParamList, "
----------- xxl-job callback success"); + callbackRet = true; break; } else { callbackLog(callbackParamList, "
----------- xxl-job callback fail, callbackResult:" + callbackResult); } } catch (Exception e) { callbackLog(callbackParamList, "
----------- xxl-job callback error, errorMsg:" + e.getMessage()); - //getInstance().callBackQueue.addAll(callbackParamList); } } + if (!callbackRet) { + appendFailCallbackFile(callbackParamList); + } } /** @@ -134,4 +177,49 @@ public class TriggerCallbackThread { } } + + // ---------------------- fial-callback file TODO ---------------------- + + private static String failCallbackFileName = XxlJobFileAppender.getLogPath().concat(File.separator).concat("xxl-job-callback").concat(".log"); + + private void appendFailCallbackFile(List callbackParamList){ + // append file + String content = JacksonUtil.writeValueAsString(callbackParamList); + FileUtil.appendFileLine(failCallbackFileName, content); + } + + private void retryFailCallbackFile(){ + + // load and clear file + List fileLines = FileUtil.loadFileLines(failCallbackFileName); + FileUtil.deleteFile(failCallbackFileName); + + // parse + List failCallbackParamList = new ArrayList<>(); + if (fileLines!=null && fileLines.size()>0) { + for (String line: fileLines) { + List failCallbackParamListTmp = JacksonUtil.readValue(line, List.class, HandleCallbackParam.class); + if (failCallbackParamListTmp!=null && failCallbackParamListTmp.size()>0) { + failCallbackParamList.addAll(failCallbackParamListTmp); + } + } + } + + // retry callback, 100 lines per page + if (failCallbackParamList!=null && failCallbackParamList.size()>0) { + int pagesize = 100; + List pageData = new ArrayList<>(); + for (int i = 0; i < failCallbackParamList.size(); i++) { + pageData.add(failCallbackParamList.get(i)); + if (i>0 && i%pagesize == 0) { + doCallback(pageData); + pageData.clear(); + } + } + if (pageData.size() > 0) { + doCallback(pageData); + } + } + } + } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java index e68cdcde..6a4abee0 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java @@ -1,6 +1,11 @@ package com.xxl.job.core.util; -import java.io.File; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; /** * file tool @@ -8,7 +13,14 @@ import java.io.File; * @author xuxueli 2017-12-29 17:56:48 */ public class FileUtil { + private static Logger logger = LoggerFactory.getLogger(FileUtil.class); + /** + * delete recursively + * + * @param root + * @return + */ public static boolean deleteRecursively(File root) { if (root != null && root.exists()) { if (root.isDirectory()) { @@ -24,4 +36,89 @@ public class FileUtil { return false; } + public static void deleteFile(String fileName) { + // file + File file = new File(fileName); + if (file.exists()) { + file.delete(); + } + } + + public static void appendFileLine(String fileName, String content) { + + // file + File file = new File(fileName); + if (!file.exists()) { + try { + file.createNewFile(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + return; + } + } + + // content + if (content == null) { + content = ""; + } + content += "\r\n"; + + // append file content + FileOutputStream fos = null; + try { + fos = new FileOutputStream(file, true); + fos.write(content.getBytes("utf-8")); + fos.flush(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + if (fos != null) { + try { + fos.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + + } + + public static List loadFileLines(String fileName){ + + List result = new ArrayList<>(); + + // valid log file + File file = new File(fileName); + if (!file.exists()) { + return result; + } + + // read file + StringBuffer logContentBuffer = new StringBuffer(); + int toLineNum = 0; + LineNumberReader reader = null; + try { + //reader = new LineNumberReader(new FileReader(logFile)); + reader = new LineNumberReader(new InputStreamReader(new FileInputStream(file), "utf-8")); + String line = null; + while ((line = reader.readLine())!=null) { + if (line!=null && line.trim().length()>0) { + result.add(line); + } + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + + return result; + } + } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java index a679fb04..e24d865d 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java @@ -2,7 +2,7 @@ package com.xxl.job.core.util; import com.fasterxml.jackson.core.JsonGenerationException; import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; @@ -68,7 +68,31 @@ public class JacksonUtil { } return null; } - public static T readValueRefer(String jsonStr, Class clazz) { + + /** + * string --> List... + * + * @param jsonStr + * @param parametrized + * @param parameterClasses + * @param + * @return + */ + public static T readValue(String jsonStr, Class parametrized, Class... parameterClasses) { + try { + JavaType javaType = getInstance().getTypeFactory().constructParametricType(parametrized, parameterClasses); + return getInstance().readValue(jsonStr, javaType); + } catch (JsonParseException e) { + logger.error(e.getMessage(), e); + } catch (JsonMappingException e) { + logger.error(e.getMessage(), e); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + return null; + } + + /*public static T readValueRefer(String jsonStr, Class clazz) { try { return getInstance().readValue(jsonStr, new TypeReference() { }); } catch (JsonParseException e) { @@ -79,7 +103,7 @@ public class JacksonUtil { logger.error(e.getMessage(), e); } return null; - } + }*/ public static void main(String[] args) { try {