From 5383861e2fc2ad441e88d4c0d9a72588461d0d46 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Mon, 5 Jul 2021 22:49:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=8A=9F=E8=83=BD=E6=8C=81=E7=BB=AD?= =?UTF-8?q?=E6=9B=B4=E6=96=B0.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../threadpool/starter/builder/Builder.java | 20 ++ .../starter/builder/ThreadPoolBuilder.java | 210 ++++++++++++++++++ .../config/DynamicThreadPoolProperties.java | 2 +- .../core/ThreadPoolDynamicRefresh.java | 4 +- .../ThreadPoolChangeHandler.java} | 4 +- .../threadpool/starter/toolkit/ArrayUtil.java | 41 ++++ .../AbstractBuildThreadPoolTemplate.java | 140 ++++++++++++ .../thread/FastThreadPoolExecutor.java | 65 ++++++ .../toolkit/thread/RejectedPolicies.java | 56 +++++ .../starter/toolkit/thread/TaskQueue.java | 52 +++++ .../toolkit/thread/ThreadFactoryBuilder.java | 148 ++++++++++++ .../thread/ThreadPoolExecutorTemplate.java | 65 ++++++ pom.xml | 10 +- 13 files changed, 811 insertions(+), 6 deletions(-) create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/builder/Builder.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/builder/ThreadPoolBuilder.java rename dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/{toolkit/ThreadPoolChangeUtil.java => handler/ThreadPoolChangeHandler.java} (94%) create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/ArrayUtil.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/FastThreadPoolExecutor.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/RejectedPolicies.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/TaskQueue.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadFactoryBuilder.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadPoolExecutorTemplate.java diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/builder/Builder.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/builder/Builder.java new file mode 100644 index 00000000..5900bdb8 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/builder/Builder.java @@ -0,0 +1,20 @@ +package io.dynamic.threadpool.starter.builder; + +import java.io.Serializable; + +/** + * 建造者模式接口定义 + * + * @author chen.ma + * @date 2021/7/5 21:39 + */ +public interface Builder extends Serializable { + + /** + * 构建 + * + * @return 被构建的对象 + */ + T build(); + +} \ No newline at end of file diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/builder/ThreadPoolBuilder.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/builder/ThreadPoolBuilder.java new file mode 100644 index 00000000..9963b4f7 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/builder/ThreadPoolBuilder.java @@ -0,0 +1,210 @@ +package io.dynamic.threadpool.starter.builder; + + +import io.dynamic.threadpool.common.enums.QueueTypeEnum; +import io.dynamic.threadpool.common.toolkit.Assert; +import io.dynamic.threadpool.starter.toolkit.BlockingQueueUtil; +import io.dynamic.threadpool.starter.toolkit.thread.AbstractBuildThreadPoolTemplate; + +import java.math.BigDecimal; +import java.util.concurrent.*; + +/** + * 线程池构造器 + * + * @author chen.ma + * @date 2021/6/28 17:29 + */ +public class ThreadPoolBuilder implements Builder { + + /** + * 是否创建快速消费线程池 + */ + private boolean isFastPool; + + /** + * 核心线程数量 + */ + private Integer corePoolNum = calculateCoreNum(); + + /** + * 最大线程数量 + */ + private Integer maxPoolNum = corePoolNum + (corePoolNum >> 1); + + /** + * 线程存活时间 + */ + private Long keepAliveTime = 30000L; + + /** + * 线程存活时间单位 + */ + private TimeUnit timeUnit = TimeUnit.MILLISECONDS; + + /** + * 队列最大容量 + */ + private Integer capacity = 512; + + /** + * 队列类型枚举 + */ + private QueueTypeEnum queueType; + + /** + * 阻塞队列 + */ + private BlockingQueue workQueue = new LinkedBlockingQueue(capacity); + + /** + * 线程池任务满时拒绝任务策略 + */ + private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); + + /** + * 是否守护线程 + */ + private boolean isDaemon = false; + + /** + * 线程名称前缀 + */ + private String threadNamePrefix; + + /** + * 计算公式:CPU 核数 / (1 - 阻塞系数 0.8) + * + * @return 线程池核心线程数 + */ + private Integer calculateCoreNum() { + int cpuCoreNum = Runtime.getRuntime().availableProcessors(); + return new BigDecimal(cpuCoreNum).divide(new BigDecimal("0.2")).intValue(); + } + + public ThreadPoolBuilder isFastPool(Boolean isFastPool) { + this.isFastPool = isFastPool; + return this; + } + + public ThreadPoolBuilder threadFactory(String threadNamePrefix) { + this.threadNamePrefix = threadNamePrefix; + return this; + } + + public ThreadPoolBuilder threadFactory(String threadNamePrefix, Boolean isDaemon) { + this.threadNamePrefix = threadNamePrefix; + this.isDaemon = isDaemon; + return this; + } + + public ThreadPoolBuilder corePoolNum(Integer corePoolNum) { + this.corePoolNum = corePoolNum; + return this; + } + + public ThreadPoolBuilder maxPoolNum(Integer maxPoolNum) { + this.maxPoolNum = maxPoolNum; + return this; + } + + public ThreadPoolBuilder keepAliveTime(Long keepAliveTime) { + this.keepAliveTime = keepAliveTime; + return this; + } + + public ThreadPoolBuilder timeUnit(TimeUnit timeUnit) { + this.timeUnit = timeUnit; + return this; + } + + public ThreadPoolBuilder capacity(Integer capacity) { + this.capacity = capacity; + return this; + } + + public ThreadPoolBuilder rejected(RejectedExecutionHandler rejectedExecutionHandler) { + this.rejectedExecutionHandler = rejectedExecutionHandler; + return this; + } + + /** + * 使用此方式赋值 workQueue, capacity 失效 + * + * @param queueType + * @return + */ + public ThreadPoolBuilder workQueue(QueueTypeEnum queueType) { + this.queueType = queueType; + return this; + } + + /** + * 构建 + * + * @return + */ + @Override + public ThreadPoolExecutor build() { + return isFastPool ? buildFastPool(this) : buildPool(this); + } + + /** + * 创建 + * + * @return + */ + public static ThreadPoolBuilder builder() { + return new ThreadPoolBuilder(); + } + + /** + * 构建普通线程池 + * + * @param builder + * @return + */ + public static ThreadPoolExecutor buildPool(ThreadPoolBuilder builder) { + return AbstractBuildThreadPoolTemplate.buildPool(buildInitParam(builder)); + } + + /** + * 构建快速消费线程池 + * + * @param builder + * @return + */ + public static ThreadPoolExecutor buildFastPool(ThreadPoolBuilder builder) { + return AbstractBuildThreadPoolTemplate.buildFastPool(buildInitParam(builder)); + } + + /** + * 构建初始化参数 + * + * @param builder + * @return + */ + private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) { + Assert.notEmpty(builder.threadNamePrefix, "线程名称前缀不可为空或空的字符串."); + AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam = + new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon); + + initParam.setCorePoolNum(builder.corePoolNum) + .setMaxPoolNum(builder.maxPoolNum) + .setKeepAliveTime(builder.keepAliveTime) + .setCapacity(builder.capacity) + .setRejectedExecutionHandler(builder.rejectedExecutionHandler) + .setTimeUnit(builder.timeUnit); + + // 快速消费线程池内置指定线程池 + if (!builder.isFastPool) { + BlockingQueue blockingQueue = BlockingQueueUtil.createBlockingQueue(builder.queueType.type, builder.capacity); + if (blockingQueue == null) { + blockingQueue = builder.workQueue; + } + initParam.setWorkQueue(blockingQueue); + } + + return initParam; + } +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolProperties.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolProperties.java index 1e632560..c8d70304 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolProperties.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolProperties.java @@ -42,5 +42,5 @@ public class DynamicThreadPoolProperties { /** * 是否开启 banner */ - private boolean banner; + private boolean banner = true; } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java index 73fe9b5b..633c7f94 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java @@ -2,7 +2,7 @@ package io.dynamic.threadpool.starter.core; import com.alibaba.fastjson.JSON; import io.dynamic.threadpool.common.model.PoolParameterInfo; -import io.dynamic.threadpool.starter.toolkit.ThreadPoolChangeUtil; +import io.dynamic.threadpool.starter.handler.ThreadPoolChangeHandler; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ThreadPoolExecutor; @@ -32,7 +32,7 @@ public class ThreadPoolDynamicRefresh { log.info("[✈️] Original thread pool. coreSize :: {}, maxSize :: {}, queueType :: {}, capacity :: {}, keepAliveTime :: {}", executor.getCorePoolSize(), executor.getMaximumPoolSize(), queueType, executor.getQueue().remainingCapacity(), executor.getKeepAliveTime(TimeUnit.MILLISECONDS)); - ThreadPoolChangeUtil.changePool(executor, coreSize, maxSize, queueType, capacity, keepAliveTime); + ThreadPoolChangeHandler.changePool(executor, coreSize, maxSize, queueType, capacity, keepAliveTime); ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getPool(); log.info("[🚀] Changed thread pool. coreSize :: {}, maxSize :: {}, queueType :: {}, capacity :: {}, keepAliveTime :: {}", afterExecutor.getCorePoolSize(), afterExecutor.getMaximumPoolSize(), queueType, afterExecutor.getQueue().remainingCapacity(), afterExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS)); diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/ThreadPoolChangeUtil.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/handler/ThreadPoolChangeHandler.java similarity index 94% rename from dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/ThreadPoolChangeUtil.java rename to dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/handler/ThreadPoolChangeHandler.java index 8856337a..128719dd 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/ThreadPoolChangeUtil.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/handler/ThreadPoolChangeHandler.java @@ -1,4 +1,4 @@ -package io.dynamic.threadpool.starter.toolkit; +package io.dynamic.threadpool.starter.handler; import io.dynamic.threadpool.common.enums.QueueTypeEnum; import io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue; @@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit; * @date 2021/6/25 17:19 */ @Slf4j -public class ThreadPoolChangeUtil { +public class ThreadPoolChangeHandler { public static void changePool(ThreadPoolExecutor executor, Integer coreSize, Integer maxSize, Integer queueType, Integer capacity, Integer keepAliveTime) { if (coreSize != null) { diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/ArrayUtil.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/ArrayUtil.java new file mode 100644 index 00000000..211ab3c1 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/ArrayUtil.java @@ -0,0 +1,41 @@ +package io.dynamic.threadpool.starter.toolkit; + +import java.lang.reflect.Array; + +/** + * Array Util. + * + * @author chen.ma + * @date 2021/7/5 21:54 + */ +public class ArrayUtil { + + public static T[] addAll(final T[] array1, @SuppressWarnings("unchecked") final T... array2) { + if (array1 == null) { + return clone(array2); + } else if (array2 == null) { + return clone(array1); + } + final Class type1 = array1.getClass().getComponentType(); + @SuppressWarnings("unchecked") final T[] joinedArray = (T[]) Array.newInstance(type1, array1.length + array2.length); + System.arraycopy(array1, 0, joinedArray, 0, array1.length); + try { + System.arraycopy(array2, 0, joinedArray, array1.length, array2.length); + } catch (final ArrayStoreException ase) { + final Class type2 = array2.getClass().getComponentType(); + if (!type1.isAssignableFrom(type2)) { + throw new IllegalArgumentException("Cannot store " + type2.getName() + " in an array of " + + type1.getName(), ase); + } + throw ase; + } + return joinedArray; + } + + public static T[] clone(final T[] array) { + if (array == null) { + return null; + } + return array.clone(); + } +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java new file mode 100644 index 00000000..12a2b77e --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java @@ -0,0 +1,140 @@ +package io.dynamic.threadpool.starter.toolkit.thread; + +import io.dynamic.threadpool.common.toolkit.Assert; +import lombok.Data; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.*; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +/** + * 抽象线程池模版构建 + * + * @author chen.ma + * @date 2021/7/5 21:45 + */ +@Slf4j +public class AbstractBuildThreadPoolTemplate { + + /** + * 线程池构建初始化参数 + *

