任务回调失败日志读写磁盘逻辑优化,解决极端情况下大文件读写内存问题;

pull/72/head
xuxueli 6 months ago
parent 8567a4c93f
commit a07001644b

@ -2574,7 +2574,7 @@ public void execute() {
### 7.41 版本 v3.2.1 Release Notes[规划中]
- 1、【新增】执行器新增“任务扫描排除路径”配置项(xxl.job.executor.excludedpackage)任务扫描时忽略指定包路径下的Bean支持配置多个包路径、逗号分隔
- 2、【优化】执行器任务Bean扫描逻辑优化完善懒加载Bean检测及过滤机制
- 3、【优化】调度不重不漏逻辑优化:调度时间轮单刻度数据去重,避免极端情况下任务重复执行;时间轮转动时校验临近刻度,避免极端情况下遗漏刻度;
- 3、【优化】调度时间轮强化,保障不重不漏:调度时间轮单刻度数据去重,避免极端情况下任务重复执行;时间轮转动时校验临近刻度,避免极端情况下遗漏刻度;
- 4、【优化】任务调度中心调度锁逻辑优化事务SQL下沉至Mapper层统一管理并增加测试用例提升代码可读性以及可维护性
- 5、【优化】报表统计SQL优化修复小概率情况下查询null值问题
- 6、【重构】调度过期策略、调度类型策略逻辑重构代码组件化拆分并完善日志提升健壮性及可维护性
@ -2584,10 +2584,11 @@ public void execute() {
- 10、【修复】合并PR-2369修复脚本任务参数取值问题
- 11、【优化】调度组件日志完善提升边界情况下问题定位效率
- 12、【升级】升级多项maven依赖至较新版本如 netty、groovy、spring、spring-ai、dify 等;
- 13、【ING】UI框架重构升级提升交互体验
- 14、【ING】调整资源加载逻辑移除不必要的拦截器逻辑提升页面加载效率
- 15、【ING】规范API交互协议通用响应结构体调整为Response
- 16、【ING】Http通讯组件升级基于接口代理方式重构
- 14、【优化】任务回调失败日志读写磁盘逻辑优化解决极端情况下大文件读写内存问题
- 15、【ING】UI框架重构升级提升交互体验
- 16、【ING】调整资源加载逻辑移除不必要的拦截器逻辑提升页面加载效率
- 17、【ING】规范API交互协议通用响应结构体调整为Response
- 18、【ING】Http通讯组件升级基于接口代理方式重构
### TODO LIST

@ -3,10 +3,10 @@ package com.xxl.job.admin.scheduler.trigger;
import com.xxl.job.admin.mapper.XxlJobGroupMapper;
import com.xxl.job.admin.mapper.XxlJobInfoMapper;
import com.xxl.job.admin.mapper.XxlJobLogMapper;
import com.xxl.job.admin.scheduler.config.XxlJobAdminBootstrap;
import com.xxl.job.admin.model.XxlJobGroup;
import com.xxl.job.admin.model.XxlJobInfo;
import com.xxl.job.admin.model.XxlJobLog;
import com.xxl.job.admin.scheduler.config.XxlJobAdminBootstrap;
import com.xxl.job.admin.scheduler.route.ExecutorRouteStrategyEnum;
import com.xxl.job.admin.util.I18nUtil;
import com.xxl.job.core.biz.ExecutorBiz;
@ -14,8 +14,8 @@ import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.util.IpUtil;
import com.xxl.job.core.util.ThrowableUtil;
import com.xxl.tool.core.StringTool;
import com.xxl.tool.exception.ThrowableTool;
import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -235,7 +235,7 @@ public class JobTrigger {
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = ReturnT.ofFail(ThrowableUtil.toString(e));
runResult = ReturnT.ofFail(ThrowableTool.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + "");

@ -17,7 +17,7 @@ public class ExecutorBizTest {
// admin-client
private static String addressUrl = "http://127.0.0.1:9999/";
private static String accessToken = null;
private static String accessToken = "default_token";
private static int timeout = 3;
@Test
@ -70,6 +70,7 @@ public class ExecutorBizTest {
// Assert result
Assertions.assertNotNull(retval);
}
@Test

@ -23,7 +23,7 @@ public class XxlJobHelper {
/**
* current JobId
*
* @return
* @return jobId
*/
public static long getJobId() {
XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
@ -37,7 +37,7 @@ public class XxlJobHelper {
/**
* current JobParam
*
* @return
* @return jobParam
*/
public static String getJobParam() {
XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
@ -53,7 +53,7 @@ public class XxlJobHelper {
/**
* current JobLogFileName
*
* @return
* @return logFileName
*/
public static String getJobLogFileName() {
XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
@ -69,7 +69,7 @@ public class XxlJobHelper {
/**
* current ShardIndex
*
* @return
* @return shardIndex
*/
public static int getShardIndex() {
XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
@ -83,7 +83,7 @@ public class XxlJobHelper {
/**
* current ShardTotal
*
* @return
* @return shardTotal
*/
public static int getShardTotal() {
XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
@ -96,7 +96,7 @@ public class XxlJobHelper {
// ---------------------- tool for log ----------------------
private static Logger logger = LoggerFactory.getLogger("xxl-job logger");
private static final Logger logger = LoggerFactory.getLogger("xxl-job logger");
/**
* append log with pattern
@ -121,7 +121,8 @@ public class XxlJobHelper {
/**
* append exception stack
*
* @param e
* @param e exception to log
* return true if log success
*/
public static boolean log(Throwable e) {
@ -136,8 +137,8 @@ public class XxlJobHelper {
/**
* append log
*
* @param callInfo
* @param appendLog
* @param callInfo call info
* @param appendLog append log
*/
private static boolean logDetail(StackTraceElement callInfo, String appendLog) {
XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
@ -174,7 +175,7 @@ public class XxlJobHelper {
/**
* handle success
*
* @return
* @return true if handle success
*/
public static boolean handleSuccess(){
return handleResult(XxlJobContext.HANDLE_CODE_SUCCESS, null);
@ -183,8 +184,8 @@ public class XxlJobHelper {
/**
* handle success with log msg
*
* @param handleMsg
* @return
* @param handleMsg log msg
* @return true if handle success
*/
public static boolean handleSuccess(String handleMsg) {
return handleResult(XxlJobContext.HANDLE_CODE_SUCCESS, handleMsg);
@ -193,7 +194,7 @@ public class XxlJobHelper {
/**
* handle fail
*
* @return
* @return true if handle fail
*/
public static boolean handleFail(){
return handleResult(XxlJobContext.HANDLE_CODE_FAIL, null);
@ -202,8 +203,8 @@ public class XxlJobHelper {
/**
* handle fail with log msg
*
* @param handleMsg
* @return
* @param handleMsg log msg
* @return true if handle fail
*/
public static boolean handleFail(String handleMsg) {
return handleResult(XxlJobContext.HANDLE_CODE_FAIL, handleMsg);
@ -212,7 +213,7 @@ public class XxlJobHelper {
/**
* handle timeout
*
* @return
* @return true if handle timeout
*/
public static boolean handleTimeout(){
return handleResult(XxlJobContext.HANDLE_CODE_TIMEOUT, null);
@ -221,8 +222,8 @@ public class XxlJobHelper {
/**
* handle timeout with log msg
*
* @param handleMsg
* @return
* @param handleMsg log msg
* @return true if handle timeout
*/
public static boolean handleTimeout(String handleMsg){
return handleResult(XxlJobContext.HANDLE_CODE_TIMEOUT, handleMsg);
@ -235,8 +236,8 @@ public class XxlJobHelper {
* 500 : fail
* 502 : timeout
*
* @param handleMsg
* @return
* @param handleMsg log msg
* @return true if handle success
*/
public static boolean handleResult(int handleCode, String handleMsg) {
XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();

@ -0,0 +1,181 @@
//package com.xxl.job.core.util;
//
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//
//import java.io.File;
//import java.io.FileInputStream;
//import java.io.FileOutputStream;
//import java.io.IOException;
//
///**
// * file tool
// *
// * @author xuxueli 2017-12-29 17:56:48
// */
//public class FileUtil {
// private static Logger logger = LoggerFactory.getLogger(FileUtil.class);
//
//
// /**
// * delete recursively
// *
// * @param root
// * @return
// */
// public static boolean deleteRecursively(File root) {
// if (root != null && root.exists()) {
// if (root.isDirectory()) {
// File[] children = root.listFiles();
// if (children != null) {
// for (File child : children) {
// deleteRecursively(child);
// }
// }
// }
// return root.delete();
// }
// return false;
// }
//
//
// public static void deleteFile(String fileName) {
// // file
// File file = new File(fileName);
// if (file.exists()) {
// file.delete();
// }
// }
//
//
// public static void writeFileContent(File file, byte[] data) {
//
// // file
// if (!file.exists()) {
// file.getParentFile().mkdirs();
// }
//
// // append file content
// FileOutputStream fos = null;
// try {
// fos = new FileOutputStream(file);
// fos.write(data);
// fos.flush();
// } catch (Exception e) {
// logger.error(e.getMessage(), e);
// } finally {
// if (fos != null) {
// try {
// fos.close();
// } catch (IOException e) {
// logger.error(e.getMessage(), e);
// }
// }
// }
//
// }
//
// public static byte[] readFileContent(File file) {
// Long fileLength = file.length();
// byte[] fileContent = new byte[fileLength.intValue()];
//
// FileInputStream in = null;
// try {
// in = new FileInputStream(file);
// in.read(fileContent);
// in.close();
//
// return fileContent;
// } catch (Exception e) {
// logger.error(e.getMessage(), e);
// return null;
// } finally {
// if (in != null) {
// try {
// in.close();
// } catch (IOException e) {
// logger.error(e.getMessage(), e);
// }
// }
// }
// }
//
//
// /*public static void appendFileLine(String fileName, String content) {
//
// // file
// File file = new File(fileName);
// if (!file.exists()) {
// try {
// file.createNewFile();
// } catch (IOException e) {
// logger.error(e.getMessage(), e);
// return;
// }
// }
//
// // content
// if (content == null) {
// content = "";
// }
// content += "\r\n";
//
// // append file content
// FileOutputStream fos = null;
// try {
// fos = new FileOutputStream(file, true);
// fos.write(content.getBytes("utf-8"));
// fos.flush();
// } catch (Exception e) {
// logger.error(e.getMessage(), e);
// } finally {
// if (fos != null) {
// try {
// fos.close();
// } catch (IOException e) {
// logger.error(e.getMessage(), e);
// }
// }
// }
//
// }
//
// public static List<String> loadFileLines(String fileName){
//
// List<String> result = new ArrayList<>();
//
// // valid log file
// File file = new File(fileName);
// if (!file.exists()) {
// return result;
// }
//
// // read file
// StringBuffer logContentBuffer = new StringBuffer();
// int toLineNum = 0;
// LineNumberReader reader = null;
// try {
// //reader = new LineNumberReader(new FileReader(logFile));
// reader = new LineNumberReader(new InputStreamReader(new FileInputStream(file), "utf-8"));
// String line = null;
// while ((line = reader.readLine())!=null) {
// if (line!=null && line.trim().length()>0) {
// result.add(line);
// }
// }
// } catch (IOException e) {
// logger.error(e.getMessage(), e);
// } finally {
// if (reader != null) {
// try {
// reader.close();
// } catch (IOException e) {
// logger.error(e.getMessage(), e);
// }
// }
// }
//
// return result;
// }*/
//
//}

@ -0,0 +1,75 @@
//package com.xxl.job.core.util;
//
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//
//import java.io.*;
//
///**
// * @author xuxueli 2020-04-12 0:14:00
// */
//public class JdkSerializeTool {
// private static Logger logger = LoggerFactory.getLogger(JdkSerializeTool.class);
//
//
// // ------------------------ serialize and unserialize ------------------------
//
// /**
// * 将对象-->byte[] (由于jedis中不支持直接存储object所以转换成byte[]存入)
// *
// * @param object
// * @return
// */
// public static byte[] serialize(Object object) {
// ObjectOutputStream oos = null;
// ByteArrayOutputStream baos = null;
// try {
// // 序列化
// baos = new ByteArrayOutputStream();
// oos = new ObjectOutputStream(baos);
// oos.writeObject(object);
// byte[] bytes = baos.toByteArray();
// return bytes;
// } catch (Exception e) {
// logger.error(e.getMessage(), e);
// } finally {
// try {
// oos.close();
// baos.close();
// } catch (IOException e) {
// logger.error(e.getMessage(), e);
// }
// }
// return null;
// }
//
//
// /**
// * 将byte[] -->Object
// *
// * @param bytes
// * @return
// */
// public static <T> Object deserialize(byte[] bytes, Class<T> clazz) {
// ObjectInputStream ois = null;
// ByteArrayInputStream bais = null;
// try {
// // 反序列化
// bais = new ByteArrayInputStream(bytes);
// ois = new ObjectInputStream(bais);
// return ois.readObject();
// } catch (Exception e) {
// logger.error(e.getMessage(), e);
// } finally {
// try {
// ois.close();
// bais.close();
// } catch (IOException e) {
// logger.error(e.getMessage(), e);
// }
// }
// return null;
// }
//
//
//}

@ -0,0 +1,24 @@
//package com.xxl.job.core.util;
//
//import java.io.PrintWriter;
//import java.io.StringWriter;
//
///**
// * @author xuxueli 2018-10-20 20:07:26
// */
//public class ThrowableUtil {
//
// /**
// * parse error to string
// *
// * @param e
// * @return
// */
// public static String toString(Throwable e) {
// StringWriter stringWriter = new StringWriter();
// e.printStackTrace(new PrintWriter(stringWriter));
// String errorMsg = stringWriter.toString();
// return errorMsg;
// }
//
//}

@ -5,9 +5,19 @@ package com.xxl.job.core.enums;
*/
public class RegistryConfig {
/**
* registry beat interval, default 30s
*/
public static final int BEAT_TIMEOUT = 30;
/**
* registry dead timeout, default 90s
*/
public static final int DEAD_TIMEOUT = BEAT_TIMEOUT * 3;
/**
* registry type
*/
public enum RegistType{ EXECUTOR, ADMIN }
}

@ -1,6 +1,7 @@
package com.xxl.job.core.log;
import com.xxl.job.core.biz.model.LogResult;
import com.xxl.tool.core.StringTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -14,27 +15,26 @@ import java.util.Date;
* @author xuxueli 2016-3-12 19:25:12
*/
public class XxlJobFileAppender {
private static Logger logger = LoggerFactory.getLogger(XxlJobFileAppender.class);
private static final Logger logger = LoggerFactory.getLogger(XxlJobFileAppender.class);
/**
* log base path
*
* strut like:
* ---/
* ---/gluesource/
* ---/gluesource/10_1514171108000.js
* ---/gluesource/10_1514171108000.js
* ---/2017-12-25/
* ---/callbacklogs/xxl-job-callback-1761412677119.log
* ---/2017-12-25/639.log
* ---/2017-12-25/821.log
*
*/
private static String logBasePath = "/data/applogs/xxl-job/jobhandler";
private static String glueSrcPath = logBasePath.concat("/gluesource");
private static String glueSrcPath = logBasePath.concat(File.separator).concat("gluesource");
private static String callbackLogPath = logBasePath.concat(File.separator).concat("callbacklogs");
public static void initLogPath(String logPath){
// init
if (logPath!=null && logPath.trim().length()>0) {
logBasePath = logPath;
if (StringTool.isNotBlank(logPath)) {
logBasePath = logPath.trim();
}
// mk base dir
File logPathDir = new File(logBasePath);
@ -56,6 +56,9 @@ public class XxlJobFileAppender {
public static String getGlueSrcPath() {
return glueSrcPath;
}
public static String getCallbackLogPath() {
return callbackLogPath;
}
/**
* log filename, like "logPath/yyyy-MM-dd/9999.log"

@ -5,8 +5,8 @@ import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.biz.model.*;
import com.xxl.job.core.thread.ExecutorRegistryThread;
import com.xxl.job.core.util.GsonTool;
import com.xxl.job.core.util.ThrowableUtil;
import com.xxl.job.core.util.XxlJobRemotingUtil;
import com.xxl.tool.exception.ThrowableTool;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
@ -202,7 +202,7 @@ public class EmbedServer {
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
return ReturnT.ofFail("request error:" + ThrowableUtil.toString(e));
return ReturnT.ofFail("request error:" + ThrowableTool.toString(e));
}
}

@ -1,13 +1,12 @@
package com.xxl.job.core.thread;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.FileUtil;
import com.xxl.tool.core.DateTool;
import com.xxl.tool.io.FileTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@ -31,7 +30,7 @@ public class JobLogFileCleanThread {
// limit min value
if (logRetentionDays < 3 ) {
return;
return; // effective only when logRetentionDays >= 3
}
localThread = new Thread(new Runnable() {
@ -52,32 +51,37 @@ public class JobLogFileCleanThread {
Date todayDate = todayCal.getTime();
// clean expired logfile
for (File childFile: childDirs) {
// valid
// valid log-path: must be directory
if (!childFile.isDirectory()) {
continue;
}
if (childFile.getName().indexOf("-") == -1) {
// valid day log-path: like "---/2017-12-25/639.log"
if (!childFile.getName().contains("-")) {
continue;
}
// file create date
// parse create-day of file-path
Date logFileCreateDate = null;
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
logFileCreateDate = simpleDateFormat.parse(childFile.getName());
} catch (ParseException e) {
logFileCreateDate = DateTool.parseDate(childFile.getName());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
if (logFileCreateDate == null) {
continue;
}
if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
FileUtil.deleteRecursively(childFile);
// check expired
Date expiredDate = DateTool.addDays(logFileCreateDate, logRetentionDays);
if (todayDate.getTime() > expiredDate.getTime()) {
// expired, remove all log of this day
FileTool.delete(childFile);
//FileUtil.deleteRecursively(childFile);
}
}
}
@ -85,7 +89,6 @@ public class JobLogFileCleanThread {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {

@ -8,12 +8,17 @@ import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.FileUtil;
import com.xxl.job.core.util.JdkSerializeTool;
import com.xxl.job.core.util.GsonTool;
import com.xxl.tool.core.ArrayTool;
import com.xxl.tool.core.CollectionTool;
import com.xxl.tool.core.StringTool;
import com.xxl.tool.encrypt.Md5Tool;
import com.xxl.tool.io.FileTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@ -21,12 +26,14 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Trigger Callback Thread
*
* Created by xuxueli on 16/7/22.
*/
public class TriggerCallbackThread {
private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);
private static final Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);
private static TriggerCallbackThread instance = new TriggerCallbackThread();
private static final TriggerCallbackThread instance = new TriggerCallbackThread();
public static TriggerCallbackThread getInstance(){
return instance;
}
@ -34,7 +41,7 @@ public class TriggerCallbackThread {
/**
* job results callback queue
*/
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
private final LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<>();
public static void pushCallBack(HandleCallbackParam callback){
getInstance().callBackQueue.add(callback);
logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
@ -54,7 +61,9 @@ public class TriggerCallbackThread {
return;
}
// callback
/**
* trigger callback thread
*/
triggerCallbackThread = new Thread(new Runnable() {
@Override
@ -66,13 +75,13 @@ public class TriggerCallbackThread {
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {
// callback list param
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
// collect callback data
List<HandleCallbackParam> callbackParamList = new ArrayList<>();
callbackParamList.add(callback); // add one element
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); // drainTo other all elements
// callback, will retry if error
if (callbackParamList!=null && callbackParamList.size()>0) {
// do callback, will retry if error
if (CollectionTool.isNotEmpty(callbackParamList)) {
doCallback(callbackParamList);
}
}
@ -83,11 +92,14 @@ public class TriggerCallbackThread {
}
}
// last callback
// thead stop, callback lasttime
try {
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
// collect callback data
List<HandleCallbackParam> callbackParamList = new ArrayList<>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList!=null && callbackParamList.size()>0) {
// do callback
if (CollectionTool.isNotEmpty(callbackParamList)) {
doCallback(callbackParamList);
}
} catch (Throwable e) {
@ -104,7 +116,9 @@ public class TriggerCallbackThread {
triggerCallbackThread.start();
// retry
/**
* callback fail retry thread
*/
triggerRetryCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
@ -129,6 +143,7 @@ public class TriggerCallbackThread {
}
});
triggerRetryCallbackThread.setDaemon(true);
triggerRetryCallbackThread.setName("xxl-job, executor TriggerRetryCallbackThread");
triggerRetryCallbackThread.start();
}
@ -158,7 +173,8 @@ public class TriggerCallbackThread {
/**
* do callback, will retry if error
* @param callbackParamList
*
* @param callbackParamList callback param list
*/
private void doCallback(List<HandleCallbackParam> callbackParamList){
boolean callbackRet = false;
@ -201,58 +217,81 @@ public class TriggerCallbackThread {
// ---------------------- fail-callback file ----------------------
private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator);
private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log");
/**
* fail-callback file name
*/
private static final String failCallbackFileName = XxlJobFileAppender
.getCallbackLogPath()
.concat(File.separator)
.concat("xxl-job-callback-{x}")
.concat(".log");
private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){
/**
* append fail-callback file
*
* @param callbackParamList callback param list
*/
private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList) {
// valid
if (callbackParamList==null || callbackParamList.size()==0) {
if (CollectionTool.isEmpty(callbackParamList)) {
return;
}
// append file
byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);
// generate callback data
String callbackData = GsonTool.toJson(callbackParamList);
String callbackDataMd5 = Md5Tool.md5(callbackData);
File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
if (callbackLogFile.exists()) {
for (int i = 0; i < 100; i++) {
callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));
if (!callbackLogFile.exists()) {
break;
}
}
// create file
String finalLogFileName = failCallbackFileName.replace("{x}", callbackDataMd5);
// write callback log
try {
FileTool.writeString(finalLogFileName, callbackData);
} catch (IOException e) {
logger.error(">>>>>>>>>>> TriggerCallbackThread appendFailCallbackFile error, finalLogFileName:{}", finalLogFileName, e);
}
FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
}
private void retryFailCallbackFile(){
/**
* retry fail-callback file
*/
private void retryFailCallbackFile() {
// valid
File callbackLogPath = new File(failCallbackFilePath);
File callbackLogPath = new File(XxlJobFileAppender.getCallbackLogPath());
if (!callbackLogPath.exists()) {
return;
}
if (callbackLogPath.isFile()) {
callbackLogPath.delete();
// valid file type: must be directory
if (!FileTool.isDirectory(callbackLogPath)) {
FileTool.delete(callbackLogPath);
return;
}
if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {
// valid file in path: pass if empty
if (ArrayTool.isEmpty(callbackLogPath.listFiles())) {
return;
}
// load and clear file, retry
// load and clear file, do retry
for (File callbackLogFile: callbackLogPath.listFiles()) {
byte[] callbackParamList_bytes = FileUtil.readFileContent(callbackLogFile);
// avoid empty file
if(callbackParamList_bytes == null || callbackParamList_bytes.length < 1){
callbackLogFile.delete();
continue;
}
try {
// load data
String callbackData = FileTool.readString(callbackLogFile.getPath());
if (StringTool.isBlank(callbackData)) {
FileTool.delete(callbackLogFile);
continue;
}
List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);
// parse callback param
List<HandleCallbackParam> callbackParamList = GsonTool.fromJsonList(callbackData, HandleCallbackParam.class);
FileTool.delete(callbackLogFile);
callbackLogFile.delete();
doCallback(callbackParamList);
// retry callback
doCallback(callbackParamList);
} catch (IOException e) {
logger.error(">>>>>>>>>>> TriggerCallbackThread retryFailCallbackFile error, callbackLogFile:{}", callbackLogFile.getPath(), e);
}
}
}

@ -1,181 +0,0 @@
package com.xxl.job.core.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
/**
* file tool
*
* @author xuxueli 2017-12-29 17:56:48
*/
public class FileUtil {
private static Logger logger = LoggerFactory.getLogger(FileUtil.class);
/**
* delete recursively
*
* @param root
* @return
*/
public static boolean deleteRecursively(File root) {
if (root != null && root.exists()) {
if (root.isDirectory()) {
File[] children = root.listFiles();
if (children != null) {
for (File child : children) {
deleteRecursively(child);
}
}
}
return root.delete();
}
return false;
}
public static void deleteFile(String fileName) {
// file
File file = new File(fileName);
if (file.exists()) {
file.delete();
}
}
public static void writeFileContent(File file, byte[] data) {
// file
if (!file.exists()) {
file.getParentFile().mkdirs();
}
// append file content
FileOutputStream fos = null;
try {
fos = new FileOutputStream(file);
fos.write(data);
fos.flush();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
}
public static byte[] readFileContent(File file) {
Long fileLength = file.length();
byte[] fileContent = new byte[fileLength.intValue()];
FileInputStream in = null;
try {
in = new FileInputStream(file);
in.read(fileContent);
in.close();
return fileContent;
} catch (Exception e) {
logger.error(e.getMessage(), e);
return null;
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
}
/*public static void appendFileLine(String fileName, String content) {
// file
File file = new File(fileName);
if (!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
logger.error(e.getMessage(), e);
return;
}
}
// content
if (content == null) {
content = "";
}
content += "\r\n";
// append file content
FileOutputStream fos = null;
try {
fos = new FileOutputStream(file, true);
fos.write(content.getBytes("utf-8"));
fos.flush();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
}
public static List<String> loadFileLines(String fileName){
List<String> result = new ArrayList<>();
// valid log file
File file = new File(fileName);
if (!file.exists()) {
return result;
}
// read file
StringBuffer logContentBuffer = new StringBuffer();
int toLineNum = 0;
LineNumberReader reader = null;
try {
//reader = new LineNumberReader(new FileReader(logFile));
reader = new LineNumberReader(new InputStreamReader(new FileInputStream(file), "utf-8"));
String line = null;
while ((line = reader.readLine())!=null) {
if (line!=null && line.trim().length()>0) {
result.add(line);
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
return result;
}*/
}

@ -1,75 +0,0 @@
package com.xxl.job.core.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
/**
* @author xuxueli 2020-04-12 0:14:00
*/
public class JdkSerializeTool {
private static Logger logger = LoggerFactory.getLogger(JdkSerializeTool.class);
// ------------------------ serialize and unserialize ------------------------
/**
* -->byte[] (jedisobjectbyte[])
*
* @param object
* @return
*/
public static byte[] serialize(Object object) {
ObjectOutputStream oos = null;
ByteArrayOutputStream baos = null;
try {
// 序列化
baos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(baos);
oos.writeObject(object);
byte[] bytes = baos.toByteArray();
return bytes;
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
oos.close();
baos.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
return null;
}
/**
* byte[] -->Object
*
* @param bytes
* @return
*/
public static <T> Object deserialize(byte[] bytes, Class<T> clazz) {
ObjectInputStream ois = null;
ByteArrayInputStream bais = null;
try {
// 反序列化
bais = new ByteArrayInputStream(bytes);
ois = new ObjectInputStream(bais);
return ois.readObject();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
ois.close();
bais.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
return null;
}
}

@ -1,24 +0,0 @@
package com.xxl.job.core.util;
import java.io.PrintWriter;
import java.io.StringWriter;
/**
* @author xuxueli 2018-10-20 20:07:26
*/
public class ThrowableUtil {
/**
* parse error to string
*
* @param e
* @return
*/
public static String toString(Throwable e) {
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
return errorMsg;
}
}
Loading…
Cancel
Save