Supplemental code comments (#898)

pull/899/head
马称 Ma Chen 2 years ago committed by GitHub
parent d02e989c54
commit 4831cd6ecb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -24,7 +24,30 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target; import java.lang.annotation.Target;
/** /**
* Dynamic thread pool. * An annotation that enhances the functionality of the jdk acoustic thread pool,
* with the following list of enhancements.
*
* <ul>
* <li>Dynamic change at runtime</li>
* <li>Determine whether an alarm is required at runtime</li>
* <li>Provide observable monitoring indicators</li>
* <li>......</li>
* </ur>
*
* <p>If you use Server mode, you can view the thread pool operation in the built-in console.
* <p>If you use Config mode, you can observe with Prometheus and Grafana.
*
* <p>The annotation is normally marked on the
* spring bean defined by {@link java.util.concurrent.ThreadPoolExecutor}.
*
* <p>Can also be marked on the following types:
*
* @see java.util.concurrent.Executor
* @see java.util.concurrent.ExecutorService
* @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
* @see com.alibaba.ttl.threadpool.ExecutorTtlWrapper
* @see com.alibaba.ttl.threadpool.ExecutorServiceTtlWrapper
* @since 1.0
*/ */
@Target({ElementType.METHOD, ElementType.TYPE}) @Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)

@ -41,8 +41,16 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutorTemplate {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
} }
/**
* Statistics on the number of tasks submitted by the fast consumption thread pool
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0); private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
/**
* Get submitted task count.
*
* @return submitted task count
*/
public int getSubmittedTaskCount() { public int getSubmittedTaskCount() {
return submittedTaskCount.get(); return submittedTaskCount.get();
} }

@ -17,6 +17,8 @@
package cn.hippo4j.core.executor.support; package cn.hippo4j.core.executor.support;
import lombok.Setter;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -28,16 +30,13 @@ public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable>
private static final long serialVersionUID = -2635853580887179627L; private static final long serialVersionUID = -2635853580887179627L;
@Setter
private FastThreadPoolExecutor executor; private FastThreadPoolExecutor executor;
public TaskQueue(int capacity) { public TaskQueue(int capacity) {
super(capacity); super(capacity);
} }
public void setExecutor(FastThreadPoolExecutor exec) {
executor = exec;
}
@Override @Override
public boolean offer(Runnable runnable) { public boolean offer(Runnable runnable) {
int currentPoolThreadSize = executor.getPoolSize(); int currentPoolThreadSize = executor.getPoolSize();
@ -54,10 +53,21 @@ public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable>
return super.offer(runnable); return super.offer(runnable);
} }
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { /**
* Retry offer.
*
* @param runnable submit thread pool task
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return
* @throws InterruptedException
*/
public boolean retryOffer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) { if (executor.isShutdown()) {
throw new RejectedExecutionException("Actuator closed!"); throw new RejectedExecutionException("Actuator closed!");
} }
return super.offer(o, timeout, unit); return super.offer(runnable, timeout, unit);
} }
} }