+ * 此处本身是模版设计方法, 但是考虑创建简洁性, 移除 abstract + * 异常参考 {@link AbstractQueuedSynchronizer#tryAcquire} + * + * @return + */ + protected static ThreadPoolInitParam initParam() { + throw new UnsupportedOperationException(); + } + + /** + * 构建线程池 + * + * @return + */ + public static ThreadPoolExecutor buildPool() { + ThreadPoolInitParam initParam = initParam(); + return buildPool(initParam); + } + + /** + * 构建线程池 + * + * @return + */ + public static ThreadPoolExecutor buildPool(ThreadPoolInitParam initParam) { + Assert.notNull(initParam); + ThreadPoolExecutor executorService = + new ThreadPoolExecutorTemplate(initParam.getCorePoolNum(), + initParam.getMaxPoolNum(), + initParam.getKeepAliveTime(), + initParam.getTimeUnit(), + initParam.getWorkQueue(), + initParam.getThreadFactory(), + initParam.rejectedExecutionHandler); + return executorService; + } + + /** + * 构建快速执行线程池 + * + * @return + */ + public static ThreadPoolExecutor buildFastPool() { + ThreadPoolInitParam initParam = initParam(); + return buildFastPool(initParam); + } + + /** + * 构建快速执行线程池 + * + * @return + */ + public static ThreadPoolExecutor buildFastPool(ThreadPoolInitParam initParam) { + TaskQueue taskQueue = new TaskQueue(initParam.getCapacity()); + FastThreadPoolExecutor fastThreadPoolExecutor = + new FastThreadPoolExecutor(initParam.getCorePoolNum(), + initParam.getMaxPoolNum(), + initParam.getKeepAliveTime(), + initParam.getTimeUnit(), + taskQueue, + initParam.getThreadFactory(), + initParam.rejectedExecutionHandler); + taskQueue.setExecutor(fastThreadPoolExecutor); + return fastThreadPoolExecutor; + } + + @Data + @Accessors(chain = true) + public static class ThreadPoolInitParam { + + /** + * 核心线程数量 + */ + private Integer corePoolNum; + + /** + * 最大线程数量 + */ + private Integer maxPoolNum; + + /** + * 线程存活时间 + */ + private Long keepAliveTime; + + /** + * 线程存活时间单位 + */ + private TimeUnit timeUnit; + + /** + * 队列最大容量 + */ + private Integer capacity; + + /** + * 阻塞队列 + */ + private BlockingQueue workQueue; + + /** + * 线程池任务满时拒绝任务策略 + */ + private RejectedExecutionHandler rejectedExecutionHandler; + + /** + * 创建线程工厂 + */ + private ThreadFactory threadFactory; + + public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) { + this.threadFactory = new ThreadFactoryBuilder() + .setNamePrefix(threadNamePrefix + "-") + .setDaemon(isDaemon) + .build(); + } + } +} \ No newline at end of file diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/FastThreadPoolExecutor.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/FastThreadPoolExecutor.java new file mode 100644 index 00000000..f37bfcc3 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/FastThreadPoolExecutor.java @@ -0,0 +1,65 @@ +package io.dynamic.threadpool.starter.toolkit.thread; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 快速执行任务线程池, 参考 Dubbo 线程模型 EagerThreadPoolExecutor + *

+ * 配合 {@link TaskQueue} + * + * @author chen.ma + * @date 2021/7/5 21:00 + */ +@Slf4j +public class FastThreadPoolExecutor extends ThreadPoolExecutorTemplate { + + public FastThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + TaskQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + private final AtomicInteger submittedTaskCount = new AtomicInteger(0); + + public int getSubmittedTaskCount() { + return submittedTaskCount.get(); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + submittedTaskCount.decrementAndGet(); + } + + @Override + public void execute(Runnable command) { + submittedTaskCount.incrementAndGet(); + try { + super.execute(command); + } catch (RejectedExecutionException rx) { + final TaskQueue queue = (TaskQueue) super.getQueue(); + try { + if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { + submittedTaskCount.decrementAndGet(); + throw new RejectedExecutionException("队列容量已满.", rx); + } + } catch (InterruptedException x) { + submittedTaskCount.decrementAndGet(); + throw new RejectedExecutionException(x); + } + } catch (Exception t) { + submittedTaskCount.decrementAndGet(); + throw t; + } + } + +} \ No newline at end of file diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/RejectedPolicies.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/RejectedPolicies.java new file mode 100644 index 00000000..9b2ca906 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/RejectedPolicies.java @@ -0,0 +1,56 @@ +package io.dynamic.threadpool.starter.toolkit.thread; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; + +/** + * 线程池拒绝策略 + * + * @author chen.ma + * @date 2021/7/5 21:23 + */ +@Slf4j +public class RejectedPolicies { + + /** + * 发生拒绝事件时, 添加新任务并运行最早的任务 + * + * @return + */ + public static RejectedExecutionHandler runsOldestTaskPolicy() { + return (r, executor) -> { + if (executor.isShutdown()) { + return; + } + BlockingQueue workQueue = executor.getQueue(); + Runnable firstWork = workQueue.poll(); + boolean newTaskAdd = workQueue.offer(r); + if (firstWork != null) { + firstWork.run(); + } + if (!newTaskAdd) { + executor.execute(r); + } + }; + } + + /** + * 使用阻塞方法将拒绝任务添加队列, 可保证任务不丢失 + * + * @return + */ + public static RejectedExecutionHandler syncPutQueuePolicy() { + return (r, executor) -> { + if (executor.isShutdown()) { + return; + } + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + log.error("线程池添加队列任务失败", e); + } + }; + } +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/TaskQueue.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/TaskQueue.java new file mode 100644 index 00000000..f32db2e3 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/TaskQueue.java @@ -0,0 +1,52 @@ +package io.dynamic.threadpool.starter.toolkit.thread; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * 快速执行任务而阻塞队列, 参考 Dubbo 重写队列 TaskQueue + *

+ * 配合 {@link FastThreadPoolExecutor} 使用 + * + * @author chen.ma + * @date 2021/7/5 21:00 + */ +public class TaskQueue extends LinkedBlockingQueue { + + private static final long serialVersionUID = -2635853580887179627L; + + private FastThreadPoolExecutor executor; + + public TaskQueue(int capacity) { + super(capacity); + } + + public void setExecutor(FastThreadPoolExecutor exec) { + executor = exec; + } + + @Override + public boolean offer(Runnable runnable) { + int currentPoolThreadSize = executor.getPoolSize(); + // 如果有核心线程正在空闲, 将任务加入阻塞队列, 由核心线程进行处理任务 + if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { + return super.offer(runnable); + } + + // 当前线程池线程数量小于最大线程数, 返回false, 根据线程池源码, 会创建非核心线程 + if (currentPoolThreadSize < executor.getMaximumPoolSize()) { + return false; + } + + // 如果当前线程池数量大于最大线程数, 任务加入阻塞队列 + return super.offer(runnable); + } + + public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { + if (executor.isShutdown()) { + throw new RejectedExecutionException("执行器已关闭!"); + } + return super.offer(o, timeout, unit); + } +} \ No newline at end of file diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadFactoryBuilder.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadFactoryBuilder.java new file mode 100644 index 00000000..fb345e8e --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadFactoryBuilder.java @@ -0,0 +1,148 @@ +package io.dynamic.threadpool.starter.toolkit.thread; + +import io.dynamic.threadpool.starter.builder.Builder; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; + +/** + * ThreadFactory 创建器 + * + * @author chen.ma + * @date 2021/7/5 21:53 + */ +public class ThreadFactoryBuilder implements Builder { + + private static final long serialVersionUID = 1L; + + /** + * 用于线程创建的线程工厂类 + */ + private ThreadFactory backingThreadFactory; + + /** + * 线程名的前缀 + */ + private String namePrefix; + + /** + * 是否守护线程,默认false + */ + private Boolean daemon; + + /** + * 线程优先级 + */ + private Integer priority; + + /** + * 未捕获异常处理器 + */ + private Thread.UncaughtExceptionHandler uncaughtExceptionHandler; + + /** + * 设置用于创建基础线程的线程工厂 + * + * @param backingThreadFactory 用于创建基础线程的线程工厂 + * @return this + */ + public ThreadFactoryBuilder setThreadFactory(ThreadFactory backingThreadFactory) { + this.backingThreadFactory = backingThreadFactory; + return this; + } + + /** + * 设置线程名前缀, 例如设置前缀为 mb-thread- 则线程名为 mb-thread-1 之类 + * + * @param namePrefix 线程名前缀 + * @return this + */ + public ThreadFactoryBuilder setNamePrefix(String namePrefix) { + this.namePrefix = namePrefix; + return this; + } + + /** + * 设置是否守护线程 + * + * @param daemon 是否守护线程 + * @return this + */ + public ThreadFactoryBuilder setDaemon(boolean daemon) { + this.daemon = daemon; + return this; + } + + /** + * 设置线程优先级 + * + * @param priority 优先级 + * @return this + * @see Thread#MIN_PRIORITY + * @see Thread#NORM_PRIORITY + * @see Thread#MAX_PRIORITY + */ + public ThreadFactoryBuilder setPriority(int priority) { + if (priority < Thread.MIN_PRIORITY) { + throw new IllegalArgumentException(String.format("Thread priority ({}) must be >= {}", priority, Thread.MIN_PRIORITY)); + } + if (priority > Thread.MAX_PRIORITY) { + throw new IllegalArgumentException(String.format("Thread priority ({}) must be <= {}", priority, Thread.MAX_PRIORITY)); + } + this.priority = priority; + return this; + } + + /** + * 设置未捕获异常的处理方式 + * + * @param uncaughtExceptionHandler {@link Thread.UncaughtExceptionHandler} + */ + public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { + this.uncaughtExceptionHandler = uncaughtExceptionHandler; + } + + /** + * 构建 {@link ThreadFactory} + * + * @return {@link ThreadFactory} + */ + @Override + public ThreadFactory build() { + return build(this); + } + + /** + * 构建 + * + * @param builder {@link ThreadFactoryBuilder} + * @return {@link ThreadFactory} + */ + private static ThreadFactory build(ThreadFactoryBuilder builder) { + final ThreadFactory backingThreadFactory = (null != builder.backingThreadFactory) + ? builder.backingThreadFactory + : Executors.defaultThreadFactory(); + final String namePrefix = builder.namePrefix; + final Boolean daemon = builder.daemon; + final Integer priority = builder.priority; + final Thread.UncaughtExceptionHandler handler = builder.uncaughtExceptionHandler; + final AtomicLong count = (null == namePrefix) ? null : new AtomicLong(); + return r -> { + final Thread thread = backingThreadFactory.newThread(r); + if (null != namePrefix) { + thread.setName(namePrefix + count.getAndIncrement()); + } + if (null != daemon) { + thread.setDaemon(daemon); + } + if (null != priority) { + thread.setPriority(priority); + } + if (null != handler) { + thread.setUncaughtExceptionHandler(handler); + } + return thread; + }; + } +} \ No newline at end of file diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadPoolExecutorTemplate.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadPoolExecutorTemplate.java new file mode 100644 index 00000000..57ecbac4 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadPoolExecutorTemplate.java @@ -0,0 +1,65 @@ +package io.dynamic.threadpool.starter.toolkit.thread; + +import io.dynamic.threadpool.starter.toolkit.ArrayUtil; + +import java.util.concurrent.*; + +/** + * 线程池创建模版 + * + * @author chen.ma + * @date 2021/7/5 21:59 + */ +public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor { + + public ThreadPoolExecutorTemplate(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + private Exception clientTrace() { + return new Exception("tread task root stack trace"); + } + + @Override + public void execute(final Runnable command) { + super.execute(wrap(command, clientTrace())); + } + + @Override + public Future submit(final Runnable task) { + return super.submit(wrap(task, clientTrace())); + } + + @Override + public Future submit(final Callable task) { + return super.submit(wrap(task, clientTrace())); + } + + private Runnable wrap(final Runnable task, final Exception clientStack) { + return () -> { + try { + task.run(); + } catch (Exception e) { + e.setStackTrace(ArrayUtil.addAll(clientStack.getStackTrace(), e.getStackTrace())); + throw e; + } + }; + } + + private Callable wrap(final Callable task, final Exception clientStack) { + return () -> { + try { + return task.call(); + } catch (Exception e) { + e.setStackTrace(ArrayUtil.addAll(clientStack.getStackTrace(), e.getStackTrace())); + throw e; + } + }; + } +} diff --git a/pom.xml b/pom.xml index 4e9d42b9..ca4ad1da 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,15 @@ pom ${project.artifactId} - 🔥 强大的动态线程池,附带监控报警功能 + https://github.com/longtai94/dynamic-thread-pool + 🔥 强大的动态线程池,没有依赖任何中间件(未来也不打算依赖),附带监控线程池功能 + + + + chen.ma + https://github.com/longtai94 + + 1.8