feat: support the extension of functions through plugins

pull/842/head
huangchengxing 3 years ago
parent 03ea6668af
commit f5996a8a34

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.toolkit;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Thread safe time recorder,
* used to record multiple time periods and count various time indicators.
*
* @author huangchengxing
*/
public class SyncTimeRecorder {
/**
* Lock instance.
*/
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* Total execution milli time of all tasks.
*/
private long totalTaskTime = 0L;
/**
* Maximum task milli execution time, default -1.
*/
private long maxTaskTime = -1L;
/**
* Minimal task milli execution time, default -1.
*/
private long minTaskTime = -1L;
/**
* Count of completed task.
*/
private long taskCount = 0L;
/**
* Get the summary statistics of the instance at the current time.
*
* @return data snapshot
*/
public Summary summarize() {
Lock readLock = lock.readLock();
Summary statistics;
readLock.lock();
try {
statistics = new Summary(
this.totalTaskTime,
this.maxTaskTime,
this.minTaskTime,
this.taskCount);
} finally {
readLock.unlock();
}
return statistics;
}
/**
* Refresh time indicators of the current instance.
*
* @param taskExecutionTime millisecond
*/
public final void refreshTime(long taskExecutionTime) {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
if (taskCount == 0) {
maxTaskTime = taskExecutionTime;
minTaskTime = taskExecutionTime;
} else {
maxTaskTime = Math.max(taskExecutionTime, maxTaskTime);
minTaskTime = Math.min(taskExecutionTime, minTaskTime);
}
taskCount = taskCount + 1;
totalTaskTime += taskExecutionTime;
} finally {
writeLock.unlock();
}
}
/**
* Summary statistics of SyncTimeRecorder instance at a certain time.
*/
@Getter
@RequiredArgsConstructor
public static class Summary {
/**
* Total execution nano time of all tasks.
*/
private final long totalTaskTime;
/**
* Maximum task nano execution time.
*/
private final long maxTaskTime;
/**
* Minimal task nano execution time.
*/
private final long minTaskTime;
/**
* Count of completed task.
*/
private final long taskCount;
/**
* Get the avg task time in milliseconds.
*
* @return avg task time
*/
public long getAvgTaskTimeMillis() {
return getTotalTaskTime() / getTaskCount();
}
}
}

@ -10,6 +10,11 @@
<artifactId>hippo4j-core</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>

