Supplemental code comments (#904)

pull/906/head
马称 Ma Chen 2 years ago committed by GitHub
parent b883dab4f3
commit 733567e498
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -26,7 +26,6 @@ import java.lang.annotation.Target;
/**
* An annotation that enhances the functionality of the jdk acoustic thread pool,
* with the following list of enhancements.
*
* <ul>
* <li>Dynamic change at runtime</li>
* <li>Determine whether an alarm is required at runtime</li>

@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
/**
* wait for tasks to complete on shutdown
* Wait for tasks to complete on shutdown
*/
@Getter
@Setter
@ -54,23 +54,23 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
/**
* Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters.
*
* @param threadPoolId thread-pool id
* @param executeTimeOut execute time out
* @param threadPoolId thread-pool id
* @param executeTimeOut execute time out
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @param awaitTerminationMillis await termination millis
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param blockingQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor creates a new thread
* @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
* @param awaitTerminationMillis await termination millis
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param blockingQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor creates a new thread
* @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
@ -80,27 +80,26 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
* or {@code threadFactory} or {@code handler} is null
*/
public DynamicThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis,
@NonNull BlockingQueue<Runnable> blockingQueue,
@NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) {
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis,
@NonNull BlockingQueue<Runnable> blockingQueue,
@NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) {
super(
threadPoolId, new DefaultThreadPoolPluginManager(),
corePoolSize, maximumPoolSize, keepAliveTime, unit,
blockingQueue, threadFactory, rejectedExecutionHandler);
log.info("Initializing ExecutorService {}", threadPoolId);
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
// init default plugins
// Init default plugins.
new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis)
.doRegister(this);
}
/**
* Invoked by the containing {@code BeanFactory} on destruction of a bean.
*
*/
@Override
public void destroy() {
@ -128,7 +127,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
/**
* Set support param.
*
* @param awaitTerminationMillis await termination millis
* @param awaitTerminationMillis await termination millis
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin}
*/
@ -238,5 +237,4 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
public void setRedundancyHandler(RejectedExecutionHandler handler) {
setRejectedExecutionHandler(handler);
}
}

@ -43,43 +43,42 @@ import java.util.concurrent.*;
public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements ThreadPoolPluginSupport {
/**
* thread pool id
* Thread pool id
*/
@Getter
@NonNull
private final String threadPoolId;
/**
* action aware registry
* Action aware registry
*/
@Getter
private final ThreadPoolPluginManager threadPoolPluginManager;
/**
* handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper
* Handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper
*/
private final RejectedAwareHandlerWrapper handlerWrapper;
/**
* Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters.
*
* @param threadPoolId thread-pool id
* @param threadPoolPluginManager action aware registry
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @param threadPoolId thread-pool id
* @param threadPoolPluginManager action aware registry
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
@ -89,20 +88,18 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
* or {@code threadFactory} or {@code handler} is null
*/
public ExtensibleThreadPoolExecutor(
@NonNull String threadPoolId,
@NonNull ThreadPoolPluginManager threadPoolPluginManager,
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
@NonNull BlockingQueue<Runnable> workQueue,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler handler) {
@NonNull String threadPoolId,
@NonNull ThreadPoolPluginManager threadPoolPluginManager,
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
@NonNull BlockingQueue<Runnable> workQueue,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// pool extended info
// Pool extended info.
this.threadPoolId = threadPoolId;
this.threadPoolPluginManager = threadPoolPluginManager;
// proxy handler to support Aware callback
// Proxy handler to support Aware callback.
while (handler instanceof RejectedAwareHandlerWrapper) {
handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
}
@ -115,7 +112,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
*
* <p><b>Before calling the parent class method, {@link ExecuteAwarePlugin#beforeExecute} will be called first.
*
* @param thread the thread that will run task {@code r}
* @param thread the thread that will run task {@code r}
* @param runnable the task that will be executed
*/
@Override
@ -145,9 +142,9 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
*
* <p><b>After calling the superclass method, {@link ExecuteAwarePlugin#afterExecute} will be called last.
*
* @param runnable the runnable that has completed
* @param runnable the runnable that has completed
* @param throwable the exception that caused termination, or null if
* execution completed normally
* execution completed normally
*/
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
@ -191,7 +188,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
}
/**
* {@inheritDoc}.
* {@inheritDoc}
*
* <p><b>Before calling the superclass method, {@link ShutdownAwarePlugin#afterTerminated} will be called first.
*/
@ -291,22 +288,21 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
private static class RejectedAwareHandlerWrapper implements RejectedExecutionHandler {
/**
* thread-pool action aware registry
* Thread-pool action aware registry
*/
private final ThreadPoolPluginManager registry;
/**
* original target
* Original target
*/
@NonNull
@Setter
@Getter
private RejectedExecutionHandler handler;
/**
* Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task
* Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task.
*
* @param r the runnable task requested to be executed
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
*/
@Override
@ -315,6 +311,5 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor));
handler.rejectedExecution(r, executor);
}
}
}

