diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPool.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPool.java
index b9a5e58f..eb6632ff 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPool.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPool.java
@@ -24,7 +24,30 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
- * Dynamic thread pool.
+ * An annotation that enhances the functionality of the jdk acoustic thread pool,
+ * with the following list of enhancements.
+ *
+ *
+ * - Dynamic change at runtime
+ * - Determine whether an alarm is required at runtime
+ * - Provide observable monitoring indicators
+ * - ......
+ *
+ *
+ * If you use Server mode, you can view the thread pool operation in the built-in console.
+ *
If you use Config mode, you can observe with Prometheus and Grafana.
+ *
+ *
The annotation is normally marked on the
+ * spring bean defined by {@link java.util.concurrent.ThreadPoolExecutor}.
+ *
+ *
Can also be marked on the following types:
+ *
+ * @see java.util.concurrent.Executor
+ * @see java.util.concurrent.ExecutorService
+ * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
+ * @see com.alibaba.ttl.threadpool.ExecutorTtlWrapper
+ * @see com.alibaba.ttl.threadpool.ExecutorServiceTtlWrapper
+ * @since 1.0
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/FastThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/FastThreadPoolExecutor.java
index 31835191..c4e4f569 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/FastThreadPoolExecutor.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/FastThreadPoolExecutor.java
@@ -41,8 +41,16 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutorTemplate {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
+ /**
+ * Statistics on the number of tasks submitted by the fast consumption thread pool
+ */
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
+ /**
+ * Get submitted task count.
+ *
+ * @return submitted task count
+ */
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/TaskQueue.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/TaskQueue.java
index 3d16c439..9eeedcf9 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/TaskQueue.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/TaskQueue.java
@@ -17,6 +17,8 @@
package cn.hippo4j.core.executor.support;
+import lombok.Setter;
+
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -28,16 +30,13 @@ public class TaskQueue extends LinkedBlockingQueue
private static final long serialVersionUID = -2635853580887179627L;
+ @Setter
private FastThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
- public void setExecutor(FastThreadPoolExecutor exec) {
- executor = exec;
- }
-
@Override
public boolean offer(Runnable runnable) {
int currentPoolThreadSize = executor.getPoolSize();
@@ -54,10 +53,21 @@ public class TaskQueue extends LinkedBlockingQueue
return super.offer(runnable);
}
- public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
+ /**
+ * Retry offer.
+ *
+ * @param runnable submit thread pool task
+ * @param timeout how long to wait before giving up, in units of
+ * {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code timeout} parameter
+ * @return
+ * @throws InterruptedException
+ */
+ public boolean retryOffer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Actuator closed!");
}
- return super.offer(o, timeout, unit);
+ return super.offer(runnable, timeout, unit);
}
}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java
index 7680b9d4..b1a830f9 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java
@@ -17,6 +17,11 @@
package cn.hippo4j.core.executor.support;
+import cn.hippo4j.common.design.builder.Builder;
+import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
+import cn.hippo4j.common.toolkit.Assert;
+import org.springframework.core.task.TaskDecorator;
+
import java.math.BigDecimal;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
@@ -25,12 +30,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import cn.hippo4j.common.design.builder.Builder;
-import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
-import cn.hippo4j.common.toolkit.Assert;
-
-import org.springframework.core.task.TaskDecorator;
-
/**
* Thread-pool builder.
*/
@@ -74,42 +73,89 @@ public class ThreadPoolBuilder implements Builder {
private Boolean allowCoreThreadTimeOut = false;
+ /**
+ * Calculate core num.
+ *
+ * @return core num
+ */
private Integer calculateCoreNum() {
int cpuCoreNum = Runtime.getRuntime().availableProcessors();
return new BigDecimal(cpuCoreNum).divide(new BigDecimal("0.2")).intValue();
}
+ /**
+ * Is fast pool.
+ *
+ * @param isFastPool is fast pool
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder isFastPool(Boolean isFastPool) {
this.isFastPool = isFastPool;
return this;
}
+ /**
+ * Dynamic pool.
+ *
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder dynamicPool() {
this.isDynamicPool = true;
return this;
}
+ /**
+ * Thread factory.
+ *
+ * @param threadNamePrefix thread name prefix
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder threadFactory(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
return this;
}
+ /**
+ * Thread factory.
+ *
+ * @param threadFactory thread factory
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder threadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}
+ /**
+ * Thread factory.
+ *
+ * @param threadNamePrefix thread name prefix
+ * @param isDaemon is daemon
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder threadFactory(String threadNamePrefix, Boolean isDaemon) {
this.threadNamePrefix = threadNamePrefix;
this.isDaemon = isDaemon;
return this;
}
+ /**
+ * Core pool size.
+ *
+ * @param corePoolSize core pool size
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder corePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
return this;
}
+ /**
+ * Max pool num.
+ *
+ * @param maxPoolSize max pool num
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder maxPoolNum(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
if (maxPoolSize < this.corePoolSize) {
@@ -118,6 +164,11 @@ public class ThreadPoolBuilder implements Builder {
return this;
}
+ /**
+ * Single pool.
+ *
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder singlePool() {
int singleNum = 1;
this.corePoolSize = singleNum;
@@ -125,6 +176,12 @@ public class ThreadPoolBuilder implements Builder {
return this;
}
+ /**
+ * Single pool.
+ *
+ * @param threadNamePrefix thread name prefix
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder singlePool(String threadNamePrefix) {
int singleNum = 1;
this.corePoolSize = singleNum;
@@ -133,128 +190,245 @@ public class ThreadPoolBuilder implements Builder {
return this;
}
+ /**
+ * Pool thread size.
+ *
+ * @param corePoolSize core pool size
+ * @param maxPoolSize max pool size
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder poolThreadSize(int corePoolSize, int maxPoolSize) {
this.corePoolSize = corePoolSize;
this.maxPoolSize = maxPoolSize;
return this;
}
+ /**
+ * Keep alive time.
+ *
+ * @param keepAliveTime keep alive time
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder keepAliveTime(long keepAliveTime) {
this.keepAliveTime = keepAliveTime;
return this;
}
+ /**
+ * Time unit.
+ *
+ * @param timeUnit time unit
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder timeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
return this;
}
+ /**
+ * Execute time-out.
+ *
+ * @param executeTimeOut execute time-out
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder executeTimeOut(long executeTimeOut) {
this.executeTimeOut = executeTimeOut;
return this;
}
+ /**
+ * Keep alive time.
+ *
+ * @param keepAliveTime keep alive time
+ * @param timeUnit time unit
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
return this;
}
+ /**
+ * Capacity.
+ *
+ * @param capacity capacity
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder capacity(int capacity) {
this.capacity = capacity;
return this;
}
+ /**
+ * Work queue.
+ *
+ * @param queueType queue type
+ * @param capacity capacity
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder workQueue(BlockingQueueTypeEnum queueType, int capacity) {
this.blockingQueueType = queueType;
this.capacity = capacity;
return this;
}
+ /**
+ * Rejected.
+ *
+ * @param rejectedExecutionHandler rejected execution handler
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder rejected(RejectedExecutionHandler rejectedExecutionHandler) {
this.rejectedExecutionHandler = rejectedExecutionHandler;
return this;
}
+ /**
+ * Work queue.
+ *
+ * @param blockingQueueType blocking queue type
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder workQueue(BlockingQueueTypeEnum blockingQueueType) {
this.blockingQueueType = blockingQueueType;
return this;
}
+ /**
+ * Work queue.
+ *
+ * @param workQueue work queue
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder workQueue(BlockingQueue workQueue) {
this.workQueue = workQueue;
return this;
}
+ /**
+ * Thread-pool id.
+ *
+ * @param threadPoolId thread-pool id
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder threadPoolId(String threadPoolId) {
this.threadPoolId = threadPoolId;
return this;
}
+ /**
+ * Task decorator.
+ *
+ * @param taskDecorator task decorator
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder taskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
return this;
}
+ /**
+ * Await termination millis.
+ *
+ * @param awaitTerminationMillis await termination millis
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder awaitTerminationMillis(long awaitTerminationMillis) {
this.awaitTerminationMillis = awaitTerminationMillis;
return this;
}
+ /**
+ * Wait for tasks to complete on shutdown.
+ *
+ * @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder waitForTasksToCompleteOnShutdown(boolean waitForTasksToCompleteOnShutdown) {
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
return this;
}
+ /**
+ * Dynamic support.
+ *
+ * @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
+ * @param awaitTerminationMillis await termination millis
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder dynamicSupport(boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis) {
this.awaitTerminationMillis = awaitTerminationMillis;
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
return this;
}
+ /**
+ * Allow core thread time-out.
+ *
+ * @param allowCoreThreadTimeOut core thread time-out
+ * @return thread-pool builder
+ */
public ThreadPoolBuilder allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
return this;
}
- @Override
- public ThreadPoolExecutor build() {
- if (isDynamicPool) {
- return buildDynamicPool(this);
- }
- return isFastPool ? buildFastPool(this) : buildPool(this);
- }
-
+ /**
+ * Builder design pattern implementation.
+ *
+ * @return thread-pool builder
+ */
public static ThreadPoolBuilder builder() {
return new ThreadPoolBuilder();
}
/**
- * Create dynamic thread pool by thread pool id
+ * Create dynamic thread pool by thread pool id.
*
- * @param threadPoolId threadPoolId
- * @return ThreadPoolExecutor
+ * @param threadPoolId thread-pool id
+ * @return dynamic thread-pool executor
*/
public static ThreadPoolExecutor buildDynamicPoolById(String threadPoolId) {
- return ThreadPoolBuilder.builder()
- .threadFactory(threadPoolId)
- .threadPoolId(threadPoolId)
- .dynamicPool()
- .build();
+ return ThreadPoolBuilder.builder().threadFactory(threadPoolId).threadPoolId(threadPoolId).dynamicPool().build();
}
+ /**
+ * Build a normal thread-pool with {@code builder}.
+ *
+ * @param builder thread-pool builder
+ * @return normal thread-pool
+ */
private static ThreadPoolExecutor buildPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildPool(buildInitParam(builder));
}
+ /**
+ * Build a fast thread-pool with {@code builder}.
+ *
+ * @param builder thread-pool builder
+ * @return fast thread-pool executor
+ */
private static ThreadPoolExecutor buildFastPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildFastPool(buildInitParam(builder));
}
+ /**
+ * Build a dynamic thread-pool with {@code builder}.
+ *
+ * @param builder thread-pool builder
+ * @return dynamic thread-pool executor
+ */
private static ThreadPoolExecutor buildDynamicPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildDynamicPool(buildInitParam(builder));
}
+ /**
+ * Build thread-pool initialization parameters via {@code builder}.
+ *
+ * @param builder thread-pool builder
+ * @return thread-pool init param
+ */
private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) {
AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam;
if (builder.threadFactory == null) {
@@ -289,4 +463,12 @@ public class ThreadPoolBuilder implements Builder {
}
return initParam;
}
+
+ @Override
+ public ThreadPoolExecutor build() {
+ if (isDynamicPool) {
+ return buildDynamicPool(this);
+ }
+ return isFastPool ? buildFastPool(this) : buildPool(this);
+ }
}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolExecutorTemplate.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolExecutorTemplate.java
index c60ecc4d..0a1d1faf 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolExecutorTemplate.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolExecutorTemplate.java
@@ -36,10 +36,6 @@ public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
- private Exception clientTrace() {
- return new Exception("Tread task root stack trace.");
- }
-
@Override
public void execute(final Runnable command) {
super.execute(wrap(command, clientTrace()));
@@ -55,6 +51,22 @@ public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor {
return super.submit(wrap(task, clientTrace()));
}
+ /**
+ * Client trace.
+ *
+ * @return exception
+ */
+ private Exception clientTrace() {
+ return new Exception("Tread task root stack trace.");
+ }
+
+ /**
+ * Wrapping thread pool tasks.
+ *
+ * @param task task
+ * @param clientStack client stack
+ * @return wrapped runnable
+ */
private Runnable wrap(final Runnable task, final Exception clientStack) {
return () -> {
try {
@@ -66,6 +78,14 @@ public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor {
};
}
+ /**
+ * Wrapping thread pool tasks.
+ *
+ * @param task task
+ * @param clientStack client stack
+ * @param computed result
+ * @return wrapped runnable
+ */
private Callable wrap(final Callable task, final Exception clientStack) {
return () -> {
try {