@ -17,107 +17,180 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.core.toolkit.SystemClock;
import lombok.Getter;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistrar;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin;
import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin;
import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin;
import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.task.TaskDecorator;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* Enhanced dynamic and monitored thread pool.
*/
public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport {
@Getter
@Setter
private Long executeTimeOut;
@Getter
@Setter
private TaskDecorator taskDecorator;
@Getter
@Setter
private RejectedExecutionHandler redundancyHandler;
@Getter
private final String threadPoolId;
@Getter
private final AtomicLong rejectCount = new AtomicLong();
private final ThreadLocal<Long> startTimeThreadLocal = new ThreadLocal<>();
public DynamicThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
long executeTimeOut,
boolean waitForTasksToCompleteOnShutdown,
long awaitTerminationMillis,
@NonNull BlockingQueue<Runnable> blockingQueue,
@NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, blockingQueue, threadPoolId, threadFactory, rejectedExecutionHandler);
this.threadPoolId = threadPoolId;
this.executeTimeOut = executeTimeOut;
// Number of dynamic proxy denial policies.
RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount);
setRejectedExecutionHandler(rejectedProxy);
// Redundant fields to avoid reflecting the acquired fields when sending change information.
redundancyHandler = rejectedExecutionHandler;
@Slf4j
public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
/**
* Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters.
*
* @param threadPoolId thread-pool id
* @param executeTimeOut execute time out
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @param awaitTerminationMillis await termination millis
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param blockingQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor creates a new thread
* @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public DynamicThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis,
@NonNull BlockingQueue<Runnable> blockingQueue,
@NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) {
super(
threadPoolId, new DefaultThreadPoolPluginRegistry(),
corePoolSize, maximumPoolSize, keepAliveTime, unit,
blockingQueue, threadFactory, rejectedExecutionHandler
);
log.info("Initializing ExecutorService" + threadPoolId);
// init default aware processor
new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis, waitForTasksToCompleteOnShutdown)
.doRegister(this, this);
}
/**
* Invoked by the containing {@code BeanFactory} on destruction of a bean.
*
* @throws Exception in case of shutdown errors. Exceptions will get logged
* but not rethrown to allow other beans to release their resources as well.
*/
@Override
public void execute(@NonNull Runnable command) {
if (taskDecorator != null) {
command = taskDecorator.decorate(command);
}
super.execute(command);
public void destroy() throws Exception {
getAndThen(
ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME,
ThreadPoolExecutorShutdownPlugin.class,
processor -> {
if (processor.isWaitForTasksToCompleteOnShutdown()) {
super.shutdown();
} else {
super.shutdownNow();
}
});
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (executeTimeOut == null || executeTimeOut <= 0) {
return;
}
startTimeThreadLocal.set(SystemClock.now());
public long getAwaitTerminationMillis() {
return getAndThen(
ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME,
ThreadPoolExecutorShutdownPlugin.class,
ThreadPoolExecutorShutdownPlugin::getAwaitTerminationMillis, -1L
);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
Long startTime;
if ((startTime = startTimeThreadLocal.get()) == null) {
return;
}
try {
long endTime = SystemClock.now();
long executeTime;
boolean executeTimeAlarm = (executeTime = (endTime - startTime)) > executeTimeOut;
if (executeTimeAlarm && ApplicationContextHolder.getInstance() != null) {
ThreadPoolNotifyAlarmHandler notifyAlarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class);
if (notifyAlarmHandler != null) {
notifyAlarmHandler.asyncSendExecuteTimeOutAlarm(threadPoolId, executeTime, executeTimeOut, this);
public boolean isWaitForTasksToCompleteOnShutdown() {
return getAndThen(
ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME,
ThreadPoolExecutorShutdownPlugin.class,
ThreadPoolExecutorShutdownPlugin::isWaitForTasksToCompleteOnShutdown, false
);
}
/**
* Set support param.
*
* @param awaitTerminationMillis await termination millis
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
*/
public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) {
getAndThen(
ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME,
ThreadPoolExecutorShutdownPlugin.class,
processor -> processor.setAwaitTerminationMillis(awaitTerminationMillis)
.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown)
);
}
public Long getRejectCountNum() {
return getAndThen(
TaskRejectCountRecordPlugin.PLUGIN_NAME,
TaskRejectCountRecordPlugin.class,
TaskRejectCountRecordPlugin::getRejectCountNum, -1L
);
}
public Long getExecuteTimeOut() {
return getAndThen(
TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME,
TaskTimeoutNotifyAlarmPlugin.class,
TaskTimeoutNotifyAlarmPlugin::getExecuteTimeOut, -1L
);
}
public void setExecuteTimeOut(Long executeTimeOut) {
getAndThen(
TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME,
TaskTimeoutNotifyAlarmPlugin.class,
processor -> processor.setExecuteTimeOut(executeTimeOut)
);
}
public TaskDecorator getTaskDecorator() {
return getAndThen(
TaskDecoratorPlugin.PLUGIN_NAME,
TaskDecoratorPlugin.class,
processor -> CollectionUtil.getFirst(processor.getDecorators()), null
);
}
public void setTaskDecorator(TaskDecorator taskDecorator) {
if (Objects.nonNull(taskDecorator)) {
getAndThen(
TaskDecoratorPlugin.PLUGIN_NAME,
TaskDecoratorPlugin.class,
processor -> {
processor.clearDecorators();
processor.addDecorator(taskDecorator);
}
}
} finally {
startTimeThreadLocal.remove();
);
}
}
@Override
protected ExecutorService initializeExecutor() {
return this;
public RejectedExecutionHandler getRedundancyHandler() {
return getRejectedExecutionHandler();
}
public Long getRejectCountNum() {
return rejectCount.get();
public void getRedundancyHandler(RejectedExecutionHandler handler) {
setRejectedExecutionHandler(handler);
}
}

@ -17,7 +17,6 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import lombok.AllArgsConstructor;
import lombok.Builder;
@ -67,8 +66,8 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
@Override
public void destroy() throws Exception {
if (executor != null && executor instanceof AbstractDynamicExecutorSupport) {
((AbstractDynamicExecutorSupport) executor).destroy();
if (executor != null && executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).destroy();
}
}
}

@ -0,0 +1,309 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor;
import cn.hippo4j.core.plugin.*;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
/**
* <p>Extensible thread-pool executor. <br />
* Support the callback extension points provided on the basis of {@link ThreadPoolExecutor}.
* Each extension point corresponds to a different {@link ThreadPoolPlugin} interface,
* users can customize plug-ins and implement one or more {@link ThreadPoolPlugin} interface
* to enable plugins to sense thread pool behavior and provide extended functions.
*
* @author huangchengxing
*/
public class ExtensibleThreadPoolExecutor
extends ThreadPoolExecutor implements ThreadPoolPluginRegistryDelegate {
/**
* thread pool id
*/
@Getter
@NonNull
private final String threadPoolId;
/**
* action aware registry
*/
@Getter
private final ThreadPoolPluginRegistry threadPoolPluginRegistry;
/**
* handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper
*/
private final RejectedAwareHandlerWrapper handlerWrapper;
/**
* Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters.
*
* @param threadPoolId thread-pool id
* @param threadPoolPluginRegistry action aware registry
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ExtensibleThreadPoolExecutor(
@NonNull String threadPoolId,
@NonNull ThreadPoolPluginRegistry threadPoolPluginRegistry,
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
@NonNull BlockingQueue<Runnable> workQueue,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// pool extended info
this.threadPoolId = threadPoolId;
this.threadPoolPluginRegistry = threadPoolPluginRegistry;
// proxy handler to support Aware callback
while (handler instanceof RejectedAwareHandlerWrapper) {
handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
}
this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginRegistry, handler);
super.setRejectedExecutionHandler(handlerWrapper);
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the parent class method, {@link ExecuteAwarePlugin#beforeExecute} will be called first.
*
* @param thread the thread that will run task {@code r}
* @param runnable the task that will be executed
*/
@Override
protected void beforeExecute(Thread thread, Runnable runnable) {
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList();
executeAwarePluginList.forEach(aware -> aware.beforeExecute(thread, runnable));
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the parent class method, {@link ExecuteAwarePlugin#execute} will be called first.
*
* @param runnable the task to execute
*/
@Override
public void execute(@NonNull Runnable runnable) {
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList();
for (ExecuteAwarePlugin executeAwarePlugin : executeAwarePluginList) {
runnable = executeAwarePlugin.execute(runnable);
}
super.execute(runnable);
}
/**
* {@inheritDoc}
*
* <p><b>After calling the parent class method, {@link ExecuteAwarePlugin#afterExecute} will be called last.
*
* @param runnable the runnable that has completed
* @param throwable the exception that caused termination, or null if
* execution completed normally
*/
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList();
executeAwarePluginList.forEach(aware -> aware.afterExecute(runnable, throwable));
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the parent class method,
* {@link ShutdownAwarePlugin#beforeShutdown} will be called first.
* and then will be call {@link ShutdownAwarePlugin#afterShutdown}
*
* @throws SecurityException {@inheritDoc}
*/
@Override
public void shutdown() {
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList();
shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this));
super.shutdown();
shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, Collections.emptyList()));
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the parent class method,
* {@link ShutdownAwarePlugin#beforeShutdown} will be called first.
* and then will be call {@link ShutdownAwarePlugin#afterShutdown}
*
* @throws SecurityException
*/
@Override
public List<Runnable> shutdownNow() {
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList();
shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this));
List<Runnable> tasks = super.shutdownNow();
shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, tasks));
return tasks;
}
/**
* {@inheritDoc}.
*
* <p><b>Before calling the parent class method, {@link ShutdownAwarePlugin#afterTerminated} will be called first.
*/
@Override
protected void terminated() {
super.terminated();
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList();
shutdownAwarePluginList.forEach(aware -> aware.afterTerminated(this));
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the parent class method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginRegistry.getTaskAwareList();
for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
runnable = taskAwarePlugin.beforeTaskCreate(this, runnable, value);
}
return super.newTaskFor(runnable, value);
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the parent class method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first.
*
* @param callable the callable task being wrapped
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginRegistry.getTaskAwareList();
for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
callable = taskAwarePlugin.beforeTaskCreate(this, callable);
}
return super.newTaskFor(callable);
}
/**
* Sets a new handler for unexecutable tasks.
*
* @param handler the new handler
* @throws NullPointerException if handler is null
* @see #getRejectedExecutionHandler
*/
@Override
public void setRejectedExecutionHandler(@NonNull RejectedExecutionHandler handler) {
while (handler instanceof RejectedAwareHandlerWrapper) {
handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
}
handlerWrapper.setHandler(handler);
}
/**
* Returns the current handler for unexecutable tasks.
*
* @return the current handler
* @see #setRejectedExecutionHandler(RejectedExecutionHandler)
*/
@Override
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handlerWrapper.getHandler();
}
/**
* Wrapper of original {@link RejectedExecutionHandler} of {@link ThreadPoolExecutor},
* It's used to support the {@link RejectedAwarePlugin} on the basis of the {@link RejectedExecutionHandler}.
*
* @author huangchengxing
* @see RejectedAwarePlugin
*/
@AllArgsConstructor
private static class RejectedAwareHandlerWrapper implements RejectedExecutionHandler {
/**
* thread-pool action aware registry
*/
private final ThreadPoolPluginRegistry registry;
/**
* original target
*/
@NonNull
@Setter
@Getter
private RejectedExecutionHandler handler;
/**
* Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
Collection<RejectedAwarePlugin> rejectedAwarePluginList = registry.getRejectedAwareList();
rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor));
handler.rejectedExecution(r, executor);
}
}
}

@ -22,13 +22,13 @@ import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.ExecutorTraceContextUtil;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -211,9 +211,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
*/
public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) {
BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler()
: threadPoolExecutor.getRejectedExecutionHandler();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum()
: -1L;

@ -21,10 +21,8 @@ import cn.hippo4j.common.model.ManyThreadPoolRunStateInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.ByteConvertUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -70,11 +68,7 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime {
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor pool = executorService.getExecutor();
String rejectedName;
if (pool instanceof AbstractDynamicExecutorSupport) {
rejectedName = ((DynamicThreadPoolExecutor) pool).getRedundancyHandler().getClass().getSimpleName();
} else {
rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName();
}
rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName();
poolRunStateInfo.setRejectedName(rejectedName);
ManyThreadPoolRunStateInfo manyThreadPoolRunStateInfo = BeanUtil.convert(poolRunStateInfo, ManyThreadPoolRunStateInfo.class);
manyThreadPoolRunStateInfo.setIdentify(CLIENT_IDENTIFICATION_VALUE);

@ -17,6 +17,7 @@
package cn.hippo4j.core.executor.support;
import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
@ -25,7 +26,10 @@ import java.util.concurrent.*;
/**
* Dynamic executor configuration support.
*
* @deprecated used {@link ThreadPoolExecutorShutdownPlugin} to get thread pool shutdown support
*/
@Deprecated
@Slf4j
public abstract class AbstractDynamicExecutorSupport extends ThreadPoolExecutor implements InitializingBean, DisposableBean {

@ -0,0 +1,105 @@
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.impl.*;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.AliasRegistry;
import java.util.Objects;
/**
* Register default {@link ThreadPoolPlugin}.
*
* @author huangchengxing
* @see TaskDecoratorPlugin
* @see TaskTimeoutNotifyAlarmPlugin
* @see TaskRejectCountRecordPlugin
* @see TaskRejectNotifyAlarmPlugin
* @see ThreadPoolExecutorShutdownPlugin
*/
@RequiredArgsConstructor
public class DefaultThreadPoolPluginRegistrar
implements ThreadPoolPluginRegistrar, ApplicationContextAware, BeanNameAware {
public static final String REGISTRAR_NAME = "DefaultThreadPoolPluginRegistrar";
/**
* alias registry
*/
private AliasRegistry aliasRegistry;
/**
* execute time out
*/
private final long executeTimeOut;
/**
* await termination millis
*/
private final long awaitTerminationMillis;
/**
* wait for tasks to complete on shutdown
*/
private final boolean waitForTasksToCompleteOnShutdown;
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return REGISTRAR_NAME;
}
/**
* Create and register plugin for the specified thread-pool instance
*
* @param registry thread pool plugin registry
* @param executor executor
*/
@Override
public void doRegister(ThreadPoolPluginRegistry registry, ExtensibleThreadPoolExecutor executor) {
// callback when task execute
registry.register(new TaskDecoratorPlugin());
registry.register(new TaskTimeoutNotifyAlarmPlugin(executeTimeOut, executor));
// callback when task rejected
registry.register(new TaskRejectCountRecordPlugin());
registry.register(new TaskRejectNotifyAlarmPlugin());
// callback when pool shutdown
registry.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis, waitForTasksToCompleteOnShutdown));
}
/**
* Set the name of the bean in the bean factory that created this bean.
* <p>Invoked after population of normal bean properties but before an
* init callback such as {@link InitializingBean#afterPropertiesSet()}
* or a custom init-method.
*
* @param name the name of the bean in the factory.
* Note that this name is the actual bean name used in the factory, which may
* differ from the originally specified name: in particular for inner bean
* names, the actual bean name might have been made unique through appending
* "#..." suffixes. Use the {@link BeanFactoryUtils#originalBeanName(String)}
* method to extract the original bean name (without suffix), if desired.
*/
@Override
public void setBeanName(String name) {
if (Objects.nonNull(aliasRegistry)) {
aliasRegistry.registerAlias(name, getId());
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.aliasRegistry = applicationContext.getBean(AliasRegistry.class);
}
}

@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.common.toolkit.Assert;
import lombok.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* The default implementation of {@link ThreadPoolPluginRegistry}.
*
* @author huangchengxing
*/
public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry {
/**
* lock of this instance
*/
private final ReadWriteLock instanceLock = new ReentrantReadWriteLock();
/**
* Registered {@link ThreadPoolPlugin}.
*/
private final Map<String, ThreadPoolPlugin> registeredPlugins = new HashMap<>(16);
/**
* Registered {@link TaskAwarePlugin}.
*/
private final List<TaskAwarePlugin> taskAwarePluginList = new ArrayList<>();
/**
* Registered {@link ExecuteAwarePlugin}.
*/
private final List<ExecuteAwarePlugin> executeAwarePluginList = new ArrayList<>();
/**
* Registered {@link RejectedAwarePlugin}.
*/
private final List<RejectedAwarePlugin> rejectedAwarePluginList = new ArrayList<>();
/**
* Registered {@link ShutdownAwarePlugin}.
*/
private final List<ShutdownAwarePlugin> shutdownAwarePluginList = new ArrayList<>();
/**
* Clear all.
*/
@Override
public synchronized void clear() {
Lock writeLock = instanceLock.writeLock();
writeLock.lock();
try {
registeredPlugins.clear();
taskAwarePluginList.clear();
executeAwarePluginList.clear();
rejectedAwarePluginList.clear();
shutdownAwarePluginList.clear();
} finally {
writeLock.unlock();
}
}
/**
* Register a {@link ThreadPoolPlugin}
*
* @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} already exists in the registry
* @see ThreadPoolPlugin#getId()
*/
@Override
public void register(@NonNull ThreadPoolPlugin plugin) {
Lock writeLock = instanceLock.writeLock();
writeLock.lock();
try {
String id = plugin.getId();
Assert.isTrue(!isRegistered(id), "The plug-in with id [" + id + "] has been registered");
// register plugin
registeredPlugins.put(id, plugin);
// quick index
if (plugin instanceof TaskAwarePlugin) {
taskAwarePluginList.add((TaskAwarePlugin)plugin);
}
if (plugin instanceof ExecuteAwarePlugin) {
executeAwarePluginList.add((ExecuteAwarePlugin)plugin);
}
if (plugin instanceof RejectedAwarePlugin) {
rejectedAwarePluginList.add((RejectedAwarePlugin)plugin);
}
if (plugin instanceof ShutdownAwarePlugin) {
shutdownAwarePluginList.add((ShutdownAwarePlugin)plugin);
}
} finally {
writeLock.unlock();
}
}
/**
* Unregister {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
*/
@Override
public void unregister(String pluginId) {
Lock writeLock = instanceLock.writeLock();
writeLock.lock();
try {
Optional.ofNullable(pluginId)
.map(registeredPlugins::remove)
.ifPresent(old -> {
// remove quick index if necessary
if (old instanceof TaskAwarePlugin) {
taskAwarePluginList.remove(old);
}
if (old instanceof ExecuteAwarePlugin) {
executeAwarePluginList.remove(old);
}
if (old instanceof RejectedAwarePlugin) {
rejectedAwarePluginList.remove(old);
}
if (old instanceof ShutdownAwarePlugin) {
shutdownAwarePluginList.remove(old);
}
});
} finally {
writeLock.unlock();
}
}
@Override
public Collection<ThreadPoolPlugin> getAllPlugins() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return registeredPlugins.values();
} finally {
readLock.unlock();
}
}
/**
* Whether the {@link ThreadPoolPlugin} has been registered.
*
* @param pluginId plugin id
* @return ture if target has been registered, false otherwise
*/
@Override
public boolean isRegistered(String pluginId) {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return registeredPlugins.containsKey(pluginId);
} finally {
readLock.unlock();
}
}
/**
* Get {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
* @param <A> plugin type
* @return {@link ThreadPoolPlugin}, null if unregister
*/
@Nullable
@Override
@SuppressWarnings("unchecked")
public <A extends ThreadPoolPlugin> A getPlugin(String pluginId) {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return (A) registeredPlugins.get(pluginId);
} finally {
readLock.unlock();
}
}
/**
* Get execute plugin list.
*
* @return {@link ExecuteAwarePlugin}
*/
@Override
public Collection<ExecuteAwarePlugin> getExecuteAwareList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return executeAwarePluginList;
} finally {
readLock.unlock();
}
}
/**
* Get rejected plugin list.
*
* @return {@link RejectedAwarePlugin}
*/
@Override
public Collection<RejectedAwarePlugin> getRejectedAwareList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return rejectedAwarePluginList;
} finally {
readLock.unlock();
}
}
/**
* Get shutdown plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
public Collection<ShutdownAwarePlugin> getShutdownAwareList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return shutdownAwarePluginList;
} finally {
readLock.unlock();
}
}
/**
* Get shutdown plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
public Collection<TaskAwarePlugin> getTaskAwareList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return taskAwarePluginList;
} finally {
readLock.unlock();
}
}
}

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
/**
* Callback during task execution.
*
* @author huangchengxing
*/
public interface ExecuteAwarePlugin extends ThreadPoolPlugin {
/**
* Callback before task execution.
*
* @param thread thread of executing task
* @param runnable task
* @see ExtensibleThreadPoolExecutor#beforeExecute
*/
default void beforeExecute(Thread thread, Runnable runnable) {
}
/**
* Callback when task is executed.
*
* @param runnable runnable
* @return tasks to be execute
* @see ExtensibleThreadPoolExecutor#execute
*/
default Runnable execute(Runnable runnable) {
return runnable;
}
/**
* Callback after task execution.
*
* @param runnable runnable
* @param throwable exception thrown during execution
* @see ExtensibleThreadPoolExecutor#afterExecute
*/
default void afterExecute(Runnable runnable, Throwable throwable) {
// do nothing
}
}