@ -29,7 +29,6 @@ import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@ -37,7 +36,14 @@ import org.springframework.boot.CommandLineRunner;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Thread-pool notify alarm handler.
@ -46,7 +52,6 @@ import java.util.concurrent.*;
@RequiredArgsConstructor
public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner {
@NonNull
private final Hippo4jSendMessageService hippo4jSendMessageService;
@Value("${spring.profiles.active:UNKNOWN}")
@ -96,8 +101,8 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Check thread pool capacity alarm.
*
* @param threadPoolId
* @param threadPoolExecutor
* @param threadPoolId thread-pool id
* @param threadPoolExecutor thread-pool executor
*/
public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
@ -119,8 +124,8 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Check thread pool activity alarm.
*
* @param threadPoolId
* @param threadPoolExecutor
* @param threadPoolId thread-pool id
* @param threadPoolExecutor thread-pool executor
*/
public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
@ -141,7 +146,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Async send rejected alarm.
*
* @param threadPoolId
* @param threadPoolId thread-pool id
*/
public void asyncSendRejectedAlarm(String threadPoolId) {
Runnable checkPoolRejectedAlarmTask = () -> {
@ -162,10 +167,10 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Async send execute time out alarm.
*
* @param threadPoolId
* @param executeTime
* @param executeTimeOut
* @param threadPoolExecutor
* @param threadPoolId thread-pool id
* @param executeTime execute time
* @param executeTimeOut execute time-out
* @param threadPoolExecutor thread-pool executor
*/
public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) {
ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
@ -193,7 +198,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Send pool config change.
*
* @param request
* @param request change parameter notify request
*/
public void sendPoolConfigChange(ChangeParameterNotifyRequest request) {
request.setActive(active.toUpperCase());
@ -206,7 +211,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Build alarm notify request.
*
* @param threadPoolExecutor
* @param threadPoolExecutor thread-pool executor
* @return
*/
public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) {

@ -47,6 +47,9 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean {
printBanner();
}
/**
* Print banner.
*/
private void printBanner() {
String banner = " __ __ ___ ___ __ \n" +
" | |--.|__|.-----..-----..-----.| | | |__|\n" +
@ -67,6 +70,11 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean {
}
}
/**
* Get version.
*
* @return hippo4j version
*/
public static String getVersion() {
final Package pkg = DynamicThreadPoolBannerHandler.class.getPackage();
return pkg != null ? pkg.getImplementationVersion() : "";

@ -27,23 +27,20 @@ public interface ExecuteAwarePlugin extends ThreadPoolPlugin {
/**
* Callback before task execution.
*
* @param thread thread of executing task
* @param thread thread of executing task
* @param runnable task
* @see ExtensibleThreadPoolExecutor#beforeExecute
*/
default void beforeExecute(Thread thread, Runnable runnable) {
// do noting
}
/**
* Callback after task execution.
*
* @param runnable runnable
* @param runnable runnable
* @param throwable exception thrown during execution
* @see ExtensibleThreadPoolExecutor#afterExecute
*/
default void afterExecute(Runnable runnable, Throwable throwable) {
// do nothing
}
}

@ -31,19 +31,19 @@ import java.util.List;
public class PluginRuntime {
/**
* plugin id
* Plugin id
*/
private final String pluginId;
/**
* runtime info
* Runtime info
*/
private final List<Info> infoList = new ArrayList<>();
/**
* Add a runtime info item.
*
* @param name name
* @param name name
* @param value value
* @return runtime info item
*/
@ -52,12 +52,21 @@ public class PluginRuntime {
return this;
}
/**
* Plugin runtime info.
*/
@Getter
@RequiredArgsConstructor
public static class Info {
/**
* Name
*/
private final String name;
/**
* Value
*/
private final Object value;
}
}

@ -31,7 +31,5 @@ public interface RejectedAwarePlugin extends ThreadPoolPlugin {
* @param executor executor
*/
default void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
// do nothing
}
}

@ -35,19 +35,17 @@ public interface ShutdownAwarePlugin extends ThreadPoolPlugin {
* @see ThreadPoolExecutor#shutdownNow()
*/
default void beforeShutdown(ThreadPoolExecutor executor) {
// do nothing
}
/**
* Callback after pool shutdown.
*
* @param executor executor
* @param executor executor
* @param remainingTasks remainingTasks, or empty if no tasks left or {@link ThreadPoolExecutor#shutdown()} called
* @see ThreadPoolExecutor#shutdown()
* @see ThreadPoolExecutor#shutdownNow()
*/
default void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
// do nothing
}
/**
@ -57,7 +55,5 @@ public interface ShutdownAwarePlugin extends ThreadPoolPlugin {
* @see ThreadPoolExecutor#terminated()
*/
default void afterTerminated(ExtensibleThreadPoolExecutor executor) {
// do nothing
}
}

@ -43,7 +43,7 @@ public interface TaskAwarePlugin extends ThreadPoolPlugin {
* Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool.
*
* @param executor executor
* @param future original task
* @param future original task
* @return Tasks that really need to be performed
* @see ThreadPoolExecutor#newTaskFor(Callable)
*/
@ -61,5 +61,4 @@ public interface TaskAwarePlugin extends ThreadPoolPlugin {
default Runnable beforeTaskExecute(Runnable runnable) {
return runnable;
}
}

@ -57,7 +57,6 @@ public interface ThreadPoolPlugin {
* @see ThreadPoolPluginManager#register
*/
default void start() {
// do nothing
}
/**
@ -67,7 +66,6 @@ public interface ThreadPoolPlugin {
* @see ThreadPoolPluginManager#clear
*/
default void stop() {
// do nothing
}
/**
@ -78,5 +76,4 @@ public interface ThreadPoolPlugin {
default PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId());
}
}

@ -36,7 +36,7 @@ import java.util.Optional;
public abstract class AbstractTaskTimerPlugin implements ExecuteAwarePlugin {
/**
* start times of executed tasks
* Start times of executed tasks
*/
private final ThreadLocal<Long> startTimes = new ThreadLocal<>();
@ -55,7 +55,7 @@ public abstract class AbstractTaskTimerPlugin implements ExecuteAwarePlugin {
/**
* Record the total time for the worker thread to complete the task, and update the time record.
*
* @param runnable runnable
* @param runnable runnable
* @param throwable exception thrown during execution
*/
@Override
@ -84,5 +84,4 @@ public abstract class AbstractTaskTimerPlugin implements ExecuteAwarePlugin {
* @param taskExecuteTime execute time of task
*/
protected abstract void processTaskTime(long taskExecuteTime);
}

@ -45,7 +45,7 @@ public class TaskDecoratorPlugin implements TaskAwarePlugin {
}
/**
* decorators
* Decorators
*/
@Getter
private final List<TaskDecorator> decorators = new ArrayList<>();
@ -77,7 +77,7 @@ public class TaskDecoratorPlugin implements TaskAwarePlugin {
}
/**
* Add a decorator
* Add a decorator.
*
* @param decorator decorator
*/
@ -87,19 +87,16 @@ public class TaskDecoratorPlugin implements TaskAwarePlugin {
}
/**
* Clear all decorators
*
* Clear all decorators.
*/
public void clearDecorators() {
decorators.clear();
}
/**
* Remove decorators
*
* Remove decorators.
*/
public void removeDecorator(TaskDecorator decorator) {
decorators.remove(decorator);
}
}

@ -43,7 +43,7 @@ public class TaskRejectCountRecordPlugin implements RejectedAwarePlugin {
}
/**
* rejection count
* Rejection count
*/
@Setter
@Getter
@ -72,12 +72,11 @@ public class TaskRejectCountRecordPlugin implements RejectedAwarePlugin {
}
/**
* Get reject count num
* Get reject count num.
*
* @return reject count num
*/
public Long getRejectCountNum() {
return rejectCount.get();
}
}

@ -34,27 +34,27 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
public static final String PLUGIN_NAME = "task-time-record-plugin";
/**
* Lock instance.
* Lock instance
*/
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* Total execution milli time of all tasks.
* Total execution milli time of all tasks
*/
private long totalTaskTimeMillis = 0L;
/**
* Maximum task milli execution time, default -1.
* Maximum task milli execution time, default -1
*/
private long maxTaskTimeMillis = -1L;
/**
* Minimal task milli execution time, default -1.
* Minimal task milli execution time, default -1
*/
private long minTaskTimeMillis = -1L;
/**
* Count of completed task.
* Count of completed task
*/
private long taskCount = 0L;
@ -137,27 +137,27 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
public static class Summary {
/**
* Total execution nano time of all tasks.
* Total execution nano time of all tasks
*/
private final long totalTaskTimeMillis;
/**
* Maximum task nano execution time.
* Maximum task nano execution time
*/
private final long maxTaskTimeMillis;
/**
* Minimal task nano execution time.
* Minimal task nano execution time
*/
private final long minTaskTimeMillis;
/**
* Count of completed task.
* Count of completed task
*/
private final long taskCount;
/**
* Get the avg task time in milliseconds.
* Get the avg task time in milliseconds
*
* @return avg task time
*/
@ -165,7 +165,5 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
long totalTaskCount = getTaskCount();
return totalTaskCount > 0L ? getTotalTaskTimeMillis() / totalTaskCount : -1;
}
}
}

@ -35,16 +35,19 @@ public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin {
public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin";
/**
* threadPoolId
* Thread-pool id
*/
private final String threadPoolId;
/**
* Execute time-out
*/
@Getter
@Setter
private Long executeTimeOut;
/**
* thread-pool
* Thread-pool executor
*/
private final ThreadPoolExecutor threadPoolExecutor;
@ -74,5 +77,4 @@ public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin {
.ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm(
threadPoolId, taskExecuteTime, executeTimeOut, threadPoolExecutor));
}
}

@ -54,7 +54,7 @@ public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
}
/**
* await termination millis
* Await termination millis
*/
@Setter
public long awaitTerminationMillis;
@ -80,7 +80,7 @@ public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
* cancel the remaining tasks,
* then wait for pool to terminate according {@link #awaitTerminationMillis} if necessary.
*
* @param executor executor
* @param executor executor
* @param remainingTasks remainingTasks
*/
@Override
@ -141,5 +141,4 @@ public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
Thread.currentThread().interrupt();
}
}
}

