- Quartz触发线程池废弃并替换为 "XxlJobThreadPool",降低线程切换、内存占用带来的消耗,提高调度性能;

- 调度线程池隔离,拆分为"Fast"和"Slow"两个线程池,1分钟窗口期内任务耗时达500ms超过10次,该窗口期内判定为慢任务,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;
2.0.2
xuxueli 6 years ago
parent f83d03f78a
commit 8658f8d853

@ -78,6 +78,7 @@ XXL-JOB是一个轻量级分布式任务调度平台其核心设计目标是
- 30、跨平台原生提供通用HTTP任务HandlerBean任务"HttpJobHandler"业务方只需要提供HTTP链接即可不限制语言、平台 - 30、跨平台原生提供通用HTTP任务HandlerBean任务"HttpJobHandler"业务方只需要提供HTTP链接即可不限制语言、平台
- 31、国际化调度中心支持国际化设置提供中文、英文两种可选语言默认为中文 - 31、国际化调度中心支持国际化设置提供中文、英文两种可选语言默认为中文
- 32、容器化提供官方docker镜像并实时更新推送dockerhub进一步实现产品开箱即用 - 32、容器化提供官方docker镜像并实时更新推送dockerhub进一步实现产品开箱即用
- 33、线程池隔离调度线程池进行隔离拆分慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;;
## Development ## Development

@ -47,6 +47,7 @@ XXL-JOB是一个轻量级分布式任务调度平台其核心设计目标是
- 30、跨平台原生提供通用HTTP任务HandlerBean任务"HttpJobHandler"业务方只需要提供HTTP链接即可不限制语言、平台 - 30、跨平台原生提供通用HTTP任务HandlerBean任务"HttpJobHandler"业务方只需要提供HTTP链接即可不限制语言、平台
- 31、国际化调度中心支持国际化设置提供中文、英文两种可选语言默认为中文 - 31、国际化调度中心支持国际化设置提供中文、英文两种可选语言默认为中文
- 32、容器化提供官方docker镜像并实时更新推送dockerhub进一步实现产品开箱即用 - 32、容器化提供官方docker镜像并实时更新推送dockerhub进一步实现产品开箱即用
- 33、线程池隔离调度线程池进行隔离拆分慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;;
### 1.3 发展 ### 1.3 发展
@ -1442,9 +1443,9 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 19、执行器优雅停机优化 - 19、执行器优雅停机优化
- 20、连接池配置优化增强连接有效性验证 - 20、连接池配置优化增强连接有效性验证
- 21、JobHandler#msg长度限制修复异常情况下日志超长导致内存溢出的问题 - 21、JobHandler#msg长度限制修复异常情况下日志超长导致内存溢出的问题
- 22、[迭代中]任务线程隔离: - 22、Quartz触发线程池废弃并替换为 "XxlJobThreadPool",降低线程切换、内存占用带来的消耗,提高调度性能;
- 执行器测异步响应,不存在阻塞不需要隔离 - 23、调度线程池隔离拆分为"Fast"和"Slow"两个线程池1分钟窗口期内任务耗时达500ms超过10次该窗口期内判定为慢任务慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性
- 调度中心共用单一调度线程池可能导致调度阻塞需要线程隔离调度线程池拆分为Fast/Slow两个针对调度较慢的执行器地址请求降级使用Slow线程池考虑是否可以任务级隔离线程池
### TODO LIST ### TODO LIST
- 1、任务分片路由分片采用一致性Hash算法计算出尽量稳定的分片顺序即使注册机器存在波动也不会引起分批分片顺序大的波动目前采用IP自然排序可以满足需求待定 - 1、任务分片路由分片采用一致性Hash算法计算出尽量稳定的分片顺序即使注册机器存在波动也不会引起分批分片顺序大的波动目前采用IP自然排序可以满足需求待定

@ -0,0 +1,58 @@
package com.xxl.job.admin.core.quartz;
import org.quartz.SchedulerConfigException;
import org.quartz.spi.ThreadPool;
/**
* single thread pool, for async trigger
*
* @author xuxueli 2019-03-06
*/
public class XxlJobThreadPool implements ThreadPool {
@Override
public boolean runInThread(Runnable runnable) {
// async run
runnable.run();
return true;
//return false;
}
@Override
public int blockForAvailableThreads() {
return 1;
}
@Override
public void initialize() throws SchedulerConfigException {
}
@Override
public void shutdown(boolean waitForJobsToComplete) {
}
@Override
public int getPoolSize() {
return 1;
}
@Override
public void setInstanceId(String schedInstId) {
}
@Override
public void setInstanceName(String schedName) {
}
// support
public void setThreadCount(int count) {
//
}
}

@ -5,10 +5,9 @@ import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue; import java.util.Map;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.*;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
/** /**
* job trigger thread pool helper * job trigger thread pool helper
@ -21,32 +20,91 @@ public class JobTriggerPoolHelper {
// ---------------------- trigger pool ---------------------- // ---------------------- trigger pool ----------------------
private ThreadPoolExecutor triggerPool = new ThreadPoolExecutor( // fast/slow thread pool
32, private ThreadPoolExecutor fastTriggerPool = new ThreadPoolExecutor(
256, 8,
200,
60L, 60L,
TimeUnit.SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000), new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() { new ThreadFactory() {
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-triggerPool-" + r.hashCode()); return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
} }
}); });
private ThreadPoolExecutor slowTriggerPool = new ThreadPoolExecutor(
0,
100,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
// job timeout count
private volatile long minTim = System.currentTimeMillis()/60000; // ms > min
private volatile Map<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
/**
* add trigger
*/
public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) { public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {
triggerPool.execute(new Runnable() {
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
long start = System.currentTimeMillis();
try {
// do trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
minTim = minTim_now;
jobTimeoutCountMap.clear();
}
// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) { // ob-timeout threshold 500ms
AtomicInteger timeoutCount = jobTimeoutCountMap.put(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
}
} }
}); });
} }
public void stop() { public void stop() {
//triggerPool.shutdown(); //triggerPool.shutdown();
triggerPool.shutdownNow(); fastTriggerPool.shutdownNow();
slowTriggerPool.shutdownNow();
logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success."); logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
} }

@ -9,16 +9,19 @@ org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool #org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50 #org.quartz.threadPool.threadCount: 5
org.quartz.threadPool.threadPriority: 5 #org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true #org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000 org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.maxMisfiresToHandleAtATime: 1 org.quartz.jobStore.maxMisfiresToHandleAtATime: 1
#org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore #org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
# for async trigger
org.quartz.threadPool.class: com.xxl.job.admin.core.quartz.XxlJobThreadPool
# for cluster # for cluster
org.quartz.jobStore.tablePrefix: XXL_JOB_QRTZ_ org.quartz.jobStore.tablePrefix: XXL_JOB_QRTZ_
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX

Loading…
Cancel
Save