@ -0,0 +1,47 @@
package cn.hippo4j.core.plugin;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* Plug in runtime information.
*
* @author huangchengxing
*/
@RequiredArgsConstructor
@Getter
public class PluginRuntime {
/**
* plugin id
*/
private final String pluginId;
/**
* runtime info items
*/
private final List<Item> items = new ArrayList<>();
/**
* Add a runtime info item.
*
* @param name name
* @param value value
* @return runtime info item
*/
public PluginRuntime addItem(String name, Object value) {
items.add(new Item(name, value));
return this;
}
@Getter
@RequiredArgsConstructor
public static class Item {
private final String name;
private final Object value;
}
}

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Callback when task is rejected.
*
* @author huangchengxing
*/
public interface RejectedAwarePlugin extends ThreadPoolPlugin {
/**
* Callback before task is rejected.
*
* @param runnable task
* @param executor executor
*/
default void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
// do nothing
}
}

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Callback before thread-pool shutdown.
*
* @author huangchengxing
*/
public interface ShutdownAwarePlugin extends ThreadPoolPlugin {
/**
* Callback before pool shutdown.
*
* @param executor executor
* @see ThreadPoolExecutor#shutdown()
* @see ThreadPoolExecutor#shutdownNow()
*/
default void beforeShutdown(ThreadPoolExecutor executor) {
// do nothing
}
/**
* Callback after pool shutdown.
*
* @param executor executor
* @param remainingTasks remainingTasks, or empty if no tasks left or {@link ThreadPoolExecutor#shutdown()} called
* @see ThreadPoolExecutor#shutdown()
* @see ThreadPoolExecutor#shutdownNow()
*/
default void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
// do nothing
}
/**
* Callback after pool terminated.
*
* @param executor executor
* @see ThreadPoolExecutor#terminated()
*/
default void afterTerminated(ExtensibleThreadPoolExecutor executor) {
// do nothing
}
}

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Callback during task submit in thread-pool.
*
* @author huangchengxing
*/
public interface TaskAwarePlugin extends ThreadPoolPlugin {
/**
* Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool.
*
* @param executor executor
* @param runnable original task
* @return Tasks that really need to be performed
* @see ThreadPoolExecutor#newTaskFor(Runnable, Object)
*/
default <V> Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) {
return runnable;
}
/**
* Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool.
*
* @param executor executor
* @param future original task
* @return Tasks that really need to be performed
* @see ThreadPoolExecutor#newTaskFor(Callable)
*/
default <V> Callable<V> beforeTaskCreate(ThreadPoolExecutor executor, Callable<V> future) {
return future;
}
}

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
/**
* Thread pool action aware.
*
* @author huangchengxing
* @see ExtensibleThreadPoolExecutor
* @see ThreadPoolPluginRegistry
* @see TaskAwarePlugin
* @see ExecuteAwarePlugin
* @see ShutdownAwarePlugin
* @see RejectedAwarePlugin
*/
public interface ThreadPoolPlugin {
/**
* Get id.
*
* @return id
*/
String getId();
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
default PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId());
}
}