@ -52,32 +52,32 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
/**
* lock of this instance
* Lock of this instance
*/
private final ReadWriteLock instanceLock = new ReentrantReadWriteLock();
/**
* Registered {@link ThreadPoolPlugin}.
* Registered {@link ThreadPoolPlugin}
*/
private final Map<String, ThreadPoolPlugin> registeredPlugins = new ConcurrentHashMap<>(16);
/**
* Registered {@link TaskAwarePlugin}.
* Registered {@link TaskAwarePlugin}
*/
private final List<TaskAwarePlugin> taskAwarePluginList = new CopyOnWriteArrayList<>();
/**
* Registered {@link ExecuteAwarePlugin}.
* Registered {@link ExecuteAwarePlugin}
*/
private final List<ExecuteAwarePlugin> executeAwarePluginList = new CopyOnWriteArrayList<>();
/**
* Registered {@link RejectedAwarePlugin}.
* Registered {@link RejectedAwarePlugin}
*/
private final List<RejectedAwarePlugin> rejectedAwarePluginList = new CopyOnWriteArrayList<>();
/**
* Registered {@link ShutdownAwarePlugin}.
* Registered {@link ShutdownAwarePlugin}
*/
private final List<ShutdownAwarePlugin> shutdownAwarePluginList = new CopyOnWriteArrayList<>();
@ -115,10 +115,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
try {
String id = plugin.getId();
Assert.isTrue(!isRegistered(id), "The plugin with id [" + id + "] has been registered");
// register plugin
registeredPlugins.put(id, plugin);
// quick index
if (plugin instanceof TaskAwarePlugin) {
taskAwarePluginList.add((TaskAwarePlugin) plugin);
}
@ -171,7 +168,6 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
Optional.ofNullable(pluginId)
.map(registeredPlugins::remove)
.ifPresent(plugin -> {
// remove quick index if necessary
if (plugin instanceof TaskAwarePlugin) {
taskAwarePluginList.remove(plugin);
}
@ -227,10 +223,10 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
}
/**
* Get {@link ThreadPoolPlugin}
* Get {@link ThreadPoolPlugin}.
*
* @param pluginId plugin id
* @param <A> plugin type
* @param <A> plugin type
* @return {@link ThreadPoolPlugin}, null if unregister
*/
@Override
@ -314,5 +310,4 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
readLock.unlock();
}
}
}

