feat: 功能持续更新.

pull/161/head
chen.ma 4 years ago
parent 847f87f790
commit 5383861e2f

@ -0,0 +1,20 @@
package io.dynamic.threadpool.starter.builder;
import java.io.Serializable;
/**
*
*
* @author chen.ma
* @date 2021/7/5 21:39
*/
public interface Builder<T> extends Serializable {
/**
*
*
* @return
*/
T build();
}

@ -0,0 +1,210 @@
package io.dynamic.threadpool.starter.builder;
import io.dynamic.threadpool.common.enums.QueueTypeEnum;
import io.dynamic.threadpool.common.toolkit.Assert;
import io.dynamic.threadpool.starter.toolkit.BlockingQueueUtil;
import io.dynamic.threadpool.starter.toolkit.thread.AbstractBuildThreadPoolTemplate;
import java.math.BigDecimal;
import java.util.concurrent.*;
/**
* 线
*
* @author chen.ma
* @date 2021/6/28 17:29
*/
public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
/**
* 线
*/
private boolean isFastPool;
/**
* 线
*/
private Integer corePoolNum = calculateCoreNum();
/**
* 线
*/
private Integer maxPoolNum = corePoolNum + (corePoolNum >> 1);
/**
* 线
*/
private Long keepAliveTime = 30000L;
/**
* 线
*/
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
/**
*
*/
private Integer capacity = 512;
/**
*
*/
private QueueTypeEnum queueType;
/**
*
*/
private BlockingQueue workQueue = new LinkedBlockingQueue(capacity);
/**
* 线
*/
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
/**
* 线
*/
private boolean isDaemon = false;
/**
* 线
*/
private String threadNamePrefix;
/**
* CPU / (1 - 0.8)
*
* @return 线线
*/
private Integer calculateCoreNum() {
int cpuCoreNum = Runtime.getRuntime().availableProcessors();
return new BigDecimal(cpuCoreNum).divide(new BigDecimal("0.2")).intValue();
}
public ThreadPoolBuilder isFastPool(Boolean isFastPool) {
this.isFastPool = isFastPool;
return this;
}
public ThreadPoolBuilder threadFactory(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
return this;
}
public ThreadPoolBuilder threadFactory(String threadNamePrefix, Boolean isDaemon) {
this.threadNamePrefix = threadNamePrefix;
this.isDaemon = isDaemon;
return this;
}
public ThreadPoolBuilder corePoolNum(Integer corePoolNum) {
this.corePoolNum = corePoolNum;
return this;
}
public ThreadPoolBuilder maxPoolNum(Integer maxPoolNum) {
this.maxPoolNum = maxPoolNum;
return this;
}
public ThreadPoolBuilder keepAliveTime(Long keepAliveTime) {
this.keepAliveTime = keepAliveTime;
return this;
}
public ThreadPoolBuilder timeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
return this;
}
public ThreadPoolBuilder capacity(Integer capacity) {
this.capacity = capacity;
return this;
}
public ThreadPoolBuilder rejected(RejectedExecutionHandler rejectedExecutionHandler) {
this.rejectedExecutionHandler = rejectedExecutionHandler;
return this;
}
/**
* 使 workQueue, capacity
*
* @param queueType
* @return
*/
public ThreadPoolBuilder workQueue(QueueTypeEnum queueType) {
this.queueType = queueType;
return this;
}
/**
*
*
* @return
*/
@Override
public ThreadPoolExecutor build() {
return isFastPool ? buildFastPool(this) : buildPool(this);
}
/**
*
*
* @return
*/
public static ThreadPoolBuilder builder() {
return new ThreadPoolBuilder();
}
/**
* 线
*
* @param builder
* @return
*/
public static ThreadPoolExecutor buildPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildPool(buildInitParam(builder));
}
/**
* 线
*
* @param builder
* @return
*/
public static ThreadPoolExecutor buildFastPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildFastPool(buildInitParam(builder));
}
/**
*
*
* @param builder
* @return
*/
private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) {
Assert.notEmpty(builder.threadNamePrefix, "线程名称前缀不可为空或空的字符串.");
AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam =
new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon);
initParam.setCorePoolNum(builder.corePoolNum)
.setMaxPoolNum(builder.maxPoolNum)
.setKeepAliveTime(builder.keepAliveTime)
.setCapacity(builder.capacity)
.setRejectedExecutionHandler(builder.rejectedExecutionHandler)
.setTimeUnit(builder.timeUnit);
// 快速消费线程池内置指定线程池
if (!builder.isFastPool) {
BlockingQueue blockingQueue = BlockingQueueUtil.createBlockingQueue(builder.queueType.type, builder.capacity);
if (blockingQueue == null) {
blockingQueue = builder.workQueue;
}
initParam.setWorkQueue(blockingQueue);
}
return initParam;
}
}