@ -0,0 +1,28 @@
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
/**
* Factory of {@link ThreadPoolPlugin}.
*
* @author huangchengxing
*/
public interface ThreadPoolPluginRegistrar {
/**
* Get id.
* In spring container, the obtained id will be used as the alias of the bean name.
*
* @return id
*/
String getId();
/**
* Create and register plugin for the specified thread-pool instance
*
* @param registry thread pool plugin registry
* @param executor executor
*/
void doRegister(ThreadPoolPluginRegistry registry, ExtensibleThreadPoolExecutor executor);
}

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Registry of {@link ThreadPoolPlugin}
*
* @author huangchengxing
*/
public interface ThreadPoolPluginRegistry {
/**
* Clear all.
*/
void clear();
/**
* Get all registered plugins.
*
* @return plugins
*/
Collection<ThreadPoolPlugin> getAllPlugins();
/**
* Register a {@link ThreadPoolPlugin}
*
* @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()}
* already exists in the registry
* @see ThreadPoolPlugin#getId()
*/
void register(ThreadPoolPlugin plugin);
/**
* Whether the {@link ThreadPoolPlugin} has been registered.
*
* @param pluginId plugin id
* @return ture if target has been registered, false otherwise
*/
boolean isRegistered(String pluginId);
/**
* Unregister {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
*/
void unregister(String pluginId);
/**
* Get {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
* @param <A> target aware type
* @return {@link ThreadPoolPlugin}, null if unregister
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
@Nullable
<A extends ThreadPoolPlugin> A getPlugin(String pluginId);
/**
* Get execute aware plugin list.
*
* @return {@link ExecuteAwarePlugin}
*/
Collection<ExecuteAwarePlugin> getExecuteAwareList();
/**
* Get rejected aware plugin list.
*
* @return {@link RejectedAwarePlugin}
*/
Collection<RejectedAwarePlugin> getRejectedAwareList();
/**
* Get shutdown aware plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
Collection<ShutdownAwarePlugin> getShutdownAwareList();
/**
* Get shutdown aware plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
Collection<TaskAwarePlugin> getTaskAwareList();
/**
* Try to get target plugin and apply operation, do nothing if it's not present.
*
* @param pluginId plugin id
* @param targetType target type
* @param consumer operation for target plugin
* @param <A> plugin type
* @return this instance
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
default <A extends ThreadPoolPlugin> ThreadPoolPluginRegistry getAndThen(
String pluginId, Class<A> targetType, Consumer<A> consumer) {
Optional.ofNullable(getPlugin(pluginId))
.map(targetType::cast)
.ifPresent(consumer);
return this;
}
/**
* Try to get target plugin and return value of apply function, return default value if it's not present.
*
* @param pluginId plugin id
* @param targetType target type
* @param function operation for target plugin
* @param defaultValue default value
* @param <A> plugin type
* @return value of apply function, default value if plugin is not present
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
default <A extends ThreadPoolPlugin, R> R getAndThen(String pluginId, Class<A> targetType, Function<A, R> function, R defaultValue) {
return Optional.ofNullable(getPlugin(pluginId))
.map(targetType::cast)
.map(function)
.orElse(defaultValue);
}
}

@ -0,0 +1,124 @@
package cn.hippo4j.core.plugin;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.Collection;
/**
* Thread pool action aware registry delegate.
*
* @author huangchengxing
*/
public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegistry {
/**
* Get thread pool action aware registry.
*
* @return {@link ThreadPoolPluginRegistry}
*/
@NonNull
ThreadPoolPluginRegistry getThreadPoolPluginRegistry();
/**
* Clear all.
*/
@Override
default void clear() {
getThreadPoolPluginRegistry().clear();
}
/**
* Register a {@link ThreadPoolPlugin}
*
* @param plugin aware
*/
@Override
default void register(ThreadPoolPlugin plugin) {
getThreadPoolPluginRegistry().register(plugin);
}
/**
* Whether the {@link ThreadPoolPlugin} has been registered.
*
* @param pluginId name
* @return ture if target has been registered, false otherwise
*/
@Override
default boolean isRegistered(String pluginId) {
return getThreadPoolPluginRegistry().isRegistered(pluginId);
}
/**
* Unregister {@link ThreadPoolPlugin}
*
* @param pluginId name
*/
@Override
default void unregister(String pluginId) {
getThreadPoolPluginRegistry().unregister(pluginId);
}
/**
* Get all registered plugins.
*
* @return plugins
*/
@Override
default Collection<ThreadPoolPlugin> getAllPlugins() {
return getThreadPoolPluginRegistry().getAllPlugins();
}
/**
* Get {@link ThreadPoolPlugin}
*
* @param pluginId target name
* @return {@link ThreadPoolPlugin}, null if unregister
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
@Nullable
@Override
default <A extends ThreadPoolPlugin> A getPlugin(String pluginId) {
return getThreadPoolPluginRegistry().getPlugin(pluginId);
}
/**
* Get execute aware list.
*
* @return {@link ExecuteAwarePlugin}
*/
@Override
default Collection<ExecuteAwarePlugin> getExecuteAwareList() {
return getThreadPoolPluginRegistry().getExecuteAwareList();
}
/**
* Get rejected aware list.
*
* @return {@link RejectedAwarePlugin}
*/
@Override
default Collection<RejectedAwarePlugin> getRejectedAwareList() {
return getThreadPoolPluginRegistry().getRejectedAwareList();
}
/**
* Get shutdown aware list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
default Collection<ShutdownAwarePlugin> getShutdownAwareList() {
return getThreadPoolPluginRegistry().getShutdownAwareList();
}
/**
* Get shutdown aware list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
default Collection<TaskAwarePlugin> getTaskAwareList() {
return getThreadPoolPluginRegistry().getTaskAwareList();
}
}

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.PluginRuntime;
import lombok.Getter;
import lombok.NonNull;
import org.springframework.core.task.TaskDecorator;
import java.util.ArrayList;
import java.util.List;
/**
* Decorate tasks when they are submitted to thread-pool.
*
* @author huangchengxing
*/
public class TaskDecoratorPlugin implements ExecuteAwarePlugin {
public static final String PLUGIN_NAME = "task-decorator-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* decorators
*/
@Getter
private final List<TaskDecorator> decorators = new ArrayList<>();
/**
* Callback when task is executed.
*
* @param runnable runnable
* @return tasks to be execute
* @see ExtensibleThreadPoolExecutor#execute
*/
@Override
public Runnable execute(Runnable runnable) {
for (TaskDecorator decorator : decorators) {
runnable = decorator.decorate(runnable);
}
return runnable;
}
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId())
.addItem("decorators", decorators);
}
/**
* Add a decorator
*
* @param decorator decorator
*/
public void addDecorator(@NonNull TaskDecorator decorator) {
decorators.remove(decorator);
decorators.add(decorator);
}
/**
* Clear all decorators
*
*/
public void clearDecorators() {
decorators.clear();
}
/**
* Remove decorators
*
*/
public void removeDecorator(TaskDecorator decorator) {
decorators.remove(decorator);
}
}

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import lombok.Getter;
import lombok.Setter;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
/**
* Record the number of tasks rejected by the thread pool.
*
* @author huangchengxing
*/
public class TaskRejectCountRecordPlugin implements RejectedAwarePlugin {
public static final String PLUGIN_NAME = "task-reject-count-record-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* rejection count
*/
@Setter
@Getter
private AtomicLong rejectCount = new AtomicLong(0);
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId())
.addItem("rejectCount", getRejectCountNum());
}
/**
* Record rejection count.
*
* @param r task
* @param executor executor
*/
@Override
public void beforeRejectedExecution(Runnable r, ThreadPoolExecutor executor) {
rejectCount.incrementAndGet();
}
/**
* Get reject count num
*
* @return reject count num
*/
public Long getRejectCountNum() {
return rejectCount.get();
}
}

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Send alert notification when a task is rejected.
*
* @author huangchengxing
*/
public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin {
public static final String PLUGIN_NAME = "task-reject-notify-alarm-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* Callback before task is rejected.
*
* @param runnable task
* @param executor executor
*/
@Override
public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (!(executor instanceof ExtensibleThreadPoolExecutor)) {
return;
}
String threadPoolId = ((ExtensibleThreadPoolExecutor) executor).getThreadPoolId();
Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class))
.ifPresent(handler -> handler.asyncSendRejectedAlarm(threadPoolId));
}
}

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.SyncTimeRecorder;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.toolkit.SystemClock;
import lombok.RequiredArgsConstructor;
import java.util.Objects;
/**
* Record task execution time indicator.
*
* @author huangchengxing
* @see TaskTimeoutNotifyAlarmPlugin
*/
@RequiredArgsConstructor
public class TaskTimeRecordPlugin extends SyncTimeRecorder implements ExecuteAwarePlugin {
public static final String PLUGIN_NAME = "task-time-record-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* start times of executed tasks
*/
private final ThreadLocal<Long> startTimes = new ThreadLocal<>();
/**
* Record the time when the worker thread starts executing the task.
*
* @param thread thread of executing task
* @param runnable task
* @see ExtensibleThreadPoolExecutor#beforeExecute
*/
@Override
public void beforeExecute(Thread thread, Runnable runnable) {
startTimes.set(SystemClock.now());
}
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
Summary summary = summarize();
return new PluginRuntime(getId())
.addItem("taskCount", summary.getTaskCount())
.addItem("minTaskTime", summary.getMinTaskTime())
.addItem("maxTaskTime", summary.getMaxTaskTime())
.addItem("totalTaskTime", summary.getTotalTaskTime())
.addItem("avgTaskTime", summary.getAvgTaskTimeMillis());
}
/**
* Record the total time for the worker thread to complete the task, and update the time record.
*
* @param runnable runnable
* @param throwable exception thrown during execution
*/
@Override
public void afterExecute(Runnable runnable, Throwable throwable) {
try {
Long startTime = startTimes.get();
if (Objects.isNull(startTime)) {
return;
}
long executeTime = SystemClock.now() - startTime;
refreshTime(executeTime);
afterRefreshTime(executeTime);
} finally {
startTimes.remove();
}
}
/**
* The callback function provided to the subclass, which is called after {@link #refreshTime}
*
* @param executeTime executeTime
*/
protected void afterRefreshTime(long executeTime) {
// do nothing
}
}

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import java.util.Optional;
/**
* Record task execution time indicator,
* and send alarm notification when the execution time exceeds the threshold.
*
* @author huangchengxing
*/
@AllArgsConstructor
public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin {
public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
@Getter
@Setter
private Long executeTimeOut;
/**
* thread-pool
*/
private final ExtensibleThreadPoolExecutor threadPoolExecutor;
/**
* Check whether the task execution time exceeds {@link #executeTimeOut},
* if it exceeds this time, send an alarm notification.
*
* @param executeTime executeTime in nanosecond
*/
@Override
public void afterRefreshTime(long executeTime) {
if (executeTime <= executeTimeOut) {
return;
}
Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class))
.ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm(
threadPoolExecutor.getThreadPoolId(), executeTime, executeTimeOut, threadPoolExecutor));
}
}

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.*;
/**
* After the thread pool calls {@link ThreadPoolExecutor#shutdown()} or {@link ThreadPoolExecutor#shutdownNow()},
* if necessary, cancel the remaining tasks in the pool,
* and wait for the thread pool to terminate until
* the blocked main thread has timed out or the thread pool has completely terminated.
*
* @author huangchengxing
*/
@Accessors(chain = true)
@Getter
@Slf4j
@AllArgsConstructor
public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
public static final String PLUGIN_NAME = "thread-pool-executor-shutdown-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* await termination millis
*/
@Setter
public long awaitTerminationMillis;
/**
* wait for tasks to complete on shutdown
*/
@Setter
public boolean waitForTasksToCompleteOnShutdown;
/**
* Callback before pool shutdown.
*
* @param executor executor
*/
@Override
public void beforeShutdown(ThreadPoolExecutor executor) {
if (executor instanceof ExtensibleThreadPoolExecutor) {
ExtensibleThreadPoolExecutor dynamicThreadPoolExecutor = (ExtensibleThreadPoolExecutor) executor;
String threadPoolId = dynamicThreadPoolExecutor.getThreadPoolId();
if (log.isInfoEnabled()) {
log.info("Before shutting down ExecutorService" + (threadPoolId != null ? " '" + threadPoolId + "'" : ""));
}
}
}
/**
* Callback after pool shutdown.
* if {@link #waitForTasksToCompleteOnShutdown} return {@code true}
* cancel the remaining tasks,
* then wait for pool to terminate according {@link #awaitTerminationMillis} if necessary.
*
* @param executor executor
* @param remainingTasks remainingTasks
*/
@Override
public void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
if (executor instanceof ExtensibleThreadPoolExecutor) {
ExtensibleThreadPoolExecutor pool = (ExtensibleThreadPoolExecutor) executor;
if (!waitForTasksToCompleteOnShutdown && CollectionUtil.isNotEmpty(remainingTasks)) {
remainingTasks.forEach(this::cancelRemainingTask);
}
awaitTerminationIfNecessary(pool);
}
}
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId())
.addItem("awaitTerminationMillis", awaitTerminationMillis)
.addItem("waitForTasksToCompleteOnShutdown", waitForTasksToCompleteOnShutdown);
}
/**
* Cancel the given remaining task which never commended execution,
* as returned from {@link ExecutorService#shutdownNow()}.
*
* @param task the task to cancel (typically a {@link RunnableFuture})
* @see RunnableFuture#cancel(boolean)
* @since 5.0.5
*/
protected void cancelRemainingTask(Runnable task) {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
}
/**
* Wait for the executor to terminate, according to the value of {@link #awaitTerminationMillis}.
*/
private void awaitTerminationIfNecessary(ExtensibleThreadPoolExecutor executor) {
String threadPoolId = executor.getThreadPoolId();
if (this.awaitTerminationMillis <= 0) {
return;
}
try {
boolean isTerminated = executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS);
if (!isTerminated && log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor" + " '" + threadPoolId + "'" + " to terminate.");
}
} catch (InterruptedException ex) {
if (log.isWarnEnabled()) {
log.warn("Interrupted while waiting for executor" + " '" + threadPoolId + "'" + " to terminate.");
}
Thread.currentThread().interrupt();
}
}
}