@ -38,12 +38,12 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr
public static final String REGISTRAR_NAME = "DefaultThreadPoolPluginRegistrar";
/**
* execute time out
* Execute time out
*/
private long executeTimeOut;
/**
* await termination millis
* Await termination millis
*/
private long awaitTerminationMillis;
@ -64,14 +64,10 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr
*/
@Override
public void doRegister(ThreadPoolPluginSupport support) {
// callback when task execute
support.register(new TaskDecoratorPlugin());
support.register(new TaskTimeoutNotifyAlarmPlugin(support.getThreadPoolId(), executeTimeOut, support.getThreadPoolExecutor()));
// callback when task rejected
support.register(new TaskRejectCountRecordPlugin());
support.register(new TaskRejectNotifyAlarmPlugin());
// callback when pool shutdown
support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis));
}
}

@ -32,7 +32,7 @@ import java.util.Optional;
public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
/**
* default instance
* Default instance
*/
public static final EmptyThreadPoolPluginManager INSTANCE = new EmptyThreadPoolPluginManager();
@ -41,7 +41,6 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
@Override
public void clear() {
// do nothing
}
/**
@ -64,7 +63,6 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
@Override
public void register(ThreadPoolPlugin plugin) {
// do nothing
}
/**
@ -90,17 +88,16 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
}
/**
* Unregister {@link ThreadPoolPlugin}
* Unregister {@link ThreadPoolPlugin}.
*
* @param pluginId plugin id
*/
@Override
public void unregister(String pluginId) {
// do nothing
}
/**
* Get {@link ThreadPoolPlugin}
* Get {@link ThreadPoolPlugin}.
*
* @param pluginId plugin id
* @return {@link ThreadPoolPlugin}
@ -150,5 +147,4 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
public Collection<TaskAwarePlugin> getTaskAwarePluginList() {
return Collections.emptyList();
}
}

@ -54,7 +54,7 @@ public interface ThreadPoolPluginManager {
Collection<ThreadPoolPlugin> getAllPlugins();
/**
* Register a {@link ThreadPoolPlugin}
* Register a {@link ThreadPoolPlugin}.
*
* @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()}
@ -80,17 +80,17 @@ public interface ThreadPoolPluginManager {
boolean isRegistered(String pluginId);
/**
* Unregister {@link ThreadPoolPlugin}
* Unregister {@link ThreadPoolPlugin}.
*
* @param pluginId plugin id
*/
void unregister(String pluginId);
/**
* Get {@link ThreadPoolPlugin}
* Get {@link ThreadPoolPlugin}.
*
* @param pluginId plugin id
* @param <A> target aware type
* @param pluginId plugin id
* @param <A> target aware type
* @return {@link ThreadPoolPlugin}
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
@ -129,7 +129,7 @@ public interface ThreadPoolPluginManager {
/**
* Get plugin of type.
*
* @param pluginId plugin id
* @param pluginId plugin id
* @param pluginType plugin type
* @return target plugin
*/
@ -172,5 +172,4 @@ public interface ThreadPoolPluginManager {
return getPlugin(pluginId)
.map(ThreadPoolPlugin::getPluginRuntime);
}
}