@ -42,5 +42,5 @@ public class DynamicThreadPoolProperties {
/**
* banner
*/
private boolean banner;
private boolean banner = true;
}

@ -2,7 +2,7 @@ package io.dynamic.threadpool.starter.core;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.common.model.PoolParameterInfo;
import io.dynamic.threadpool.starter.toolkit.ThreadPoolChangeUtil;
import io.dynamic.threadpool.starter.handler.ThreadPoolChangeHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ThreadPoolExecutor;
@ -32,7 +32,7 @@ public class ThreadPoolDynamicRefresh {
log.info("[✈️] Original thread pool. coreSize :: {}, maxSize :: {}, queueType :: {}, capacity :: {}, keepAliveTime :: {}",
executor.getCorePoolSize(), executor.getMaximumPoolSize(), queueType, executor.getQueue().remainingCapacity(), executor.getKeepAliveTime(TimeUnit.MILLISECONDS));
ThreadPoolChangeUtil.changePool(executor, coreSize, maxSize, queueType, capacity, keepAliveTime);
ThreadPoolChangeHandler.changePool(executor, coreSize, maxSize, queueType, capacity, keepAliveTime);
ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getPool();
log.info("[🚀] Changed thread pool. coreSize :: {}, maxSize :: {}, queueType :: {}, capacity :: {}, keepAliveTime :: {}",
afterExecutor.getCorePoolSize(), afterExecutor.getMaximumPoolSize(), queueType, afterExecutor.getQueue().remainingCapacity(), afterExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS));