@ -0,0 +1,9 @@
package cn.hippo4j.core.executor;
/**
* test for {@link DynamicThreadPoolExecutor}
*
* @author huangchengxing
*/
public class DynamicThreadPoolExecutorTest {
}

@ -0,0 +1,167 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.plugin.*;
import lombok.Getter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link ExtensibleThreadPoolExecutor}
*
* @author huangchengxing
*/
public class ExtensibleThreadPoolExecutorTest {
private final RejectedExecutionHandler originalHandler = new ThreadPoolExecutor.DiscardPolicy();
private ExtensibleThreadPoolExecutor executor;
@Before
public void initExecutor() {
executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, originalHandler
);
}
@Test
public void testGetOrSetRejectedHandler() {
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
executor.setRejectedExecutionHandler(handler);
Assert.assertSame(handler, executor.getRejectedExecutionHandler());
}
@Test
public void testInvokeTaskAwarePlugin() {
TestTaskAwarePlugin plugin = new TestTaskAwarePlugin();
executor.register(plugin);
executor.submit(() -> {});
executor.submit(() -> true);
executor.submit(() -> {}, false);
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
@Test
public void testInvokeExecuteAwarePlugin() {
TestExecuteAwarePlugin plugin = new TestExecuteAwarePlugin();
executor.register(plugin);
executor.execute(() -> {});
ThreadUtil.sleep(500L);
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
@Test
public void testInvokeRejectedAwarePlugin() {
executor.setCorePoolSize(1);
executor.setMaximumPoolSize(1);
TestRejectedAwarePlugin plugin = new TestRejectedAwarePlugin();
executor.register(plugin);
// blocking pool and queue
executor.submit(() -> ThreadUtil.sleep(500L));
executor.submit(() -> ThreadUtil.sleep(500L));
// reject 3 tasks
executor.submit(() -> {});
executor.submit(() -> {});
executor.submit(() -> {});
ThreadUtil.sleep(500L);
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
@Test
public void testInvokeTestShutdownAwarePluginWhenShutdown() throws InterruptedException {
TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
executor.register(plugin);
executor.shutdown();
if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
}
@Test
public void testInvokeTestShutdownAwarePluginWhenShutdownNow() throws InterruptedException {
TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
executor.register(plugin);
executor.shutdownNow();
if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
}
@Getter
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestTaskAwarePlugin";
@Override
public <V> Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) {
invokeCount.incrementAndGet();
return TaskAwarePlugin.super.beforeTaskCreate(executor, runnable, value);
}
@Override
public <V> Callable<V> beforeTaskCreate(ThreadPoolExecutor executor, Callable<V> future) {
invokeCount.incrementAndGet();
return TaskAwarePlugin.super.beforeTaskCreate(executor, future);
}
}
@Getter
private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestExecuteAwarePlugin";
@Override
public void beforeExecute(Thread thread, Runnable runnable) {
invokeCount.incrementAndGet();
ExecuteAwarePlugin.super.beforeExecute(thread, runnable);
}
@Override
public Runnable execute(Runnable runnable) {
invokeCount.incrementAndGet();
return ExecuteAwarePlugin.super.execute(runnable);
}
@Override
public void afterExecute(Runnable runnable, Throwable throwable) {
invokeCount.incrementAndGet();
ExecuteAwarePlugin.super.afterExecute(runnable, throwable);
}
}
@Getter
private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestRejectedAwarePlugin";
@Override
public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
invokeCount.incrementAndGet();
}
}
@Getter
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestShutdownAwarePlugin";
@Override
public void beforeShutdown(ThreadPoolExecutor executor) {
invokeCount.incrementAndGet();
ShutdownAwarePlugin.super.beforeShutdown(executor);
}
@Override
public void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
invokeCount.incrementAndGet();
ShutdownAwarePlugin.super.afterShutdown(executor, remainingTasks);
}
@Override
public void afterTerminated(ExtensibleThreadPoolExecutor executor) {
invokeCount.incrementAndGet();
ShutdownAwarePlugin.super.afterTerminated(executor);
}
}
}

