refactor: 重构线程池相关工具类.

pull/161/head
chen.ma 3 years ago
parent 2b63ee1944
commit d7fb126ea0

@ -28,7 +28,7 @@ public class ThreadPoolConfigAdapter extends ConfigAdapter {
private ThreadPoolOperation threadPoolOperation; private ThreadPoolOperation threadPoolOperation;
private ExecutorService executorService = ThreadPoolBuilder.builder() private ExecutorService executorService = ThreadPoolBuilder.builder()
.poolThreadNum(2, 4) .poolThreadSize(2, 4)
.keepAliveTime(0L, TimeUnit.MILLISECONDS) .keepAliveTime(0L, TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE, 1) .workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE, 1)
.threadFactory("threadPool-config") .threadFactory("threadPool-config")

@ -12,6 +12,8 @@ import io.dynamic.threadpool.starter.core.GlobalThreadPoolManage;
import io.dynamic.threadpool.starter.remote.HttpAgent; import io.dynamic.threadpool.starter.remote.HttpAgent;
import io.dynamic.threadpool.starter.remote.ServerHttpAgent; import io.dynamic.threadpool.starter.remote.ServerHttpAgent;
import io.dynamic.threadpool.starter.toolkit.BlockingQueueUtil; import io.dynamic.threadpool.starter.toolkit.BlockingQueueUtil;
import io.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import io.dynamic.threadpool.starter.wrap.CustomThreadPoolExecutor;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap; import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
@ -45,8 +47,9 @@ public class ThreadPoolRunListener {
Map<String, DynamicThreadPoolWrap> executorMap = ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class); Map<String, DynamicThreadPoolWrap> executorMap = ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class);
executorMap.forEach((key, val) -> { executorMap.forEach((key, val) -> {
String tpId = val.getTpId();
Map<String, String> queryStrMap = new HashMap(3); Map<String, String> queryStrMap = new HashMap(3);
queryStrMap.put("tpId", val.getTpId()); queryStrMap.put("tpId", tpId);
queryStrMap.put("itemId", properties.getItemId()); queryStrMap.put("itemId", properties.getItemId());
queryStrMap.put("namespace", properties.getNamespace()); queryStrMap.put("namespace", properties.getNamespace());
@ -56,18 +59,25 @@ public class ThreadPoolRunListener {
try { try {
result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L); result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L);
if (result.isSuccess() && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) { if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) {
// 使用相关参数创建线程池 // 使用相关参数创建线程池
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue workQueue = BlockingQueueUtil.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); BlockingQueue workQueue = BlockingQueueUtil.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
ThreadPoolExecutor resultTpe = new ThreadPoolExecutor(ppi.getCoreSize(), ppi.getMaxSize(), ppi.getKeepAliveTime(), unit, workQueue); ThreadPoolExecutor poolExecutor = ThreadPoolBuilder.builder()
val.setPool(resultTpe); .isCustomPool(true)
.poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize())
.keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS)
.workQueue(workQueue)
.threadFactory(tpId)
.rejected(new CustomThreadPoolExecutor.AbortPolicy())
.build();
val.setPool(poolExecutor);
} else if (val.getPool() == null) { } else if (val.getPool() == null) {
val.setPool(CommonThreadPool.getInstance(val.getTpId())); val.setPool(CommonThreadPool.getInstance(tpId));
} }
} catch (Exception ex) { } catch (Exception ex) {
log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage());
val.setPool(CommonThreadPool.getInstance(val.getTpId())); val.setPool(CommonThreadPool.getInstance(tpId));
} }
GlobalThreadPoolManage.register(val.getTpId(), ppi, val); GlobalThreadPoolManage.register(val.getTpId(), ppi, val);

@ -1,6 +1,7 @@
package io.dynamic.threadpool.starter.toolkit.thread; package io.dynamic.threadpool.starter.toolkit.thread;
import io.dynamic.threadpool.common.toolkit.Assert; import io.dynamic.threadpool.common.toolkit.Assert;
import io.dynamic.threadpool.starter.wrap.CustomThreadPoolExecutor;
import lombok.Data; import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -86,6 +87,26 @@ public class AbstractBuildThreadPoolTemplate {
return fastThreadPoolExecutor; return fastThreadPoolExecutor;
} }
/**
* 线
*
* @param initParam
* @return
*/
public static CustomThreadPoolExecutor buildCustomPool(ThreadPoolInitParam initParam) {
Assert.notNull(initParam);
CustomThreadPoolExecutor executorService =
new CustomThreadPoolExecutor(initParam.getCorePoolNum(),
initParam.getMaxPoolNum(),
initParam.getKeepAliveTime(),
initParam.getTimeUnit(),
initParam.getWorkQueue(),
initParam.getThreadFactory(),
initParam.rejectedExecutionHandler);
return executorService;
}
@Data @Data
@Accessors(chain = true) @Accessors(chain = true)
public static class ThreadPoolInitParam { public static class ThreadPoolInitParam {
@ -131,9 +152,9 @@ public class AbstractBuildThreadPoolTemplate {
private ThreadFactory threadFactory; private ThreadFactory threadFactory;
public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) { public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) {
this.threadFactory = new ThreadFactoryBuilder() this.threadFactory = ThreadFactoryBuilder.builder()
.setNamePrefix(threadNamePrefix + "-") .prefix(threadNamePrefix)
.setDaemon(isDaemon) .daemon(isDaemon)
.build(); .build();
} }
} }

