From d7fb126ea02e5e40d4eda6e4fa02c6ae54703a86 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Thu, 8 Jul 2021 22:18:58 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E9=87=8D=E6=9E=84=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=B1=A0=E7=9B=B8=E5=85=B3=E5=B7=A5=E5=85=B7=E7=B1=BB?= =?UTF-8?q?.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../adapter/ThreadPoolConfigAdapter.java | 2 +- .../listener/ThreadPoolRunListener.java | 24 ++++-- .../AbstractBuildThreadPoolTemplate.java | 27 ++++++- .../toolkit/thread/ThreadFactoryBuilder.java | 21 ++++-- .../toolkit/thread/ThreadPoolBuilder.java | 75 ++++++++++++------- 5 files changed, 105 insertions(+), 44 deletions(-) diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ThreadPoolConfigAdapter.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ThreadPoolConfigAdapter.java index 16557df8..583234c9 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ThreadPoolConfigAdapter.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ThreadPoolConfigAdapter.java @@ -28,7 +28,7 @@ public class ThreadPoolConfigAdapter extends ConfigAdapter { private ThreadPoolOperation threadPoolOperation; private ExecutorService executorService = ThreadPoolBuilder.builder() - .poolThreadNum(2, 4) + .poolThreadSize(2, 4) .keepAliveTime(0L, TimeUnit.MILLISECONDS) .workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE, 1) .threadFactory("threadPool-config") diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ThreadPoolRunListener.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ThreadPoolRunListener.java index 6b433bbd..7ff0e7eb 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ThreadPoolRunListener.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ThreadPoolRunListener.java @@ -12,6 +12,8 @@ import io.dynamic.threadpool.starter.core.GlobalThreadPoolManage; import io.dynamic.threadpool.starter.remote.HttpAgent; import io.dynamic.threadpool.starter.remote.ServerHttpAgent; import io.dynamic.threadpool.starter.toolkit.BlockingQueueUtil; +import io.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; +import io.dynamic.threadpool.starter.wrap.CustomThreadPoolExecutor; import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap; import lombok.extern.slf4j.Slf4j; import org.springframework.core.annotation.Order; @@ -45,8 +47,9 @@ public class ThreadPoolRunListener { Map executorMap = ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class); executorMap.forEach((key, val) -> { + String tpId = val.getTpId(); Map queryStrMap = new HashMap(3); - queryStrMap.put("tpId", val.getTpId()); + queryStrMap.put("tpId", tpId); queryStrMap.put("itemId", properties.getItemId()); queryStrMap.put("namespace", properties.getNamespace()); @@ -56,18 +59,25 @@ public class ThreadPoolRunListener { try { result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L); - if (result.isSuccess() && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) { + if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) { // 使用相关参数创建线程池 - TimeUnit unit = TimeUnit.SECONDS; BlockingQueue workQueue = BlockingQueueUtil.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); - ThreadPoolExecutor resultTpe = new ThreadPoolExecutor(ppi.getCoreSize(), ppi.getMaxSize(), ppi.getKeepAliveTime(), unit, workQueue); - val.setPool(resultTpe); + ThreadPoolExecutor poolExecutor = ThreadPoolBuilder.builder() + .isCustomPool(true) + .poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize()) + .keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS) + .workQueue(workQueue) + .threadFactory(tpId) + .rejected(new CustomThreadPoolExecutor.AbortPolicy()) + .build(); + + val.setPool(poolExecutor); } else if (val.getPool() == null) { - val.setPool(CommonThreadPool.getInstance(val.getTpId())); + val.setPool(CommonThreadPool.getInstance(tpId)); } } catch (Exception ex) { log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); - val.setPool(CommonThreadPool.getInstance(val.getTpId())); + val.setPool(CommonThreadPool.getInstance(tpId)); } GlobalThreadPoolManage.register(val.getTpId(), ppi, val); diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java index 12a2b77e..4e7bce27 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java @@ -1,6 +1,7 @@ package io.dynamic.threadpool.starter.toolkit.thread; import io.dynamic.threadpool.common.toolkit.Assert; +import io.dynamic.threadpool.starter.wrap.CustomThreadPoolExecutor; import lombok.Data; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; @@ -86,6 +87,26 @@ public class AbstractBuildThreadPoolTemplate { return fastThreadPoolExecutor; } + /** + * 构建自定义线程池 + * + * @param initParam + * @return + */ + public static CustomThreadPoolExecutor buildCustomPool(ThreadPoolInitParam initParam) { + Assert.notNull(initParam); + CustomThreadPoolExecutor executorService = + new CustomThreadPoolExecutor(initParam.getCorePoolNum(), + initParam.getMaxPoolNum(), + initParam.getKeepAliveTime(), + initParam.getTimeUnit(), + initParam.getWorkQueue(), + initParam.getThreadFactory(), + initParam.rejectedExecutionHandler); + + return executorService; + } + @Data @Accessors(chain = true) public static class ThreadPoolInitParam { @@ -131,9 +152,9 @@ public class AbstractBuildThreadPoolTemplate { private ThreadFactory threadFactory; public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) { - this.threadFactory = new ThreadFactoryBuilder() - .setNamePrefix(threadNamePrefix + "-") - .setDaemon(isDaemon) + this.threadFactory = ThreadFactoryBuilder.builder() + .prefix(threadNamePrefix) + .daemon(isDaemon) .build(); } } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadFactoryBuilder.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadFactoryBuilder.java index fb345e8e..2e55f1bb 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadFactoryBuilder.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadFactoryBuilder.java @@ -47,7 +47,7 @@ public class ThreadFactoryBuilder implements Builder { * @param backingThreadFactory 用于创建基础线程的线程工厂 * @return this */ - public ThreadFactoryBuilder setThreadFactory(ThreadFactory backingThreadFactory) { + public ThreadFactoryBuilder threadFactory(ThreadFactory backingThreadFactory) { this.backingThreadFactory = backingThreadFactory; return this; } @@ -58,7 +58,7 @@ public class ThreadFactoryBuilder implements Builder { * @param namePrefix 线程名前缀 * @return this */ - public ThreadFactoryBuilder setNamePrefix(String namePrefix) { + public ThreadFactoryBuilder prefix(String namePrefix) { this.namePrefix = namePrefix; return this; } @@ -69,7 +69,7 @@ public class ThreadFactoryBuilder implements Builder { * @param daemon 是否守护线程 * @return this */ - public ThreadFactoryBuilder setDaemon(boolean daemon) { + public ThreadFactoryBuilder daemon(boolean daemon) { this.daemon = daemon; return this; } @@ -83,7 +83,7 @@ public class ThreadFactoryBuilder implements Builder { * @see Thread#NORM_PRIORITY * @see Thread#MAX_PRIORITY */ - public ThreadFactoryBuilder setPriority(int priority) { + public ThreadFactoryBuilder priority(int priority) { if (priority < Thread.MIN_PRIORITY) { throw new IllegalArgumentException(String.format("Thread priority ({}) must be >= {}", priority, Thread.MIN_PRIORITY)); } @@ -99,10 +99,19 @@ public class ThreadFactoryBuilder implements Builder { * * @param uncaughtExceptionHandler {@link Thread.UncaughtExceptionHandler} */ - public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { + public void uncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { this.uncaughtExceptionHandler = uncaughtExceptionHandler; } + /** + * 构建 + * + * @return + */ + public static ThreadFactoryBuilder builder() { + return new ThreadFactoryBuilder(); + } + /** * 构建 {@link ThreadFactory} * @@ -131,7 +140,7 @@ public class ThreadFactoryBuilder implements Builder { return r -> { final Thread thread = backingThreadFactory.newThread(r); if (null != namePrefix) { - thread.setName(namePrefix + count.getAndIncrement()); + thread.setName(namePrefix + "-" + count.getAndIncrement()); } if (null != daemon) { thread.setDaemon(daemon); diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadPoolBuilder.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadPoolBuilder.java index 28cd9526..5baad9f3 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadPoolBuilder.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/toolkit/thread/ThreadPoolBuilder.java @@ -22,20 +22,25 @@ public class ThreadPoolBuilder implements Builder { */ private boolean isFastPool; + /** + * 是否自定义线程池 + */ + private boolean isCustomPool; + /** * 核心线程数量 */ - private Integer corePoolNum = calculateCoreNum(); + private int corePoolSize = calculateCoreNum(); /** * 最大线程数量 */ - private Integer maxPoolNum = corePoolNum + (corePoolNum >> 1); + private int maxPoolSize = corePoolSize + (corePoolSize >> 1); /** * 线程存活时间 */ - private Long keepAliveTime = 30000L; + private long keepAliveTime = 30000L; /** * 线程存活时间单位 @@ -45,7 +50,7 @@ public class ThreadPoolBuilder implements Builder { /** * 队列最大容量 */ - private Integer capacity = 512; + private int capacity = 512; /** * 队列类型枚举 @@ -87,6 +92,11 @@ public class ThreadPoolBuilder implements Builder { return this; } + public ThreadPoolBuilder isCustomPool(Boolean isCustomPool) { + this.isCustomPool = isCustomPool; + return this; + } + public ThreadPoolBuilder threadFactory(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; return this; @@ -98,23 +108,23 @@ public class ThreadPoolBuilder implements Builder { return this; } - public ThreadPoolBuilder corePoolNum(Integer corePoolNum) { - this.corePoolNum = corePoolNum; + public ThreadPoolBuilder corePoolSize(int corePoolSize) { + this.corePoolSize = corePoolSize; return this; } - public ThreadPoolBuilder maxPoolNum(Integer maxPoolNum) { - this.maxPoolNum = maxPoolNum; + public ThreadPoolBuilder maxPoolNum(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; return this; } - public ThreadPoolBuilder poolThreadNum(Integer corePoolNum, Integer maxPoolNum) { - this.corePoolNum = corePoolNum; - this.maxPoolNum = maxPoolNum; + public ThreadPoolBuilder poolThreadSize(int corePoolSize, int maxPoolSize) { + this.corePoolSize = corePoolSize; + this.maxPoolSize = maxPoolSize; return this; } - public ThreadPoolBuilder keepAliveTime(Long keepAliveTime) { + public ThreadPoolBuilder keepAliveTime(long keepAliveTime) { this.keepAliveTime = keepAliveTime; return this; } @@ -124,18 +134,18 @@ public class ThreadPoolBuilder implements Builder { return this; } - public ThreadPoolBuilder keepAliveTime(Long keepAliveTime, TimeUnit timeUnit) { + public ThreadPoolBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit) { this.keepAliveTime = keepAliveTime; this.timeUnit = timeUnit; return this; } - public ThreadPoolBuilder capacity(Integer capacity) { + public ThreadPoolBuilder capacity(int capacity) { this.capacity = capacity; return this; } - public ThreadPoolBuilder workQueue(QueueTypeEnum queueType, Integer capacity) { + public ThreadPoolBuilder workQueue(QueueTypeEnum queueType, int capacity) { this.queueType = queueType; this.capacity = capacity; return this; @@ -146,17 +156,16 @@ public class ThreadPoolBuilder implements Builder { return this; } - /** - * 使用此方式赋值 workQueue, capacity 失效 - * - * @param queueType - * @return - */ public ThreadPoolBuilder workQueue(QueueTypeEnum queueType) { this.queueType = queueType; return this; } + public ThreadPoolBuilder workQueue(BlockingQueue workQueue) { + this.workQueue = workQueue; + return this; + } + /** * 构建 * @@ -164,6 +173,9 @@ public class ThreadPoolBuilder implements Builder { */ @Override public ThreadPoolExecutor build() { + if (isCustomPool) { + return buildCustomPool(this); + } return isFastPool ? buildFastPool(this) : buildPool(this); } @@ -196,6 +208,16 @@ public class ThreadPoolBuilder implements Builder { return AbstractBuildThreadPoolTemplate.buildFastPool(buildInitParam(builder)); } + /** + * 构建自定义线程池 + * + * @param builder + * @return + */ + private static ThreadPoolExecutor buildCustomPool(ThreadPoolBuilder builder) { + return AbstractBuildThreadPoolTemplate.buildCustomPool(buildInitParam(builder)); + } + /** * 构建初始化参数 * @@ -207,8 +229,8 @@ public class ThreadPoolBuilder implements Builder { AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam = new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon); - initParam.setCorePoolNum(builder.corePoolNum) - .setMaxPoolNum(builder.maxPoolNum) + initParam.setCorePoolNum(builder.corePoolSize) + .setMaxPoolNum(builder.maxPoolSize) .setKeepAliveTime(builder.keepAliveTime) .setCapacity(builder.capacity) .setRejectedExecutionHandler(builder.rejectedExecutionHandler) @@ -216,11 +238,10 @@ public class ThreadPoolBuilder implements Builder { // 快速消费线程池内置指定线程池 if (!builder.isFastPool) { - BlockingQueue blockingQueue = BlockingQueueUtil.createBlockingQueue(builder.queueType.type, builder.capacity); - if (blockingQueue == null) { - blockingQueue = builder.workQueue; + if (builder.workQueue == null) { + builder.workQueue = BlockingQueueUtil.createBlockingQueue(builder.queueType.type, builder.capacity); } - initParam.setWorkQueue(blockingQueue); + initParam.setWorkQueue(builder.workQueue); } return initParam;