@ -0,0 +1,108 @@
package cn.hippo4j.core.plugin;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* test for {@link DefaultThreadPoolPluginRegistry}
*
* @author huangchengxing
*/
public class DefaultThreadPoolPluginRegistryTest {
private DefaultThreadPoolPluginRegistry registry;
@Before
public void initRegistry() {
registry = new DefaultThreadPoolPluginRegistry();
}
@Test
public void testRegister() {
TaskAwarePlugin taskAwarePlugin = new TestTaskAwarePlugin();
registry.register(taskAwarePlugin);
Assert.assertThrows(IllegalArgumentException.class, () -> registry.register(taskAwarePlugin));
Assert.assertTrue(registry.isRegistered(taskAwarePlugin.getId()));
Assert.assertEquals(1, registry.getTaskAwareList().size());
Assert.assertSame(taskAwarePlugin, registry.getPlugin(taskAwarePlugin.getId()));
registry.getAndThen(taskAwarePlugin.getId(), TestTaskAwarePlugin.class, plugin -> Assert.assertSame(plugin, taskAwarePlugin));
Assert.assertEquals(taskAwarePlugin.getId(), registry.getAndThen(taskAwarePlugin.getId(), TestTaskAwarePlugin.class, TestTaskAwarePlugin::getId, null));
registry.unregister(taskAwarePlugin.getId());
Assert.assertNull(registry.getPlugin(taskAwarePlugin.getId()));
ExecuteAwarePlugin executeAwarePlugin = new TestExecuteAwarePlugin();
registry.register(executeAwarePlugin);
Assert.assertTrue(registry.isRegistered(executeAwarePlugin.getId()));
Assert.assertEquals(1, registry.getExecuteAwareList().size());
Assert.assertSame(executeAwarePlugin, registry.getPlugin(executeAwarePlugin.getId()));
registry.unregister(executeAwarePlugin.getId());
Assert.assertNull(registry.getPlugin(executeAwarePlugin.getId()));
RejectedAwarePlugin rejectedAwarePlugin = new TestRejectedAwarePlugin();
registry.register(rejectedAwarePlugin);
Assert.assertTrue(registry.isRegistered(rejectedAwarePlugin.getId()));
Assert.assertEquals(1, registry.getRejectedAwareList().size());
Assert.assertSame(rejectedAwarePlugin, registry.getPlugin(rejectedAwarePlugin.getId()));
registry.unregister(rejectedAwarePlugin.getId());
Assert.assertNull(registry.getPlugin(rejectedAwarePlugin.getId()));
ShutdownAwarePlugin shutdownAwarePlugin = new TestShutdownAwarePlugin();
registry.register(shutdownAwarePlugin);
Assert.assertTrue(registry.isRegistered(shutdownAwarePlugin.getId()));
Assert.assertEquals(1, registry.getShutdownAwareList().size());
Assert.assertSame(shutdownAwarePlugin, registry.getPlugin(shutdownAwarePlugin.getId()));
registry.unregister(shutdownAwarePlugin.getId());
Assert.assertNull(registry.getPlugin(shutdownAwarePlugin.getId()));
}
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return "TestTaskAwarePlugin";
}
}
private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return "TestExecuteAwarePlugin";
}
}
private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return "TestRejectedAwarePlugin";
}
}
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return "TestShutdownAwarePlugin";
}
}
}