@ -38,5 +38,4 @@ public interface ThreadPoolPluginRegistrar {
* @param support thread pool plugin manager delegate
*/
void doRegister(ThreadPoolPluginSupport support);
}

@ -94,7 +94,7 @@ public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
}
/**
* Unregister {@link ThreadPoolPlugin}
* Unregister {@link ThreadPoolPlugin}.
*
* @param pluginId name
*/
@ -114,7 +114,7 @@ public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
}
/**
* Get {@link ThreadPoolPlugin}
* Get {@link ThreadPoolPlugin}.
*
* @param pluginId target name
* @return {@link ThreadPoolPlugin}, null if unregister
@ -164,5 +164,4 @@ public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
default Collection<TaskAwarePlugin> getTaskAwarePluginList() {
return getThreadPoolPluginManager().getTaskAwarePluginList();
}
}

@ -34,10 +34,19 @@ import java.util.concurrent.atomic.AtomicLong;
@AllArgsConstructor
public class RejectedProxyInvocationHandler implements InvocationHandler {
/**
* Target object
*/
private final Object target;
/**
* Thread-pool id
*/
private final String threadPoolId;
/**
* Reject count
*/
private final AtomicLong rejectCount;
@Override

@ -33,9 +33,9 @@ public class RejectedProxyUtil {
/**
* Proxy rejected execution.
*
* @param rejectedExecutionHandler
* @param threadPoolId
* @param rejectedNum
* @param rejectedExecutionHandler rejected execution handler
* @param threadPoolId thread-pool id
* @param rejectedNum rejected num
* @return
*/
public static RejectedExecutionHandler createProxy(RejectedExecutionHandler rejectedExecutionHandler, String threadPoolId, AtomicLong rejectedNum) {

@ -34,7 +34,7 @@ public class ExecutorTraceContextUtil {
/**
* Get and remove.
*
* @return
* @return timeout trace
*/
public static String getAndRemoveTimeoutTrace() {
String val = MDC.get(EXECUTE_TIMEOUT_TRACE_KEY);
@ -45,7 +45,7 @@ public class ExecutorTraceContextUtil {
/**
* Put execute timeout trace.
*
* @param trace
* @param trace trace
*/
public static void putExecuteTimeoutTrace(String trace) {
MDC.put(EXECUTE_TIMEOUT_TRACE, trace);
@ -54,7 +54,7 @@ public class ExecutorTraceContextUtil {
/**
* Set execute timeout trace key.
*
* @param key
* @param key trace key
*/
public static void setExecuteTimeoutTraceKey(String key) {
EXECUTE_TIMEOUT_TRACE_KEY = key;

@ -38,14 +38,20 @@ import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
*/
public class IdentifyUtil {
private static String IDENTIFY;
public static final String CLIENT_IDENTIFICATION_VALUE = IdUtil.simpleUUID();
static {
DynamicThreadPoolServiceLoader.register(ClientNetworkService.class);
}
/**
* Identify
*/
private static String IDENTIFY;
/**
* Client identification value
*/
public static final String CLIENT_IDENTIFICATION_VALUE = IdUtil.simpleUUID();
/**
* Generate identify.
*

@ -27,14 +27,29 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class SystemClock {
/**
* Period
*/
private final int period;
/**
* Now
*/
private final AtomicLong now;
/**
* Thread name
*/
private static final String THREAD_NAME = "system.clock";
/**
* Instance holder.
*/
private static class InstanceHolder {
/**
* System clock instance
*/
private static final SystemClock INSTANCE = new SystemClock(1);
}
@ -44,10 +59,18 @@ public class SystemClock {
scheduleClockUpdating();
}
/**
* Instance.
*
* @return System clock instance
*/
private static SystemClock instance() {
return InstanceHolder.INSTANCE;
}
/**
* Schedule clock updating.
*/
private void scheduleClockUpdating() {
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, runnable -> {
Thread thread = new Thread(runnable, THREAD_NAME);
@ -57,10 +80,20 @@ public class SystemClock {
scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);
}
/**
* Current time millis.
*
* @return current time millis
*/
private long currentTimeMillis() {
return now.get();
}
/**
* Now.
*
* @return current time millis
*/
public static long now() {
return instance().currentTimeMillis();
}

@ -75,7 +75,7 @@ public class InetUtils implements Closeable {
try {
int lowest = Integer.MAX_VALUE;
for (Enumeration<NetworkInterface> nics = NetworkInterface
.getNetworkInterfaces(); nics.hasMoreElements();) {
.getNetworkInterfaces(); nics.hasMoreElements(); ) {
NetworkInterface ifc = nics.nextElement();
if (ifc.isUp()) {
this.log.trace("Testing interface: " + ifc.getDisplayName());
@ -84,7 +84,6 @@ public class InetUtils implements Closeable {
} else {
continue;
}
// @formatter:off
if (!ignoreInterface(ifc.getDisplayName())) {
for (Enumeration<InetAddress> addrs = ifc
.getInetAddresses(); addrs.hasMoreElements(); ) {
@ -98,7 +97,6 @@ public class InetUtils implements Closeable {
}
}
}
// @formatter:on
}
}
} catch (IOException ex) {
@ -150,7 +148,6 @@ public class InetUtils implements Closeable {
public HostInfo convertAddress(final InetAddress address) {
HostInfo hostInfo = new HostInfo();
Future<String> result = this.executorService.submit(address::getHostName);
String hostname;
try {
hostname = result.get(this.properties.getTimeoutSeconds(), TimeUnit.SECONDS);

Loading…
Cancel
Save