pull/56/head
liyong 4 years ago
parent 85270a7b3d
commit adcde32ba1

@ -0,0 +1,4 @@
源码仓库地址
https://github.com/xuxueli/xxl-job/releases
http://gitee.com/xuxueli0323/xxl-job/releases

@ -44,7 +44,7 @@ public class XxlJobScheduler {
// admin log report start // admin log report start
//todo 开启 logrThread 统计报表相关 间隔一分钟 //todo 开启 logrThread 统计报表相关 间隔一分钟
//TODO 每次统计都是统计一天的 数据量过大时会影响效率 可以做成增量的 在获取结果时异步 统计 或者每次只统计1分钟的 //TODO 每次统计都是统计一天的 没做大数据量的限制 过大时会影响效率 可以做成增量的 在获取结果时异步 统计 或者每次只统计1分钟的
JobLogReportHelper.getInstance().start(); JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper ) // start-schedule ( depend on JobTriggerPoolHelper )

@ -44,6 +44,7 @@ public class JobFailMonitorHelper {
// lock log // lock log
//这里通过 依赖数据库事务 来进行避免 多节点时数据的重复操作问题 //这里通过 依赖数据库事务 来进行避免 多节点时数据的重复操作问题
//TODO 如果在这一步服务挂了 这批数据就会被下一个线程扫描为失败
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1); int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) { if (lockRet < 1) {
continue; continue;

@ -79,6 +79,7 @@ public class JobTriggerPoolHelper {
// choose thread pool // choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool; ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
//慢任务 超过 10次 则调用单独的线程池
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool; triggerPool_ = slowTriggerPool;
} }

@ -127,7 +127,7 @@ public class ExecutorBizImpl implements ExecutorBiz {
// executor block strategy // executor block strategy
if (jobThread != null) { if (jobThread != null) {
//TODO 阻塞处理策略 //TODO 阻塞处理策略 注意点 这边阻塞策略是在客户端 实现的 如果路由策略 每次执行的节点不一样 就会导致重复执行
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
//丢弃后续调用 //丢弃后续调用

@ -154,6 +154,7 @@ public class XxlJobExecutor {
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
// generate address // generate address
//TODO 优先使用address
if (address==null || address.trim().length()==0) { if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-addressdefault use address to registry , otherwise use ip:port if address is null String ip_port_address = IpUtil.getIpPort(ip, port); // registry-addressdefault use address to registry , otherwise use ip:port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
@ -217,7 +218,7 @@ public class XxlJobExecutor {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" ."); "The correct method format like \" public ReturnT<String> execute(String param) \" .");
}*/ }*/
//TODO 设置访问权限 说明私有的方法也是支持的 demo待实践 //TODO 设置访问权限 说明私有的方法也是支持的
executeMethod.setAccessible(true); executeMethod.setAccessible(true);
// init and destroy // init and destroy

@ -213,7 +213,7 @@ public class EmbedServer {
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam); return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) { } else if ("/log".equals(uri)) {
//TODO 查询日志 //TODO 查询日志 注意 实时通过客户端查询日志 如果客户端是在容器中每次部署要注意下日志路径
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam); return executorBiz.log(logParam);
} else { } else {

@ -34,7 +34,7 @@ public class JobLogFileCleanThread {
*/ */
public void start(final long logRetentionDays){ public void start(final long logRetentionDays){
// limit min value 最小值三天一清理 // TDOD 注意点 最小值三天一清理
if (logRetentionDays < 3 ) { if (logRetentionDays < 3 ) {
return; return;
} }

@ -190,7 +190,7 @@ public class TriggerCallbackThread {
} }
} }
if (!callbackRet) { if (!callbackRet) {
//TODO 请求服务端异常处理(写进日志文件) 如果部分成功部分失败 是不是重复劳动 //TODO 请求服务端异常处理(写进日志文件) 如果部分成功部分失败 是不是重复劳动 目前不会有这种场景
appendFailCallbackFile(callbackParamList); appendFailCallbackFile(callbackParamList);
} }
} }

@ -16,7 +16,8 @@ xxl.job.accessToken=
### xxl-job executor appname ### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=1.1.1.1 ### 注册到服务端的地址 与 ip:port2选1 优先address
xxl.job.executor.address=
### xxl-job executor server-info ### xxl-job executor server-info
xxl.job.executor.ip= xxl.job.executor.ip=
xxl.job.executor.port=9999 xxl.job.executor.port=9999

Loading…
Cancel
Save