@ -17,6 +17,11 @@
package cn.hippo4j.core.executor.support; package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.design.builder.Builder;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.toolkit.Assert;
import org.springframework.core.task.TaskDecorator;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -25,12 +30,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import cn.hippo4j.common.design.builder.Builder;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.toolkit.Assert;
import org.springframework.core.task.TaskDecorator;
/** /**
* Thread-pool builder. * Thread-pool builder.
*/ */
@ -74,42 +73,89 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
private Boolean allowCoreThreadTimeOut = false; private Boolean allowCoreThreadTimeOut = false;
/**
* Calculate core num.
*
* @return core num
*/
private Integer calculateCoreNum() { private Integer calculateCoreNum() {
int cpuCoreNum = Runtime.getRuntime().availableProcessors(); int cpuCoreNum = Runtime.getRuntime().availableProcessors();
return new BigDecimal(cpuCoreNum).divide(new BigDecimal("0.2")).intValue(); return new BigDecimal(cpuCoreNum).divide(new BigDecimal("0.2")).intValue();
} }
/**
* Is fast pool.
*
* @param isFastPool is fast pool
* @return thread-pool builder
*/
public ThreadPoolBuilder isFastPool(Boolean isFastPool) { public ThreadPoolBuilder isFastPool(Boolean isFastPool) {
this.isFastPool = isFastPool; this.isFastPool = isFastPool;
return this; return this;
} }
/**
* Dynamic pool.
*
* @return thread-pool builder
*/
public ThreadPoolBuilder dynamicPool() { public ThreadPoolBuilder dynamicPool() {
this.isDynamicPool = true; this.isDynamicPool = true;
return this; return this;
} }
/**
* Thread factory.
*
* @param threadNamePrefix thread name prefix
* @return thread-pool builder
*/
public ThreadPoolBuilder threadFactory(String threadNamePrefix) { public ThreadPoolBuilder threadFactory(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix; this.threadNamePrefix = threadNamePrefix;
return this; return this;
} }
/**
* Thread factory.
*
* @param threadFactory thread factory
* @return thread-pool builder
*/
public ThreadPoolBuilder threadFactory(ThreadFactory threadFactory) { public ThreadPoolBuilder threadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory; this.threadFactory = threadFactory;
return this; return this;
} }
/**
* Thread factory.
*
* @param threadNamePrefix thread name prefix
* @param isDaemon is daemon
* @return thread-pool builder
*/
public ThreadPoolBuilder threadFactory(String threadNamePrefix, Boolean isDaemon) { public ThreadPoolBuilder threadFactory(String threadNamePrefix, Boolean isDaemon) {
this.threadNamePrefix = threadNamePrefix; this.threadNamePrefix = threadNamePrefix;
this.isDaemon = isDaemon; this.isDaemon = isDaemon;
return this; return this;
} }
/**
* Core pool size.
*
* @param corePoolSize core pool size
* @return thread-pool builder
*/
public ThreadPoolBuilder corePoolSize(int corePoolSize) { public ThreadPoolBuilder corePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize; this.corePoolSize = corePoolSize;
return this; return this;
} }
/**
* Max pool num.
*
* @param maxPoolSize max pool num
* @return thread-pool builder
*/
public ThreadPoolBuilder maxPoolNum(int maxPoolSize) { public ThreadPoolBuilder maxPoolNum(int maxPoolSize) {
this.maxPoolSize = maxPoolSize; this.maxPoolSize = maxPoolSize;
if (maxPoolSize < this.corePoolSize) { if (maxPoolSize < this.corePoolSize) {
@ -118,6 +164,11 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this; return this;
} }
/**
* Single pool.
*
* @return thread-pool builder
*/
public ThreadPoolBuilder singlePool() { public ThreadPoolBuilder singlePool() {
int singleNum = 1; int singleNum = 1;
this.corePoolSize = singleNum; this.corePoolSize = singleNum;
@ -125,6 +176,12 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this; return this;
} }
/**
* Single pool.
*
* @param threadNamePrefix thread name prefix
* @return thread-pool builder
*/
public ThreadPoolBuilder singlePool(String threadNamePrefix) { public ThreadPoolBuilder singlePool(String threadNamePrefix) {
int singleNum = 1; int singleNum = 1;
this.corePoolSize = singleNum; this.corePoolSize = singleNum;
@ -133,128 +190,245 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this; return this;
} }
/**
* Pool thread size.
*
* @param corePoolSize core pool size
* @param maxPoolSize max pool size
* @return thread-pool builder
*/
public ThreadPoolBuilder poolThreadSize(int corePoolSize, int maxPoolSize) { public ThreadPoolBuilder poolThreadSize(int corePoolSize, int maxPoolSize) {
this.corePoolSize = corePoolSize; this.corePoolSize = corePoolSize;
this.maxPoolSize = maxPoolSize; this.maxPoolSize = maxPoolSize;
return this; return this;
} }
/**
* Keep alive time.
*
* @param keepAliveTime keep alive time
* @return thread-pool builder
*/
public ThreadPoolBuilder keepAliveTime(long keepAliveTime) { public ThreadPoolBuilder keepAliveTime(long keepAliveTime) {
this.keepAliveTime = keepAliveTime; this.keepAliveTime = keepAliveTime;
return this; return this;
} }
/**
* Time unit.
*
* @param timeUnit time unit
* @return thread-pool builder
*/
public ThreadPoolBuilder timeUnit(TimeUnit timeUnit) { public ThreadPoolBuilder timeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit; this.timeUnit = timeUnit;
return this; return this;
} }
/**
* Execute time-out.
*
* @param executeTimeOut execute time-out
* @return thread-pool builder
*/
public ThreadPoolBuilder executeTimeOut(long executeTimeOut) { public ThreadPoolBuilder executeTimeOut(long executeTimeOut) {
this.executeTimeOut = executeTimeOut; this.executeTimeOut = executeTimeOut;
return this; return this;
} }
/**
* Keep alive time.
*
* @param keepAliveTime keep alive time
* @param timeUnit time unit
* @return thread-pool builder
*/
public ThreadPoolBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit) { public ThreadPoolBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
this.keepAliveTime = keepAliveTime; this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit; this.timeUnit = timeUnit;
return this; return this;
} }
/**
* Capacity.
*
* @param capacity capacity
* @return thread-pool builder
*/
public ThreadPoolBuilder capacity(int capacity) { public ThreadPoolBuilder capacity(int capacity) {
this.capacity = capacity; this.capacity = capacity;
return this; return this;
} }
/**
* Work queue.
*
* @param queueType queue type
* @param capacity capacity
* @return thread-pool builder
*/
public ThreadPoolBuilder workQueue(BlockingQueueTypeEnum queueType, int capacity) { public ThreadPoolBuilder workQueue(BlockingQueueTypeEnum queueType, int capacity) {
this.blockingQueueType = queueType; this.blockingQueueType = queueType;
this.capacity = capacity; this.capacity = capacity;
return this; return this;
} }
/**
* Rejected.
*
* @param rejectedExecutionHandler rejected execution handler
* @return thread-pool builder
*/
public ThreadPoolBuilder rejected(RejectedExecutionHandler rejectedExecutionHandler) { public ThreadPoolBuilder rejected(RejectedExecutionHandler rejectedExecutionHandler) {
this.rejectedExecutionHandler = rejectedExecutionHandler; this.rejectedExecutionHandler = rejectedExecutionHandler;
return this; return this;
} }
/**
* Work queue.
*
* @param blockingQueueType blocking queue type
* @return thread-pool builder
*/
public ThreadPoolBuilder workQueue(BlockingQueueTypeEnum blockingQueueType) { public ThreadPoolBuilder workQueue(BlockingQueueTypeEnum blockingQueueType) {
this.blockingQueueType = blockingQueueType; this.blockingQueueType = blockingQueueType;
return this; return this;
} }
/**
* Work queue.
*
* @param workQueue work queue
* @return thread-pool builder
*/
public ThreadPoolBuilder workQueue(BlockingQueue workQueue) { public ThreadPoolBuilder workQueue(BlockingQueue workQueue) {
this.workQueue = workQueue; this.workQueue = workQueue;
return this; return this;
} }
/**
* Thread-pool id.
*
* @param threadPoolId thread-pool id
* @return thread-pool builder
*/
public ThreadPoolBuilder threadPoolId(String threadPoolId) { public ThreadPoolBuilder threadPoolId(String threadPoolId) {
this.threadPoolId = threadPoolId; this.threadPoolId = threadPoolId;
return this; return this;
} }
/**
* Task decorator.
*
* @param taskDecorator task decorator
* @return thread-pool builder
*/
public ThreadPoolBuilder taskDecorator(TaskDecorator taskDecorator) { public ThreadPoolBuilder taskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator; this.taskDecorator = taskDecorator;
return this; return this;
} }
/**
* Await termination millis.
*
* @param awaitTerminationMillis await termination millis
* @return thread-pool builder
*/
public ThreadPoolBuilder awaitTerminationMillis(long awaitTerminationMillis) { public ThreadPoolBuilder awaitTerminationMillis(long awaitTerminationMillis) {
this.awaitTerminationMillis = awaitTerminationMillis; this.awaitTerminationMillis = awaitTerminationMillis;
return this; return this;
} }
/**
* Wait for tasks to complete on shutdown.
*
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @return thread-pool builder
*/
public ThreadPoolBuilder waitForTasksToCompleteOnShutdown(boolean waitForTasksToCompleteOnShutdown) { public ThreadPoolBuilder waitForTasksToCompleteOnShutdown(boolean waitForTasksToCompleteOnShutdown) {
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown; this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
return this; return this;
} }
/**
* Dynamic support.
*
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @param awaitTerminationMillis await termination millis
* @return thread-pool builder
*/
public ThreadPoolBuilder dynamicSupport(boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis) { public ThreadPoolBuilder dynamicSupport(boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis) {
this.awaitTerminationMillis = awaitTerminationMillis; this.awaitTerminationMillis = awaitTerminationMillis;
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown; this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
return this; return this;
} }
/**
* Allow core thread time-out.
*
* @param allowCoreThreadTimeOut core thread time-out
* @return thread-pool builder
*/
public ThreadPoolBuilder allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { public ThreadPoolBuilder allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
return this; return this;
} }
@Override /**
public ThreadPoolExecutor build() { * Builder design pattern implementation.
if (isDynamicPool) { *
return buildDynamicPool(this); * @return thread-pool builder
} */
return isFastPool ? buildFastPool(this) : buildPool(this);
}
public static ThreadPoolBuilder builder() { public static ThreadPoolBuilder builder() {
return new ThreadPoolBuilder(); return new ThreadPoolBuilder();
} }
/** /**
* Create dynamic thread pool by thread pool id * Create dynamic thread pool by thread pool id.
* *
* @param threadPoolId threadPoolId * @param threadPoolId thread-pool id
* @return ThreadPoolExecutor * @return dynamic thread-pool executor
*/ */
public static ThreadPoolExecutor buildDynamicPoolById(String threadPoolId) { public static ThreadPoolExecutor buildDynamicPoolById(String threadPoolId) {
return ThreadPoolBuilder.builder() return ThreadPoolBuilder.builder().threadFactory(threadPoolId).threadPoolId(threadPoolId).dynamicPool().build();
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.dynamicPool()
.build();
} }
/**
* Build a normal thread-pool with {@code builder}.
*
* @param builder thread-pool builder
* @return normal thread-pool
*/
private static ThreadPoolExecutor buildPool(ThreadPoolBuilder builder) { private static ThreadPoolExecutor buildPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildPool(buildInitParam(builder)); return AbstractBuildThreadPoolTemplate.buildPool(buildInitParam(builder));
} }
/**
* Build a fast thread-pool with {@code builder}.
*
* @param builder thread-pool builder
* @return fast thread-pool executor
*/
private static ThreadPoolExecutor buildFastPool(ThreadPoolBuilder builder) { private static ThreadPoolExecutor buildFastPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildFastPool(buildInitParam(builder)); return AbstractBuildThreadPoolTemplate.buildFastPool(buildInitParam(builder));
} }
/**
* Build a dynamic thread-pool with {@code builder}.
*
* @param builder thread-pool builder
* @return dynamic thread-pool executor
*/
private static ThreadPoolExecutor buildDynamicPool(ThreadPoolBuilder builder) { private static ThreadPoolExecutor buildDynamicPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildDynamicPool(buildInitParam(builder)); return AbstractBuildThreadPoolTemplate.buildDynamicPool(buildInitParam(builder));
} }
/**
* Build thread-pool initialization parameters via {@code builder}.
*
* @param builder thread-pool builder
* @return thread-pool init param
*/
private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) { private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) {
AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam; AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam;
if (builder.threadFactory == null) { if (builder.threadFactory == null) {
@ -289,4 +463,12 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
} }
return initParam; return initParam;
} }
@Override
public ThreadPoolExecutor build() {
if (isDynamicPool) {
return buildDynamicPool(this);
}
return isFastPool ? buildFastPool(this) : buildPool(this);
}
} }

@ -36,10 +36,6 @@ public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
} }
private Exception clientTrace() {
return new Exception("Tread task root stack trace.");
}
@Override @Override
public void execute(final Runnable command) { public void execute(final Runnable command) {
super.execute(wrap(command, clientTrace())); super.execute(wrap(command, clientTrace()));
@ -55,6 +51,22 @@ public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor {
return super.submit(wrap(task, clientTrace())); return super.submit(wrap(task, clientTrace()));
} }
/**
* Client trace.
*
* @return exception
*/
private Exception clientTrace() {
return new Exception("Tread task root stack trace.");
}
/**
* Wrapping thread pool tasks.
*
* @param task task
* @param clientStack client stack
* @return wrapped runnable
*/
private Runnable wrap(final Runnable task, final Exception clientStack) { private Runnable wrap(final Runnable task, final Exception clientStack) {
return () -> { return () -> {
try { try {
@ -66,6 +78,14 @@ public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor {
}; };
} }
/**
* Wrapping thread pool tasks.
*
* @param task task
* @param clientStack client stack
* @param <T> computed result
* @return wrapped runnable
*/
private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack) { private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack) {
return () -> { return () -> {
try { try {

Loading…
Cancel
Save