@ -1,4 +1,4 @@
package io.dynamic.threadpool.starter.toolkit;
package io.dynamic.threadpool.starter.handler;
import io.dynamic.threadpool.common.enums.QueueTypeEnum;
import io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue;
@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit;
* @date 2021/6/25 17:19
*/
@Slf4j
public class ThreadPoolChangeUtil {
public class ThreadPoolChangeHandler {
public static void changePool(ThreadPoolExecutor executor, Integer coreSize, Integer maxSize, Integer queueType, Integer capacity, Integer keepAliveTime) {
if (coreSize != null) {

@ -0,0 +1,41 @@
package io.dynamic.threadpool.starter.toolkit;
import java.lang.reflect.Array;
/**
* Array Util.
*
* @author chen.ma
* @date 2021/7/5 21:54
*/
public class ArrayUtil {
public static <T> T[] addAll(final T[] array1, @SuppressWarnings("unchecked") final T... array2) {
if (array1 == null) {
return clone(array2);
} else if (array2 == null) {
return clone(array1);
}
final Class<?> type1 = array1.getClass().getComponentType();
@SuppressWarnings("unchecked") final T[] joinedArray = (T[]) Array.newInstance(type1, array1.length + array2.length);
System.arraycopy(array1, 0, joinedArray, 0, array1.length);
try {
System.arraycopy(array2, 0, joinedArray, array1.length, array2.length);
} catch (final ArrayStoreException ase) {
final Class<?> type2 = array2.getClass().getComponentType();
if (!type1.isAssignableFrom(type2)) {
throw new IllegalArgumentException("Cannot store " + type2.getName() + " in an array of "
+ type1.getName(), ase);
}
throw ase;
}
return joinedArray;
}
public static <T> T[] clone(final T[] array) {
if (array == null) {
return null;
}
return array.clone();
}
}

@ -0,0 +1,140 @@
package io.dynamic.threadpool.starter.toolkit.thread;
import io.dynamic.threadpool.common.toolkit.Assert;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* 线
*
* @author chen.ma
* @date 2021/7/5 21:45
*/
@Slf4j
public class AbstractBuildThreadPoolTemplate {
/**
* 线
* <p>
* , , abstract
* {@link AbstractQueuedSynchronizer#tryAcquire}
*
* @return
*/
protected static ThreadPoolInitParam initParam() {
throw new UnsupportedOperationException();
}
/**
* 线
*
* @return
*/
public static ThreadPoolExecutor buildPool() {
ThreadPoolInitParam initParam = initParam();
return buildPool(initParam);
}
/**
* 线
*
* @return
*/
public static ThreadPoolExecutor buildPool(ThreadPoolInitParam initParam) {
Assert.notNull(initParam);
ThreadPoolExecutor executorService =
new ThreadPoolExecutorTemplate(initParam.getCorePoolNum(),
initParam.getMaxPoolNum(),
initParam.getKeepAliveTime(),
initParam.getTimeUnit(),
initParam.getWorkQueue(),
initParam.getThreadFactory(),
initParam.rejectedExecutionHandler);
return executorService;
}
/**
* 线
*
* @return
*/
public static ThreadPoolExecutor buildFastPool() {
ThreadPoolInitParam initParam = initParam();
return buildFastPool(initParam);
}
/**
* 线
*
* @return
*/
public static ThreadPoolExecutor buildFastPool(ThreadPoolInitParam initParam) {
TaskQueue<Runnable> taskQueue = new TaskQueue(initParam.getCapacity());
FastThreadPoolExecutor fastThreadPoolExecutor =
new FastThreadPoolExecutor(initParam.getCorePoolNum(),
initParam.getMaxPoolNum(),
initParam.getKeepAliveTime(),
initParam.getTimeUnit(),
taskQueue,
initParam.getThreadFactory(),
initParam.rejectedExecutionHandler);
taskQueue.setExecutor(fastThreadPoolExecutor);
return fastThreadPoolExecutor;
}
@Data
@Accessors(chain = true)
public static class ThreadPoolInitParam {
/**
* 线
*/
private Integer corePoolNum;
/**
* 线
*/
private Integer maxPoolNum;
/**
* 线
*/
private Long keepAliveTime;
/**
* 线
*/
private TimeUnit timeUnit;
/**
*
*/
private Integer capacity;
/**
*
*/
private BlockingQueue<Runnable> workQueue;
/**
* 线
*/
private RejectedExecutionHandler rejectedExecutionHandler;
/**
* 线
*/
private ThreadFactory threadFactory;
public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) {
this.threadFactory = new ThreadFactoryBuilder()
.setNamePrefix(threadNamePrefix + "-")
.setDaemon(isDaemon)
.build();
}
}
}

@ -0,0 +1,65 @@
package io.dynamic.threadpool.starter.toolkit.thread;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线, Dubbo 线 EagerThreadPoolExecutor
* <p>
* {@link TaskQueue}
*
* @author chen.ma
* @date 2021/7/5 21:00
*/
@Slf4j
public class FastThreadPoolExecutor extends ThreadPoolExecutorTemplate {
public FastThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("队列容量已满.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Exception t) {
submittedTaskCount.decrementAndGet();
throw t;
}
}
}

@ -0,0 +1,56 @@
package io.dynamic.threadpool.starter.toolkit.thread;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
/**
* 线
*
* @author chen.ma
* @date 2021/7/5 21:23
*/
@Slf4j
public class RejectedPolicies {
/**
* ,
*
* @return
*/
public static RejectedExecutionHandler runsOldestTaskPolicy() {
return (r, executor) -> {
if (executor.isShutdown()) {
return;
}
BlockingQueue<Runnable> workQueue = executor.getQueue();
Runnable firstWork = workQueue.poll();
boolean newTaskAdd = workQueue.offer(r);
if (firstWork != null) {
firstWork.run();
}
if (!newTaskAdd) {
executor.execute(r);
}
};
}
/**
* 使,
*
* @return
*/
public static RejectedExecutionHandler syncPutQueuePolicy() {
return (r, executor) -> {
if (executor.isShutdown()) {
return;
}
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
log.error("线程池添加队列任务失败", e);
}
};
}
}

@ -0,0 +1,52 @@
package io.dynamic.threadpool.starter.toolkit.thread;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
* , Dubbo TaskQueue
* <p>
* {@link FastThreadPoolExecutor} 使
*
* @author chen.ma
* @date 2021/7/5 21:00
*/
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = -2635853580887179627L;
private FastThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(FastThreadPoolExecutor exec) {
executor = exec;
}
@Override
public boolean offer(Runnable runnable) {
int currentPoolThreadSize = executor.getPoolSize();
// 如果有核心线程正在空闲, 将任务加入阻塞队列, 由核心线程进行处理任务
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// 当前线程池线程数量小于最大线程数, 返回false, 根据线程池源码, 会创建非核心线程
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// 如果当前线程池数量大于最大线程数, 任务加入阻塞队列
return super.offer(runnable);
}
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("执行器已关闭!");
}
return super.offer(o, timeout, unit);
}
}