@ -0,0 +1,59 @@
package cn.hippo4j.core.plugin;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin;
import cn.hippo4j.core.plugin.impl.TaskTimeRecordPlugin;
import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* test {@link ThreadPoolPlugin}'s info to json
*
* @author huangchengxing
*/
public class PluginRuntimeTest {
private final ObjectMapper objectMapper = new ObjectMapper();
@SneakyThrows
@Test
public void testGetPluginRuntime() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
1, 1, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);
// TaskRejectCountRecordPlugin
TaskRejectCountRecordPlugin taskRejectCountRecordPlugin = new TaskRejectCountRecordPlugin();
executor.register(taskRejectCountRecordPlugin);
// TaskRejectCountRecordPlugin
TaskTimeRecordPlugin taskTimeRecordPlugin = new TaskTimeRecordPlugin();
executor.register(taskTimeRecordPlugin);
// ThreadPoolExecutorShutdownPlugin
ThreadPoolExecutorShutdownPlugin executorShutdownPlugin = new ThreadPoolExecutorShutdownPlugin(2000L, true);
executor.register(executorShutdownPlugin);
executor.submit(() -> ThreadUtil.sleep(100L));
executor.submit(() -> ThreadUtil.sleep(300L));
executor.submit(() -> ThreadUtil.sleep(200L));
ThreadUtil.sleep(1000L);
List<PluginRuntime> runtimeList = executor.getAllPlugins().stream()
.map(ThreadPoolPlugin::getPluginRuntime)
.collect(Collectors.toList());
System.out.println(objectMapper.writeValueAsString(runtimeList));
}
}

@ -0,0 +1,45 @@
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link TaskDecoratorPlugin}
*
* @author huangchengxing
*/
public class TaskDecoratorPluginTest {
private final AtomicInteger taskExecuteCount = new AtomicInteger(0);
@Test
public void testExecute() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);
TaskDecoratorPlugin plugin = new TaskDecoratorPlugin();
plugin.addDecorator(runnable -> () -> {
taskExecuteCount.incrementAndGet();
runnable.run();
});
plugin.addDecorator(runnable -> () -> {
taskExecuteCount.incrementAndGet();
runnable.run();
});
executor.register(plugin);
executor.execute(() -> {});
ThreadUtil.sleep(500L);
Assert.assertEquals(2, taskExecuteCount.get());
}
}

@ -0,0 +1,38 @@
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* test for {@link TaskRejectCountRecordPlugin}
*
* @author huangchengxing
*/
public class TaskRejectCountRecordPluginTest {
@Test
public void testExecute() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
1, 1, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);
TaskRejectCountRecordPlugin plugin = new TaskRejectCountRecordPlugin();
executor.register(plugin);
executor.submit(() -> ThreadUtil.sleep(500L));
executor.submit(() -> ThreadUtil.sleep(500L));
executor.submit(() -> ThreadUtil.sleep(500L));
ThreadUtil.sleep(500L);
Assert.assertEquals((Long)1L, plugin.getRejectCountNum());
}
}