@ -47,7 +47,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
* @param backingThreadFactory 线线 * @param backingThreadFactory 线线
* @return this * @return this
*/ */
public ThreadFactoryBuilder setThreadFactory(ThreadFactory backingThreadFactory) { public ThreadFactoryBuilder threadFactory(ThreadFactory backingThreadFactory) {
this.backingThreadFactory = backingThreadFactory; this.backingThreadFactory = backingThreadFactory;
return this; return this;
} }
@ -58,7 +58,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
* @param namePrefix 线 * @param namePrefix 线
* @return this * @return this
*/ */
public ThreadFactoryBuilder setNamePrefix(String namePrefix) { public ThreadFactoryBuilder prefix(String namePrefix) {
this.namePrefix = namePrefix; this.namePrefix = namePrefix;
return this; return this;
} }
@ -69,7 +69,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
* @param daemon 线 * @param daemon 线
* @return this * @return this
*/ */
public ThreadFactoryBuilder setDaemon(boolean daemon) { public ThreadFactoryBuilder daemon(boolean daemon) {
this.daemon = daemon; this.daemon = daemon;
return this; return this;
} }
@ -83,7 +83,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
* @see Thread#NORM_PRIORITY * @see Thread#NORM_PRIORITY
* @see Thread#MAX_PRIORITY * @see Thread#MAX_PRIORITY
*/ */
public ThreadFactoryBuilder setPriority(int priority) { public ThreadFactoryBuilder priority(int priority) {
if (priority < Thread.MIN_PRIORITY) { if (priority < Thread.MIN_PRIORITY) {
throw new IllegalArgumentException(String.format("Thread priority ({}) must be >= {}", priority, Thread.MIN_PRIORITY)); throw new IllegalArgumentException(String.format("Thread priority ({}) must be >= {}", priority, Thread.MIN_PRIORITY));
} }
@ -99,10 +99,19 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
* *
* @param uncaughtExceptionHandler {@link Thread.UncaughtExceptionHandler} * @param uncaughtExceptionHandler {@link Thread.UncaughtExceptionHandler}
*/ */
public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { public void uncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
this.uncaughtExceptionHandler = uncaughtExceptionHandler; this.uncaughtExceptionHandler = uncaughtExceptionHandler;
} }
/**
*
*
* @return
*/
public static ThreadFactoryBuilder builder() {
return new ThreadFactoryBuilder();
}
/** /**
* {@link ThreadFactory} * {@link ThreadFactory}
* *
@ -131,7 +140,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
return r -> { return r -> {
final Thread thread = backingThreadFactory.newThread(r); final Thread thread = backingThreadFactory.newThread(r);
if (null != namePrefix) { if (null != namePrefix) {
thread.setName(namePrefix + count.getAndIncrement()); thread.setName(namePrefix + "-" + count.getAndIncrement());
} }
if (null != daemon) { if (null != daemon) {
thread.setDaemon(daemon); thread.setDaemon(daemon);

@ -22,20 +22,25 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
*/ */
private boolean isFastPool; private boolean isFastPool;
/**
* 线
*/
private boolean isCustomPool;
/** /**
* 线 * 线
*/ */
private Integer corePoolNum = calculateCoreNum(); private int corePoolSize = calculateCoreNum();
/** /**
* 线 * 线
*/ */
private Integer maxPoolNum = corePoolNum + (corePoolNum >> 1); private int maxPoolSize = corePoolSize + (corePoolSize >> 1);
/** /**
* 线 * 线
*/ */
private Long keepAliveTime = 30000L; private long keepAliveTime = 30000L;
/** /**
* 线 * 线
@ -45,7 +50,7 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
/** /**
* *
*/ */
private Integer capacity = 512; private int capacity = 512;
/** /**
* *
@ -87,6 +92,11 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this; return this;
} }
public ThreadPoolBuilder isCustomPool(Boolean isCustomPool) {
this.isCustomPool = isCustomPool;
return this;
}
public ThreadPoolBuilder threadFactory(String threadNamePrefix) { public ThreadPoolBuilder threadFactory(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix; this.threadNamePrefix = threadNamePrefix;
return this; return this;
@ -98,23 +108,23 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this; return this;
} }
public ThreadPoolBuilder corePoolNum(Integer corePoolNum) { public ThreadPoolBuilder corePoolSize(int corePoolSize) {
this.corePoolNum = corePoolNum; this.corePoolSize = corePoolSize;
return this; return this;
} }
public ThreadPoolBuilder maxPoolNum(Integer maxPoolNum) { public ThreadPoolBuilder maxPoolNum(int maxPoolSize) {
this.maxPoolNum = maxPoolNum; this.maxPoolSize = maxPoolSize;
return this; return this;
} }
public ThreadPoolBuilder poolThreadNum(Integer corePoolNum, Integer maxPoolNum) { public ThreadPoolBuilder poolThreadSize(int corePoolSize, int maxPoolSize) {
this.corePoolNum = corePoolNum; this.corePoolSize = corePoolSize;
this.maxPoolNum = maxPoolNum; this.maxPoolSize = maxPoolSize;
return this; return this;
} }
public ThreadPoolBuilder keepAliveTime(Long keepAliveTime) { public ThreadPoolBuilder keepAliveTime(long keepAliveTime) {
this.keepAliveTime = keepAliveTime; this.keepAliveTime = keepAliveTime;
return this; return this;
} }
@ -124,18 +134,18 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this; return this;
} }
public ThreadPoolBuilder keepAliveTime(Long keepAliveTime, TimeUnit timeUnit) { public ThreadPoolBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
this.keepAliveTime = keepAliveTime; this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit; this.timeUnit = timeUnit;
return this; return this;
} }
public ThreadPoolBuilder capacity(Integer capacity) { public ThreadPoolBuilder capacity(int capacity) {
this.capacity = capacity; this.capacity = capacity;
return this; return this;
} }
public ThreadPoolBuilder workQueue(QueueTypeEnum queueType, Integer capacity) { public ThreadPoolBuilder workQueue(QueueTypeEnum queueType, int capacity) {
this.queueType = queueType; this.queueType = queueType;
this.capacity = capacity; this.capacity = capacity;
return this; return this;
@ -146,17 +156,16 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this; return this;
} }
/**
* 使 workQueue, capacity
*
* @param queueType
* @return
*/
public ThreadPoolBuilder workQueue(QueueTypeEnum queueType) { public ThreadPoolBuilder workQueue(QueueTypeEnum queueType) {
this.queueType = queueType; this.queueType = queueType;
return this; return this;
} }
public ThreadPoolBuilder workQueue(BlockingQueue workQueue) {
this.workQueue = workQueue;
return this;
}
/** /**
* *
* *
@ -164,6 +173,9 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
*/ */
@Override @Override
public ThreadPoolExecutor build() { public ThreadPoolExecutor build() {
if (isCustomPool) {
return buildCustomPool(this);
}
return isFastPool ? buildFastPool(this) : buildPool(this); return isFastPool ? buildFastPool(this) : buildPool(this);
} }
@ -196,6 +208,16 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return AbstractBuildThreadPoolTemplate.buildFastPool(buildInitParam(builder)); return AbstractBuildThreadPoolTemplate.buildFastPool(buildInitParam(builder));
} }
/**
* 线
*
* @param builder
* @return
*/
private static ThreadPoolExecutor buildCustomPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildCustomPool(buildInitParam(builder));
}
/** /**
* *
* *
@ -207,8 +229,8 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam = AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam =
new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon); new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon);
initParam.setCorePoolNum(builder.corePoolNum) initParam.setCorePoolNum(builder.corePoolSize)
.setMaxPoolNum(builder.maxPoolNum) .setMaxPoolNum(builder.maxPoolSize)
.setKeepAliveTime(builder.keepAliveTime) .setKeepAliveTime(builder.keepAliveTime)
.setCapacity(builder.capacity) .setCapacity(builder.capacity)
.setRejectedExecutionHandler(builder.rejectedExecutionHandler) .setRejectedExecutionHandler(builder.rejectedExecutionHandler)
@ -216,11 +238,10 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
// 快速消费线程池内置指定线程池 // 快速消费线程池内置指定线程池
if (!builder.isFastPool) { if (!builder.isFastPool) {
BlockingQueue blockingQueue = BlockingQueueUtil.createBlockingQueue(builder.queueType.type, builder.capacity); if (builder.workQueue == null) {
if (blockingQueue == null) { builder.workQueue = BlockingQueueUtil.createBlockingQueue(builder.queueType.type, builder.capacity);
blockingQueue = builder.workQueue;
} }
initParam.setWorkQueue(blockingQueue); initParam.setWorkQueue(builder.workQueue);
} }
return initParam; return initParam;

Loading…
Cancel
Save