@ -0,0 +1,148 @@
package io.dynamic.threadpool.starter.toolkit.thread;
import io.dynamic.threadpool.starter.builder.Builder;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
/**
* ThreadFactory
*
* @author chen.ma
* @date 2021/7/5 21:53
*/
public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
private static final long serialVersionUID = 1L;
/**
* 线线
*/
private ThreadFactory backingThreadFactory;
/**
* 线
*/
private String namePrefix;
/**
* 线false
*/
private Boolean daemon;
/**
* 线
*/
private Integer priority;
/**
*
*/
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
/**
* 线线
*
* @param backingThreadFactory 线线
* @return this
*/
public ThreadFactoryBuilder setThreadFactory(ThreadFactory backingThreadFactory) {
this.backingThreadFactory = backingThreadFactory;
return this;
}
/**
* 线, mb-thread- 线 mb-thread-1
*
* @param namePrefix 线
* @return this
*/
public ThreadFactoryBuilder setNamePrefix(String namePrefix) {
this.namePrefix = namePrefix;
return this;
}
/**
* 线
*
* @param daemon 线
* @return this
*/
public ThreadFactoryBuilder setDaemon(boolean daemon) {
this.daemon = daemon;
return this;
}
/**
* 线
*
* @param priority
* @return this
* @see Thread#MIN_PRIORITY
* @see Thread#NORM_PRIORITY
* @see Thread#MAX_PRIORITY
*/
public ThreadFactoryBuilder setPriority(int priority) {
if (priority < Thread.MIN_PRIORITY) {
throw new IllegalArgumentException(String.format("Thread priority ({}) must be >= {}", priority, Thread.MIN_PRIORITY));
}
if (priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(String.format("Thread priority ({}) must be <= {}", priority, Thread.MAX_PRIORITY));
}
this.priority = priority;
return this;
}
/**
*
*
* @param uncaughtExceptionHandler {@link Thread.UncaughtExceptionHandler}
*/
public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
this.uncaughtExceptionHandler = uncaughtExceptionHandler;
}
/**
* {@link ThreadFactory}
*
* @return {@link ThreadFactory}
*/
@Override
public ThreadFactory build() {
return build(this);
}
/**
*
*
* @param builder {@link ThreadFactoryBuilder}
* @return {@link ThreadFactory}
*/
private static ThreadFactory build(ThreadFactoryBuilder builder) {
final ThreadFactory backingThreadFactory = (null != builder.backingThreadFactory)
? builder.backingThreadFactory
: Executors.defaultThreadFactory();
final String namePrefix = builder.namePrefix;
final Boolean daemon = builder.daemon;
final Integer priority = builder.priority;
final Thread.UncaughtExceptionHandler handler = builder.uncaughtExceptionHandler;
final AtomicLong count = (null == namePrefix) ? null : new AtomicLong();
return r -> {
final Thread thread = backingThreadFactory.newThread(r);
if (null != namePrefix) {
thread.setName(namePrefix + count.getAndIncrement());
}
if (null != daemon) {
thread.setDaemon(daemon);
}
if (null != priority) {
thread.setPriority(priority);
}
if (null != handler) {
thread.setUncaughtExceptionHandler(handler);
}
return thread;
};
}
}

@ -0,0 +1,65 @@
package io.dynamic.threadpool.starter.toolkit.thread;
import io.dynamic.threadpool.starter.toolkit.ArrayUtil;
import java.util.concurrent.*;
/**
* 线
*
* @author chen.ma
* @date 2021/7/5 21:59
*/
public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor {
public ThreadPoolExecutorTemplate(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
private Exception clientTrace() {
return new Exception("tread task root stack trace");
}
@Override
public void execute(final Runnable command) {
super.execute(wrap(command, clientTrace()));
}
@Override
public Future<?> submit(final Runnable task) {
return super.submit(wrap(task, clientTrace()));
}
@Override
public <T> Future<T> submit(final Callable<T> task) {
return super.submit(wrap(task, clientTrace()));
}
private Runnable wrap(final Runnable task, final Exception clientStack) {
return () -> {
try {
task.run();
} catch (Exception e) {
e.setStackTrace(ArrayUtil.addAll(clientStack.getStackTrace(), e.getStackTrace()));
throw e;
}
};
}
private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack) {
return () -> {
try {
return task.call();
} catch (Exception e) {
e.setStackTrace(ArrayUtil.addAll(clientStack.getStackTrace(), e.getStackTrace()));
throw e;
}
};
}
}

@ -9,7 +9,15 @@
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>🔥 强大的动态线程池,附带监控报警功能</description>
<url>https://github.com/longtai94/dynamic-thread-pool</url>
<description>🔥 强大的动态线程池,没有依赖任何中间件(未来也不打算依赖),附带监控线程池功能</description>
<developers>
<developer>
<name>chen.ma</name>
<url>https://github.com/longtai94</url>
</developer>
</developers>
<properties>
<java.version>1.8</java.version>

Loading…
Cancel
Save