@ -0,0 +1,9 @@
package cn.hippo4j.core.plugin.impl;
/**
* test for {@link TaskRejectNotifyAlarmPlugin}
*
* @author huangchengxing
*/
public class TaskRejectNotifyAlarmPluginTest {
}

@ -0,0 +1,42 @@
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.SyncTimeRecorder;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* test for {@link TaskTimeRecordPlugin}
*
* @author huangchengxing
*/
public class TaskTimeRecordPluginTest {
@Test
public void testExecute() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
3, 3, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);
TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin();
executor.register(plugin);
executor.submit(() -> ThreadUtil.sleep(100L));
executor.submit(() -> ThreadUtil.sleep(300L));
executor.submit(() -> ThreadUtil.sleep(200L));
ThreadUtil.sleep(1000L);
SyncTimeRecorder.Summary summary = plugin.summarize();
Assert.assertEquals(1, summary.getMinTaskTime() / 100L);
Assert.assertEquals(3, summary.getMaxTaskTime() / 100L);
Assert.assertEquals(2, summary.getAvgTaskTimeMillis() / 100L);
Assert.assertEquals(6, summary.getTotalTaskTime() / 100L);
}
}

@ -0,0 +1,71 @@
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link ThreadPoolExecutorShutdownPlugin}
*
* @author huangchengxing
*/
public class ThreadPoolExecutorShutdownPluginTest {
public ExtensibleThreadPoolExecutor getExecutor(ThreadPoolPlugin plugin) {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
2, 2, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);
executor.register(plugin);
return executor;
}
private static Callable<Integer> getCallable(AtomicInteger completedCount) {
return () -> {
ThreadUtil.sleep(1000L);
return completedCount.incrementAndGet();
};
}
@Test
public void testExecuteShutdownWhenWaitTaskCompleted() {
ExtensibleThreadPoolExecutor executor = getExecutor(
new ThreadPoolExecutorShutdownPlugin(2000L, true)
);
AtomicInteger completedCount = new AtomicInteger(0);
Callable<Integer> future1 = getCallable(completedCount);
Callable<Integer> future2 = getCallable(completedCount);
executor.submit(future1);
executor.submit(future2);
executor.shutdown();
Assert.assertEquals(2, completedCount.get());
}
@Test
public void testExecuteShutdownWhenNotWaitTaskCompleted() {
ExtensibleThreadPoolExecutor executor = getExecutor(
new ThreadPoolExecutorShutdownPlugin(-1L, true)
);
AtomicInteger completedCount = new AtomicInteger(0);
Callable<Integer> future1 = getCallable(completedCount);
Callable<Integer> future2 = getCallable(completedCount);
executor.submit(future1);
executor.submit(future2);
executor.shutdown();
Assert.assertEquals(0, completedCount.get());
}
}

@ -0,0 +1,9 @@
package cn.hippo4j.core.plugin.impl;
/**
* test for {@link TaskTimeoutNotifyAlarmPlugin}
*
* @author huangchengxing
*/
public class TimeoutNotifyAlarmTaskTimeRecordPluginTest {
}

@ -29,8 +29,6 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
@ -43,7 +41,6 @@ import java.util.*;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
@ -182,6 +179,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
Boolean isAlarm = executorProperties.getAlarm();
Integer activeAlarm = executorProperties.getActiveAlarm();
Integer capacityAlarm = executorProperties.getCapacityAlarm();
// FIXME Compare using Objects.equals
if ((isAlarm != null && isAlarm != threadPoolNotifyAlarm.getAlarm())
|| (activeAlarm != null && activeAlarm != threadPoolNotifyAlarm.getActiveAlarm())
|| (capacityAlarm != null && capacityAlarm != threadPoolNotifyAlarm.getCapacityAlarm())) {
@ -249,18 +247,12 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
}
if (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) {
if (executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
}
}
if (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(properties.getRejectedHandler());
if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
AtomicLong rejectCount = dynamicExecutor.getRejectCount();
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount);
}
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {

@ -123,8 +123,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
DynamicThreadPoolExecutor actualDynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor();
TaskDecorator taskDecorator = actualDynamicThreadPoolExecutor.getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = actualDynamicThreadPoolExecutor.awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = actualDynamicThreadPoolExecutor.waitForTasksToCompleteOnShutdown;
long awaitTerminationMillis = actualDynamicThreadPoolExecutor.getAwaitTerminationMillis();
boolean waitForTasksToCompleteOnShutdown = actualDynamicThreadPoolExecutor.isWaitForTasksToCompleteOnShutdown();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
dynamicThreadPoolWrapper.setExecutor(newDynamicPoolExecutor);
}
@ -155,7 +155,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.setBlockingQueue(queueType)
.setExecuteTimeOut(10000L)
.setQueueCapacity(queueCapacity)
.setRejectedHandler(((DynamicThreadPoolExecutor) executor).getRedundancyHandler().getClass().getSimpleName())
.setRejectedHandler(executor.getRejectedExecutionHandler().getClass().getSimpleName())
.setThreadPoolId(threadPoolId);
return executorProperties;
}

@ -17,20 +17,18 @@
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -39,7 +37,6 @@ import java.util.Optional;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
@ -71,9 +68,9 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
boolean originalAllowCoreThreadTimeOut = executor.allowsCoreThreadTimeOut();
Long originalExecuteTimeOut = null;
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
if (executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof DynamicThreadPoolExecutor) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
rejectedExecutionHandler = dynamicExecutor.getRedundancyHandler();
rejectedExecutionHandler = dynamicExecutor.getRejectedExecutionHandler();
originalExecuteTimeOut = dynamicExecutor.getExecuteTimeOut();
}
changePoolInfo(executor, parameter);
@ -140,17 +137,11 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
executor.setKeepAliveTime(parameter.getKeepAliveTime(), TimeUnit.SECONDS);
}
Long executeTimeOut = Optional.ofNullable(parameter.getExecuteTimeOut()).orElse(0L);
if (executeTimeOut != null && executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(executeTimeOut);
}
if (parameter.getRejectedType() != null) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(parameter.getRejectedType());
if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
AtomicLong rejectCount = dynamicExecutor.getRejectCount();
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, parameter.getTpId(), rejectCount);
}
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (parameter.getAllowCoreThreadTimeOut() != null) {

@ -33,7 +33,8 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.toolkit.inet.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
@ -45,14 +46,12 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.task.TaskDecorator;
import org.springframework.util.ClassUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -153,7 +152,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut()))
.build();
// Set dynamic thread pool enhancement parameters.
if (executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof DynamicThreadPoolExecutor) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()),
threadPoolParameterInfo.getLivenessAlarm(),
@ -161,8 +160,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) executor).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) executor).awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) executor).waitForTasksToCompleteOnShutdown;
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) executor).getAwaitTerminationMillis();
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) executor).isWaitForTasksToCompleteOnShutdown();
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
long executeTimeOut = Optional.ofNullable(threadPoolParameterInfo.getExecuteTimeOut())
.orElse(((DynamicThreadPoolExecutor) executor).getExecuteTimeOut());
@ -184,7 +183,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.isAlarm(false)
.activeAlarm(80)
.capacityAlarm(80)
.rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(((DynamicThreadPoolExecutor) executor).getRedundancyHandler().getClass().getSimpleName()))
.rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()))
.build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()
.dynamicThreadPoolRegisterParameter(parameterInfo)

Loading…
Cancel
Save