refactor the function extension logic of DynamicThreadPoolExecutor (#815) (#854)

* feat: Add a new thread-pool that supports the registration of callback interfaces

* feat: Add plugins to support the default extensions of DynamicThreadPoolExecutor

* refactor: Deprecate AbstractDynamicExecutorSupport and make DynamicThreadPoolExecutor extend ExtensibleThreadPoolExecutor (#815)

* fix: obtained plugin list may cause thread-safe problems during iteration

* refactor: Make DynamicThreadPoolExecutor support the selection of shutdown mode in destroy

Co-authored-by: 马称 Ma Chen <machen@apache.org>
pull/876/head
黄成兴 2 years ago committed by GitHub
parent 028bcd82f4
commit 9165200fe2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,107 +17,226 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.core.toolkit.SystemClock;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin;
import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin;
import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin;
import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginRegistrar;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.task.TaskDecorator;
import java.util.concurrent.*;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Enhanced dynamic and monitored thread pool.
*/
public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport {
@Slf4j
public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
/**
* wait for tasks to complete on shutdown
*/
@Getter
@Setter
private Long executeTimeOut;
@Getter
@Setter
private TaskDecorator taskDecorator;
@Getter
@Setter
private RejectedExecutionHandler redundancyHandler;
@Getter
private final String threadPoolId;
@Getter
private final AtomicLong rejectCount = new AtomicLong();
private final ThreadLocal<Long> startTimeThreadLocal = new ThreadLocal<>();
public DynamicThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
long executeTimeOut,
boolean waitForTasksToCompleteOnShutdown,
long awaitTerminationMillis,
public boolean waitForTasksToCompleteOnShutdown;
/**
* Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters.
*
* @param threadPoolId thread-pool id
* @param executeTimeOut execute time out
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @param awaitTerminationMillis await termination millis
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param blockingQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor creates a new thread
* @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public DynamicThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis,
@NonNull BlockingQueue<Runnable> blockingQueue,
@NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, blockingQueue, threadPoolId, threadFactory, rejectedExecutionHandler);
this.threadPoolId = threadPoolId;
this.executeTimeOut = executeTimeOut;
// Number of dynamic proxy denial policies.
RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount);
setRejectedExecutionHandler(rejectedProxy);
// Redundant fields to avoid reflecting the acquired fields when sending change information.
redundancyHandler = rejectedExecutionHandler;
super(
threadPoolId, new DefaultThreadPoolPluginManager(),
corePoolSize, maximumPoolSize, keepAliveTime, unit,
blockingQueue, threadFactory, rejectedExecutionHandler);
log.info("Initializing ExecutorService {}", threadPoolId);
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
// init default plugins
new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis)
.doRegister(this);
}
/**
* Invoked by the containing {@code BeanFactory} on destruction of a bean.
*
*/
@Override
public void execute(@NonNull Runnable command) {
if (taskDecorator != null) {
command = taskDecorator.decorate(command);
public void destroy() {
if (isWaitForTasksToCompleteOnShutdown()) {
super.shutdown();
} else {
super.shutdownNow();
}
super.execute(command);
getThreadPoolPluginManager().clear();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (executeTimeOut == null || executeTimeOut <= 0) {
return;
}
startTimeThreadLocal.set(SystemClock.now());
/**
* Get await termination millis.
*
* @return await termination millis.
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin}
*/
@Deprecated
public long getAwaitTerminationMillis() {
return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
.map(ThreadPoolExecutorShutdownPlugin::getAwaitTerminationMillis)
.orElse(-1L);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
Long startTime;
if ((startTime = startTimeThreadLocal.get()) == null) {
return;
}
try {
long endTime = SystemClock.now();
long executeTime;
boolean executeTimeAlarm = (executeTime = (endTime - startTime)) > executeTimeOut;
if (executeTimeAlarm && ApplicationContextHolder.getInstance() != null) {
ThreadPoolNotifyAlarmHandler notifyAlarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class);
if (notifyAlarmHandler != null) {
notifyAlarmHandler.asyncSendExecuteTimeOutAlarm(threadPoolId, executeTime, executeTimeOut, this);
}
}
} finally {
startTimeThreadLocal.remove();
}
/**
* Set support param.
*
* @param awaitTerminationMillis await termination millis
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin}
*/
@Deprecated
public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) {
setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown);
getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
.ifPresent(processor -> processor.setAwaitTerminationMillis(awaitTerminationMillis));
}
@Override
protected ExecutorService initializeExecutor() {
return this;
/**
* Get reject count num.
*
* @return reject count num
* @deprecated use {@link TaskRejectCountRecordPlugin}
*/
@Deprecated
public Long getRejectCountNum() {
return getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class)
.map(TaskRejectCountRecordPlugin::getRejectCountNum)
.orElse(-1L);
}
public Long getRejectCountNum() {
return rejectCount.get();
/**
* Get reject count.
*
* @return reject count num
* @deprecated use {@link TaskRejectCountRecordPlugin}
*/
@Deprecated
public AtomicLong getRejectCount() {
return getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class)
.map(TaskRejectCountRecordPlugin::getRejectCount)
.orElse(new AtomicLong(0));
}
/**
* Get execute time out.
*
* @deprecated use {@link TaskTimeoutNotifyAlarmPlugin}
*/
@Deprecated
public Long getExecuteTimeOut() {
return getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class)
.map(TaskTimeoutNotifyAlarmPlugin::getExecuteTimeOut)
.orElse(-1L);
}
/**
* Set execute time out.
*
* @param executeTimeOut execute time out
* @deprecated use {@link TaskTimeoutNotifyAlarmPlugin}
*/
@Deprecated
public void setExecuteTimeOut(Long executeTimeOut) {
getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class)
.ifPresent(processor -> processor.setExecuteTimeOut(executeTimeOut));
}
/**
* Get {@link TaskDecorator}.
*
* @deprecated use {@link TaskDecoratorPlugin}
*/
@Deprecated
public TaskDecorator getTaskDecorator() {
return getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class)
.map(processor -> CollectionUtil.getFirst(processor.getDecorators()))
.orElse(null);
}
/**
* Set {@link TaskDecorator}.
*
* @param taskDecorator task decorator
* @deprecated use {@link TaskDecoratorPlugin}
*/
@Deprecated
public void setTaskDecorator(TaskDecorator taskDecorator) {
getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class)
.ifPresent(processor -> {
if (Objects.nonNull(taskDecorator)) {
processor.clearDecorators();
processor.addDecorator(taskDecorator);
}
});
}
/**
* Get rejected execution handler.
*
* @deprecated use {@link DynamicThreadPoolExecutor#getRejectedExecutionHandler}
*/
@Deprecated
public RejectedExecutionHandler getRedundancyHandler() {
return getRejectedExecutionHandler();
}
/**
* Set rejected execution handler.
*
* @param handler handler
* @deprecated use {@link DynamicThreadPoolExecutor#setRejectedExecutionHandler}
*/
@Deprecated
public void setRedundancyHandler(RejectedExecutionHandler handler) {
setRejectedExecutionHandler(handler);
}
}

@ -17,7 +17,6 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import lombok.AllArgsConstructor;
import lombok.Builder;
@ -68,8 +67,8 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
@Override
public void destroy() throws Exception {
if (executor instanceof AbstractDynamicExecutorSupport) {
((AbstractDynamicExecutorSupport) executor).destroy();
if (executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).destroy();
}
}
}

@ -0,0 +1,320 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor;
import cn.hippo4j.core.plugin.*;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
/**
* <p>Extensible thread-pool executor. <br />
* Support the callback extension points provided on the basis of {@link ThreadPoolExecutor}.
* Each extension point corresponds to a different {@link ThreadPoolPlugin} interface,
* users can customize plug-ins and implement one or more {@link ThreadPoolPlugin} interface
* to enable plugins to sense thread pool behavior and provide extended functions.
*
* @see ThreadPoolPluginManager
* @see ThreadPoolPlugin
*/
public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements ThreadPoolPluginSupport {
/**
* thread pool id
*/
@Getter
@NonNull
private final String threadPoolId;
/**
* action aware registry
*/
@Getter
private final ThreadPoolPluginManager threadPoolPluginManager;
/**
* handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper
*/
private final RejectedAwareHandlerWrapper handlerWrapper;
/**
* Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters.
*
* @param threadPoolId thread-pool id
* @param threadPoolPluginManager action aware registry
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ExtensibleThreadPoolExecutor(
@NonNull String threadPoolId,
@NonNull ThreadPoolPluginManager threadPoolPluginManager,
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
@NonNull BlockingQueue<Runnable> workQueue,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// pool extended info
this.threadPoolId = threadPoolId;
this.threadPoolPluginManager = threadPoolPluginManager;
// proxy handler to support Aware callback
while (handler instanceof RejectedAwareHandlerWrapper) {
handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
}
this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginManager, handler);
super.setRejectedExecutionHandler(handlerWrapper);
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the parent class method, {@link ExecuteAwarePlugin#beforeExecute} will be called first.
*
* @param thread the thread that will run task {@code r}
* @param runnable the task that will be executed
*/
@Override
protected void beforeExecute(Thread thread, Runnable runnable) {
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList();
executeAwarePluginList.forEach(aware -> aware.beforeExecute(thread, runnable));
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskExecute} will be called first.
*
* @param runnable the task to execute
*/
@Override
public void execute(@NonNull Runnable runnable) {
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList();
for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
runnable = taskAwarePlugin.beforeTaskExecute(runnable);
}
super.execute(runnable);
}
/**
* {@inheritDoc}
*
* <p><b>After calling the superclass method, {@link ExecuteAwarePlugin#afterExecute} will be called last.
*
* @param runnable the runnable that has completed
* @param throwable the exception that caused termination, or null if
* execution completed normally
*/
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList();
executeAwarePluginList.forEach(aware -> aware.afterExecute(runnable, throwable));
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the superclass method,
* {@link ShutdownAwarePlugin#beforeShutdown} will be called first.
* and then will be call {@link ShutdownAwarePlugin#afterShutdown}
*
* @throws SecurityException {@inheritDoc}
*/
@Override
public void shutdown() {
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList();
shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this));
super.shutdown();
shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, Collections.emptyList()));
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the superclass method,
* {@link ShutdownAwarePlugin#beforeShutdown} will be called first.
* and then will be call {@link ShutdownAwarePlugin#afterShutdown}
*
* @throws SecurityException
*/
@Override
public List<Runnable> shutdownNow() {
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList();
shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this));
List<Runnable> tasks = super.shutdownNow();
shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, tasks));
return tasks;
}
/**
* {@inheritDoc}.
*
* <p><b>Before calling the superclass method, {@link ShutdownAwarePlugin#afterTerminated} will be called first.
*/
@Override
protected void terminated() {
super.terminated();
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList();
shutdownAwarePluginList.forEach(aware -> aware.afterTerminated(this));
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList();
for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
runnable = taskAwarePlugin.beforeTaskCreate(this, runnable, value);
}
return super.newTaskFor(runnable, value);
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first.
*
* @param callable the callable task being wrapped
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList();
for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
callable = taskAwarePlugin.beforeTaskCreate(this, callable);
}
return super.newTaskFor(callable);
}
/**
* Sets a new handler for unexecutable tasks.
*
* @param handler the new handler
* @throws NullPointerException if handler is null
* @see #getRejectedExecutionHandler
*/
@Override
public void setRejectedExecutionHandler(@NonNull RejectedExecutionHandler handler) {
while (handler instanceof RejectedAwareHandlerWrapper) {
handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
}
handlerWrapper.setHandler(handler);
}
/**
* Returns the current handler for unexecutable tasks.
*
* @return the current handler
* @see #setRejectedExecutionHandler(RejectedExecutionHandler)
*/
@Override
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handlerWrapper.getHandler();
}
/**
* Get thread-pool executor.
*
* @return thread-pool executor
*/
@Override
public ThreadPoolExecutor getThreadPoolExecutor() {
return this;
}
/**
* Wrapper of original {@link RejectedExecutionHandler} of {@link ThreadPoolExecutor},
* It's used to support the {@link RejectedAwarePlugin} on the basis of the {@link RejectedExecutionHandler}.
*
* @see RejectedAwarePlugin
*/
@AllArgsConstructor
private static class RejectedAwareHandlerWrapper implements RejectedExecutionHandler {
/**
* thread-pool action aware registry
*/
private final ThreadPoolPluginManager registry;
/**
* original target
*/
@NonNull
@Setter
@Getter
private RejectedExecutionHandler handler;
/**
* Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
Collection<RejectedAwarePlugin> rejectedAwarePluginList = registry.getRejectedAwarePluginList();
rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor));
handler.rejectedExecution(r, executor);
}
}
}

@ -22,13 +22,13 @@ import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.ExecutorTraceContextUtil;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -211,9 +211,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
*/
public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) {
BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler()
: threadPoolExecutor.getRejectedExecutionHandler();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum()
: -1L;

@ -23,10 +23,8 @@ import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.ByteConvertUtil;
import cn.hippo4j.common.toolkit.MemoryUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -66,11 +64,7 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime {
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor pool = executorService.getExecutor();
String rejectedName;
if (pool instanceof AbstractDynamicExecutorSupport) {
rejectedName = ((DynamicThreadPoolExecutor) pool).getRedundancyHandler().getClass().getSimpleName();
} else {
rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName();
}
rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName();
poolRunStateInfo.setRejectedName(rejectedName);
ManyThreadPoolRunStateInfo manyThreadPoolRunStateInfo = BeanUtil.convert(poolRunStateInfo, ManyThreadPoolRunStateInfo.class);
manyThreadPoolRunStateInfo.setIdentify(CLIENT_IDENTIFICATION_VALUE);

@ -17,6 +17,7 @@
package cn.hippo4j.core.executor.support;
import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
@ -25,7 +26,10 @@ import java.util.concurrent.*;
/**
* Dynamic executor configuration support.
*
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin} to get thread-pool shutdown support
*/
@Deprecated
@Slf4j
public abstract class AbstractDynamicExecutorSupport extends ThreadPoolExecutor implements InitializingBean, DisposableBean {

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
/**
* Callback during task execution.
*/
public interface ExecuteAwarePlugin extends ThreadPoolPlugin {
/**
* Callback before task execution.
*
* @param thread thread of executing task
* @param runnable task
* @see ExtensibleThreadPoolExecutor#beforeExecute
*/
default void beforeExecute(Thread thread, Runnable runnable) {
}
/**
* Callback after task execution.
*
* @param runnable runnable
* @param throwable exception thrown during execution
* @see ExtensibleThreadPoolExecutor#afterExecute
*/
default void afterExecute(Runnable runnable, Throwable throwable) {
// do nothing
}
}

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* Plug in runtime information.
*/
@RequiredArgsConstructor
@Getter
public class PluginRuntime {
/**
* plugin id
*/
private final String pluginId;
/**
* runtime info
*/
private final List<Info> infoList = new ArrayList<>();
/**
* Add a runtime info item.
*
* @param name name
* @param value value
* @return runtime info item
*/
public PluginRuntime addInfo(String name, Object value) {
infoList.add(new Info(name, value));
return this;
}
@Getter
@RequiredArgsConstructor
public static class Info {
private final String name;
private final Object value;
}
}

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Callback when task is rejected.
*/
public interface RejectedAwarePlugin extends ThreadPoolPlugin {
/**
* Callback before task is rejected.
*
* @param runnable task
* @param executor executor
*/
default void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
// do nothing
}
}

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Callback before thread-pool shutdown.
*/
public interface ShutdownAwarePlugin extends ThreadPoolPlugin {
/**
* Callback before pool shutdown.
*
* @param executor executor
* @see ThreadPoolExecutor#shutdown()
* @see ThreadPoolExecutor#shutdownNow()
*/
default void beforeShutdown(ThreadPoolExecutor executor) {
// do nothing
}
/**
* Callback after pool shutdown.
*
* @param executor executor
* @param remainingTasks remainingTasks, or empty if no tasks left or {@link ThreadPoolExecutor#shutdown()} called
* @see ThreadPoolExecutor#shutdown()
* @see ThreadPoolExecutor#shutdownNow()
*/
default void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
// do nothing
}
/**
* Callback after pool terminated.
*
* @param executor executor
* @see ThreadPoolExecutor#terminated()
*/
default void afterTerminated(ExtensibleThreadPoolExecutor executor) {
// do nothing
}
}

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Callback during task submit in thread-pool.
*/
public interface TaskAwarePlugin extends ThreadPoolPlugin {
/**
* Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool.
*
* @param executor executor
* @param runnable original task
* @return Tasks that really need to be performed
* @see ThreadPoolExecutor#newTaskFor(Runnable, Object)
*/
default <V> Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) {
return runnable;
}
/**
* Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool.
*
* @param executor executor
* @param future original task
* @return Tasks that really need to be performed
* @see ThreadPoolExecutor#newTaskFor(Callable)
*/
default <V> Callable<V> beforeTaskCreate(ThreadPoolExecutor executor, Callable<V> future) {
return future;
}
/**
* Callback when task is execute.
*
* @param runnable runnable
* @return tasks to be execute
* @see ExtensibleThreadPoolExecutor#execute
*/
default Runnable beforeTaskExecute(Runnable runnable) {
return runnable;
}
}

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport;
/**
* <p>A marker superinterface indicating that
* an instance class is eligible to be sense and intercept
* some operations of the specific thread-pool instance.
*
* <p>Generally, any thread-pool that implements the {@link ThreadPoolPluginSupport}
* can be register multiple plugins by {@link ThreadPoolPluginSupport#register},
* and the plugin will provide some extension function of original
* {@link java.util.concurrent.ThreadPoolExecutor} does not support.
*
* <p>During runtime, plugins can dynamically modify some configurable parameters
* and provide some runtime information by {@link #getPluginRuntime()}.
* When the thread-pool is destroyed, the plugin will also be destroyed.
*
* @see ExtensibleThreadPoolExecutor
* @see ThreadPoolPluginManager
* @see TaskAwarePlugin
* @see ExecuteAwarePlugin
* @see ShutdownAwarePlugin
* @see RejectedAwarePlugin
*/
public interface ThreadPoolPlugin {
/**
* Get id.
*
* @return id
*/
String getId();
/**
* Callback when plugin register into manager
*
* @see ThreadPoolPluginManager#register
*/
default void start() {
// do nothing
}
/**
* Callback when plugin unregister from manager
*
* @see ThreadPoolPluginManager#unregister
* @see ThreadPoolPluginManager#clear
*/
default void stop() {
// do nothing
}
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
default PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId());
}
}

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.TaskAwarePlugin;
import lombok.Getter;
import lombok.NonNull;
import org.springframework.core.task.TaskDecorator;
import java.util.ArrayList;
import java.util.List;
/**
* Decorate tasks when they are submitted to thread-pool.
*/
public class TaskDecoratorPlugin implements TaskAwarePlugin {
public static final String PLUGIN_NAME = "task-decorator-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* decorators
*/
@Getter
private final List<TaskDecorator> decorators = new ArrayList<>();
/**
* Callback when task is executed.
*
* @param runnable runnable
* @return tasks to be execute
* @see ExtensibleThreadPoolExecutor#execute
*/
@Override
public Runnable beforeTaskExecute(Runnable runnable) {
for (TaskDecorator decorator : decorators) {
runnable = decorator.decorate(runnable);
}
return runnable;
}
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId())
.addInfo("decorators", decorators);
}
/**
* Add a decorator
*
* @param decorator decorator
*/
public void addDecorator(@NonNull TaskDecorator decorator) {
decorators.remove(decorator);
decorators.add(decorator);
}
/**
* Clear all decorators
*
*/
public void clearDecorators() {
decorators.clear();
}
/**
* Remove decorators
*
*/
public void removeDecorator(TaskDecorator decorator) {
decorators.remove(decorator);
}
}

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import lombok.Getter;
import lombok.Setter;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
/**
* Record the number of tasks rejected by the thread pool.
*/
public class TaskRejectCountRecordPlugin implements RejectedAwarePlugin {
public static final String PLUGIN_NAME = "task-reject-count-record-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* rejection count
*/
@Setter
@Getter
private AtomicLong rejectCount = new AtomicLong(0);
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId())
.addInfo("rejectCount", getRejectCountNum());
}
/**
* Record rejection count.
*
* @param r task
* @param executor executor
*/
@Override
public void beforeRejectedExecution(Runnable r, ThreadPoolExecutor executor) {
rejectCount.incrementAndGet();
}
/**
* Get reject count num
*
* @return reject count num
*/
public Long getRejectCountNum() {
return rejectCount.get();
}
}

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Send alert notification when a task is rejected.
*/
public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin {
public static final String PLUGIN_NAME = "task-reject-notify-alarm-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* Callback before task is rejected.
*
* @param runnable task
* @param executor executor
*/
@Override
public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (!(executor instanceof ExtensibleThreadPoolExecutor)) {
return;
}
String threadPoolId = ((ExtensibleThreadPoolExecutor) executor).getThreadPoolId();
Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class))
.ifPresent(handler -> handler.asyncSendRejectedAlarm(threadPoolId));
}
}

@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.toolkit.SystemClock;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Record task execution time indicator.
*
* @see TaskTimeoutNotifyAlarmPlugin
*/
@RequiredArgsConstructor
public class TaskTimeRecordPlugin implements ExecuteAwarePlugin {
public static final String PLUGIN_NAME = "task-time-record-plugin";
/**
* Lock instance.
*/
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* Total execution milli time of all tasks.
*/
private long totalTaskTimeMillis = 0L;
/**
* Maximum task milli execution time, default -1.
*/
private long maxTaskTimeMillis = -1L;
/**
* Minimal task milli execution time, default -1.
*/
private long minTaskTimeMillis = -1L;
/**
* Count of completed task.
*/
private long taskCount = 0L;
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* start times of executed tasks
*/
private final ThreadLocal<Long> startTimes = new ThreadLocal<>();
/**
* Record the time when the worker thread starts executing the task.
*
* @param thread thread of executing task
* @param runnable task
* @see ExtensibleThreadPoolExecutor#beforeExecute
*/
@Override
public void beforeExecute(Thread thread, Runnable runnable) {
startTimes.set(SystemClock.now());
}
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
Summary summary = summarize();
return new PluginRuntime(getId())
.addInfo("taskCount", summary.getTaskCount())
.addInfo("minTaskTime", summary.getMinTaskTimeMillis() + "ms")
.addInfo("maxTaskTime", summary.getMaxTaskTimeMillis() + "ms")
.addInfo("totalTaskTime", summary.getTotalTaskTimeMillis() + "ms")
.addInfo("avgTaskTime", summary.getAvgTaskTimeMillis() + "ms");
}
/**
* Record the total time for the worker thread to complete the task, and update the time record.
*
* @param runnable runnable
* @param throwable exception thrown during execution
*/
@Override
public void afterExecute(Runnable runnable, Throwable throwable) {
try {
Long startTime = startTimes.get();
if (Objects.isNull(startTime)) {
return;
}
long executeTime = SystemClock.now() - startTime;
recordTaskTime(executeTime);
} finally {
startTimes.remove();
}
}
/**
* Refresh time indicators of the current instance.
*
* @param taskExecutionTime millisecond
*/
protected void recordTaskTime(long taskExecutionTime) {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
if (taskCount == 0) {
maxTaskTimeMillis = taskExecutionTime;
minTaskTimeMillis = taskExecutionTime;
} else {
maxTaskTimeMillis = Math.max(taskExecutionTime, maxTaskTimeMillis);
minTaskTimeMillis = Math.min(taskExecutionTime, minTaskTimeMillis);
}
taskCount = taskCount + 1;
totalTaskTimeMillis += taskExecutionTime;
} finally {
writeLock.unlock();
}
}
/**
* Get the summary statistics of the instance at the current time.
*
* @return data snapshot
*/
public Summary summarize() {
Lock readLock = lock.readLock();
Summary statistics;
readLock.lock();
try {
statistics = new Summary(
this.totalTaskTimeMillis,
this.maxTaskTimeMillis,
this.minTaskTimeMillis,
this.taskCount);
} finally {
readLock.unlock();
}
return statistics;
}
/**
* Summary statistics of SyncTimeRecorder instance at a certain time.
*/
@Getter
@RequiredArgsConstructor
public static class Summary {
/**
* Total execution nano time of all tasks.
*/
private final long totalTaskTimeMillis;
/**
* Maximum task nano execution time.
*/
private final long maxTaskTimeMillis;
/**
* Minimal task nano execution time.
*/
private final long minTaskTimeMillis;
/**
* Count of completed task.
*/
private final long taskCount;
/**
* Get the avg task time in milliseconds.
*
* @return avg task time
*/
public long getAvgTaskTimeMillis() {
long totalTaskCount = getTaskCount();
return totalTaskCount > 0L ? getTotalTaskTimeMillis() / totalTaskCount : -1;
}
}
}

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Record task execution time indicator,
* and send alarm notification when the execution time exceeds the threshold.
*/
@AllArgsConstructor
public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin {
public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin";
/**
* threadPoolId
*/
private final String threadPoolId;
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
@Getter
@Setter
private Long executeTimeOut;
/**
* thread-pool
*/
private final ThreadPoolExecutor threadPoolExecutor;
/**
* Check whether the task execution time exceeds {@link #executeTimeOut},
* if it exceeds this time, send an alarm notification.
*
* @param executeTime executeTime in nanosecond
*/
@Override
protected void recordTaskTime(long executeTime) {
super.recordTaskTime(executeTime);
if (executeTime <= executeTimeOut) {
return;
}
Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class))
.ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm(
threadPoolId, executeTime, executeTimeOut, threadPoolExecutor));
}
}

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.*;
/**
* <p>After the thread pool calls {@link ThreadPoolExecutor#shutdown()} or {@link ThreadPoolExecutor#shutdownNow()}. <br />
* Cancel the remaining tasks in the pool, then wait for the thread pool to terminate until
* the blocked main thread has timed out or the thread pool has completely terminated.
*/
@Accessors(chain = true)
@Getter
@Slf4j
@AllArgsConstructor
public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
public static final String PLUGIN_NAME = "thread-pool-executor-shutdown-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* await termination millis
*/
@Setter
public long awaitTerminationMillis;
/**
* Callback before pool shutdown.
*
* @param executor executor
*/
@Override
public void beforeShutdown(ThreadPoolExecutor executor) {
if (executor instanceof ExtensibleThreadPoolExecutor) {
ExtensibleThreadPoolExecutor dynamicThreadPoolExecutor = (ExtensibleThreadPoolExecutor) executor;
String threadPoolId = dynamicThreadPoolExecutor.getThreadPoolId();
if (log.isInfoEnabled()) {
log.info("Before shutting down ExecutorService {}", threadPoolId);
}
}
}
/**
* Callback after pool shutdown. <br />
* cancel the remaining tasks,
* then wait for pool to terminate according {@link #awaitTerminationMillis} if necessary.
*
* @param executor executor
* @param remainingTasks remainingTasks
*/
@Override
public void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
if (executor instanceof ExtensibleThreadPoolExecutor) {
ExtensibleThreadPoolExecutor pool = (ExtensibleThreadPoolExecutor) executor;
if (CollectionUtil.isNotEmpty(remainingTasks)) {
remainingTasks.forEach(this::cancelRemainingTask);
}
awaitTerminationIfNecessary(pool);
}
}
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId())
.addInfo("awaitTerminationMillis", awaitTerminationMillis);
}
/**
* Cancel the given remaining task which never commended execution,
* as returned from {@link ExecutorService#shutdownNow()}.
*
* @param task the task to cancel (typically a {@link RunnableFuture})
* @see RunnableFuture#cancel(boolean)
* @since 5.0.5
*/
protected void cancelRemainingTask(Runnable task) {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
}
/**
* Wait for the executor to terminate, according to the value of {@link #awaitTerminationMillis}.
*/
private void awaitTerminationIfNecessary(ExtensibleThreadPoolExecutor executor) {
String threadPoolId = executor.getThreadPoolId();
if (this.awaitTerminationMillis <= 0) {
return;
}
try {
boolean isTerminated = executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS);
if (!isTerminated && log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor {} to terminate.", threadPoolId);
} else {
log.info("ExecutorService {} has been shutdowned.", threadPoolId);
}
} catch (InterruptedException ex) {
if (log.isWarnEnabled()) {
log.warn("Interrupted while waiting for executor {} to terminate.", threadPoolId);
}
Thread.currentThread().interrupt();
}
}
}

@ -0,0 +1,318 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.core.plugin.*;
import lombok.NonNull;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* <p>The default implementation of {@link ThreadPoolPluginManager}.
* Provide basic {@link ThreadPoolPlugin} registration, logout and acquisition functions.
* Most APIs ensure limited thread-safe.
*
* <p>Usually registered to {@link cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor},
* or bound to an {@link java.util.concurrent.ThreadPoolExecutor} instance through {@link ThreadPoolPluginSupport}
* to support its plugin based extension functions.
*
* <p><b>NOTE:</b>
* When the list of plugins is obtained through the {@code getXXX} method of manager, the list is not immutable.
* This means that until actually start iterating over the list,
* registering or unregistering plugins through the manager will affect the results of the iteration.
* Therefore, we should try to ensure that <b>get the latest plugin list from the manager before each use</b>.
*
* @see cn.hippo4j.core.executor.DynamicThreadPoolExecutor
* @see cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor
*/
public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
/**
* lock of this instance
*/
private final ReadWriteLock instanceLock = new ReentrantReadWriteLock();
/**
* Registered {@link ThreadPoolPlugin}.
*/
private final Map<String, ThreadPoolPlugin> registeredPlugins = new ConcurrentHashMap<>(16);
/**
* Registered {@link TaskAwarePlugin}.
*/
private final List<TaskAwarePlugin> taskAwarePluginList = new CopyOnWriteArrayList<>();
/**
* Registered {@link ExecuteAwarePlugin}.
*/
private final List<ExecuteAwarePlugin> executeAwarePluginList = new CopyOnWriteArrayList<>();
/**
* Registered {@link RejectedAwarePlugin}.
*/
private final List<RejectedAwarePlugin> rejectedAwarePluginList = new CopyOnWriteArrayList<>();
/**
* Registered {@link ShutdownAwarePlugin}.
*/
private final List<ShutdownAwarePlugin> shutdownAwarePluginList = new CopyOnWriteArrayList<>();
/**
* Clear all.
*/
@Override
public synchronized void clear() {
Lock writeLock = instanceLock.writeLock();
writeLock.lock();
try {
Collection<ThreadPoolPlugin> plugins = registeredPlugins.values();
registeredPlugins.clear();
taskAwarePluginList.clear();
executeAwarePluginList.clear();
rejectedAwarePluginList.clear();
shutdownAwarePluginList.clear();
plugins.forEach(ThreadPoolPlugin::stop);
} finally {
writeLock.unlock();
}
}
/**
* Register a {@link ThreadPoolPlugin}
*
* @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} already exists in the registry
* @see ThreadPoolPlugin#getId()
*/
@Override
public void register(@NonNull ThreadPoolPlugin plugin) {
Lock writeLock = instanceLock.writeLock();
writeLock.lock();
try {
String id = plugin.getId();
Assert.isTrue(!isRegistered(id), "The plugin with id [" + id + "] has been registered");
// register plugin
registeredPlugins.put(id, plugin);
// quick index
if (plugin instanceof TaskAwarePlugin) {
taskAwarePluginList.add((TaskAwarePlugin) plugin);
}
if (plugin instanceof ExecuteAwarePlugin) {
executeAwarePluginList.add((ExecuteAwarePlugin) plugin);
}
if (plugin instanceof RejectedAwarePlugin) {
rejectedAwarePluginList.add((RejectedAwarePlugin) plugin);
}
if (plugin instanceof ShutdownAwarePlugin) {
shutdownAwarePluginList.add((ShutdownAwarePlugin) plugin);
}
plugin.start();
} finally {
writeLock.unlock();
}
}
/**
* Register plugin if it's not registered.
*
* @param plugin plugin
* @return return true if successful register new plugin, false otherwise
*/
@Override
public boolean tryRegister(ThreadPoolPlugin plugin) {
Lock writeLock = instanceLock.writeLock();
writeLock.lock();
try {
if (registeredPlugins.containsKey(plugin.getId())) {
return false;
}
register(plugin);
return true;
} finally {
writeLock.unlock();
}
}
/**
* Unregister {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
*/
@Override
public void unregister(String pluginId) {
Lock writeLock = instanceLock.writeLock();
writeLock.lock();
try {
Optional.ofNullable(pluginId)
.map(registeredPlugins::remove)
.ifPresent(plugin -> {
// remove quick index if necessary
if (plugin instanceof TaskAwarePlugin) {
taskAwarePluginList.remove(plugin);
}
if (plugin instanceof ExecuteAwarePlugin) {
executeAwarePluginList.remove(plugin);
}
if (plugin instanceof RejectedAwarePlugin) {
rejectedAwarePluginList.remove(plugin);
}
if (plugin instanceof ShutdownAwarePlugin) {
shutdownAwarePluginList.remove(plugin);
}
plugin.stop();
});
} finally {
writeLock.unlock();
}
}
/**
* Get all registered plugins.
*
* @return plugins
* @apiNote Be sure to avoid directly modifying returned collection instances,
* otherwise, unexpected results may be obtained through the manager
*/
@Override
public Collection<ThreadPoolPlugin> getAllPlugins() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return registeredPlugins.values();
} finally {
readLock.unlock();
}
}
/**
* Whether the {@link ThreadPoolPlugin} has been registered.
*
* @param pluginId plugin id
* @return ture if target has been registered, false otherwise
*/
@Override
public boolean isRegistered(String pluginId) {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return registeredPlugins.containsKey(pluginId);
} finally {
readLock.unlock();
}
}
/**
* Get {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
* @param <A> plugin type
* @return {@link ThreadPoolPlugin}, null if unregister
*/
@Override
@SuppressWarnings("unchecked")
public <A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId) {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return (Optional<A>) Optional.ofNullable(registeredPlugins.get(pluginId));
} finally {
readLock.unlock();
}
}
/**
* Get execute plugin list.
*
* @return {@link ExecuteAwarePlugin}
*/
@Override
public Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return executeAwarePluginList;
} finally {
readLock.unlock();
}
}
/**
* Get rejected plugin list.
*
* @return {@link RejectedAwarePlugin}
* @apiNote Be sure to avoid directly modifying returned collection instances,
* otherwise, unexpected results may be obtained through the manager
*/
@Override
public Collection<RejectedAwarePlugin> getRejectedAwarePluginList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return rejectedAwarePluginList;
} finally {
readLock.unlock();
}
}
/**
* Get shutdown plugin list.
*
* @return {@link ShutdownAwarePlugin}
* @apiNote Be sure to avoid directly modifying returned collection instances,
* otherwise, unexpected results may be obtained through the manager
*/
@Override
public Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return shutdownAwarePluginList;
} finally {
readLock.unlock();
}
}
/**
* Get shutdown plugin list.
*
* @return {@link ShutdownAwarePlugin}
* @apiNote Be sure to avoid directly modifying returned collection instances,
* otherwise, unexpected results may be obtained through the manager
*/
@Override
public Collection<TaskAwarePlugin> getTaskAwarePluginList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return taskAwarePluginList;
} finally {
readLock.unlock();
}
}
}

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import cn.hippo4j.core.plugin.impl.*;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
/**
* Register default {@link ThreadPoolPlugin}.
*
* @see TaskDecoratorPlugin
* @see TaskTimeoutNotifyAlarmPlugin
* @see TaskRejectCountRecordPlugin
* @see TaskRejectNotifyAlarmPlugin
* @see ThreadPoolExecutorShutdownPlugin
*/
@NoArgsConstructor
@AllArgsConstructor
public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistrar {
public static final String REGISTRAR_NAME = "DefaultThreadPoolPluginRegistrar";
/**
* execute time out
*/
private long executeTimeOut;
/**
* await termination millis
*/
private long awaitTerminationMillis;
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return REGISTRAR_NAME;
}
/**
* Create and register plugin for the specified thread-pool instance.
*
* @param support thread pool plugin manager delegate
*/
@Override
public void doRegister(ThreadPoolPluginSupport support) {
// callback when task execute
support.register(new TaskDecoratorPlugin());
support.register(new TaskTimeoutNotifyAlarmPlugin(support.getThreadPoolId(), executeTimeOut, support.getThreadPoolExecutor()));
// callback when task rejected
support.register(new TaskRejectCountRecordPlugin());
support.register(new TaskRejectNotifyAlarmPlugin());
// callback when pool shutdown
support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis));
}
}

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.*;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
/**
* Empty thread pool plugin manager.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
/**
* default instance
*/
public static final EmptyThreadPoolPluginManager INSTANCE = new EmptyThreadPoolPluginManager();
/**
* Clear all.
*/
@Override
public void clear() {
// do nothing
}
/**
* Get all registered plugins.
*
* @return plugins
*/
@Override
public Collection<ThreadPoolPlugin> getAllPlugins() {
return Collections.emptyList();
}
/**
* Register a {@link ThreadPoolPlugin}
*
* @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()}
* already exists in the registry
* @see ThreadPoolPlugin#getId()
*/
@Override
public void register(ThreadPoolPlugin plugin) {
// do nothing
}
/**
* Register plugin if it's not registered.
*
* @param plugin plugin
* @return return true if successful register new plugin, false otherwise
*/
@Override
public boolean tryRegister(ThreadPoolPlugin plugin) {
return false;
}
/**
* Whether the {@link ThreadPoolPlugin} has been registered.
*
* @param pluginId plugin id
* @return ture if target has been registered, false otherwise
*/
@Override
public boolean isRegistered(String pluginId) {
return false;
}
/**
* Unregister {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
*/
@Override
public void unregister(String pluginId) {
// do nothing
}
/**
* Get {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
* @return {@link ThreadPoolPlugin}
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
@Override
public <A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId) {
return Optional.empty();
}
/**
* Get execute aware plugin list.
*
* @return {@link ExecuteAwarePlugin}
*/
@Override
public Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() {
return Collections.emptyList();
}
/**
* Get rejected aware plugin list.
*
* @return {@link RejectedAwarePlugin}
*/
@Override
public Collection<RejectedAwarePlugin> getRejectedAwarePluginList() {
return Collections.emptyList();
}
/**
* Get shutdown aware plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
public Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() {
return Collections.emptyList();
}
/**
* Get shutdown aware plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
public Collection<TaskAwarePlugin> getTaskAwarePluginList() {
return Collections.emptyList();
}
}

@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.*;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Manager of {@link ThreadPoolPlugin}.
* Bind with the specified thread-pool instance to register and manage plugins.
* when the thread pool is destroyed, please ensure that the manager will also be destroyed.
*
* @see DefaultThreadPoolPluginManager
*/
public interface ThreadPoolPluginManager {
/**
* Get an empty manager.
*
* @return {@link EmptyThreadPoolPluginManager}
*/
static ThreadPoolPluginManager empty() {
return EmptyThreadPoolPluginManager.INSTANCE;
}
/**
* Clear all.
*/
void clear();
/**
* Get all registered plugins.
*
* @return plugins
*/
Collection<ThreadPoolPlugin> getAllPlugins();
/**
* Register a {@link ThreadPoolPlugin}
*
* @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()}
* already exists in the registry
* @see ThreadPoolPlugin#getId()
*/
void register(ThreadPoolPlugin plugin);
/**
* Register plugin if it's not registered.
*
* @param plugin plugin
* @return return true if successful register new plugin, false otherwise
*/
boolean tryRegister(ThreadPoolPlugin plugin);
/**
* Whether the {@link ThreadPoolPlugin} has been registered.
*
* @param pluginId plugin id
* @return ture if target has been registered, false otherwise
*/
boolean isRegistered(String pluginId);
/**
* Unregister {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
*/
void unregister(String pluginId);
/**
* Get {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
* @param <A> target aware type
* @return {@link ThreadPoolPlugin}
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
<A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId);
/**
* Get execute aware plugin list.
*
* @return {@link ExecuteAwarePlugin}
*/
Collection<ExecuteAwarePlugin> getExecuteAwarePluginList();
/**
* Get rejected aware plugin list.
*
* @return {@link RejectedAwarePlugin}
*/
Collection<RejectedAwarePlugin> getRejectedAwarePluginList();
/**
* Get shutdown aware plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
Collection<ShutdownAwarePlugin> getShutdownAwarePluginList();
/**
* Get shutdown aware plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
Collection<TaskAwarePlugin> getTaskAwarePluginList();
// ==================== default methods ====================
/**
* Get plugin of type.
*
* @param pluginId plugin id
* @param pluginType plugin type
* @return target plugin
*/
default <A extends ThreadPoolPlugin> Optional<A> getPluginOfType(String pluginId, Class<A> pluginType) {
return getPlugin(pluginId)
.filter(pluginType::isInstance)
.map(pluginType::cast);
}
/**
* Get all plugins of type.
*
* @param pluginType plugin type
* @return all plugins of type
*/
default <A extends ThreadPoolPlugin> Collection<A> getAllPluginsOfType(Class<A> pluginType) {
return getAllPlugins().stream()
.filter(pluginType::isInstance)
.map(pluginType::cast)
.collect(Collectors.toList());
}
/**
* Get {@link PluginRuntime} of all registered plugins.
*
* @return {@link PluginRuntime} of all registered plugins
*/
default Collection<PluginRuntime> getAllPluginRuntimes() {
return getAllPlugins().stream()
.map(ThreadPoolPlugin::getPluginRuntime)
.collect(Collectors.toList());
}
/**
* Get {@link PluginRuntime} of registered plugin.
*
* @return {@link PluginRuntime} of registered plugin
*/
default Optional<PluginRuntime> getRuntime(String pluginId) {
return getPlugin(pluginId)
.map(ThreadPoolPlugin::getPluginRuntime);
}
}

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
/**
* Registrar of {@link ThreadPoolPlugin}.
*/
public interface ThreadPoolPluginRegistrar {
/**
* Get id.
* In spring container, the obtained id will be used as the alias of the bean name.
*
* @return id
*/
String getId();
/**
* Create and register plugin for the specified thread-pool instance.
*
* @param support thread pool plugin manager delegate
*/
void doRegister(ThreadPoolPluginSupport support);
}

@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.*;
import org.checkerframework.checker.nullness.qual.NonNull;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Used to support the binding of {@link ThreadPoolPluginManager} and {@link ThreadPoolExecutor}.
*/
public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
/**
* Get thread pool action aware registry.
*
* @return {@link ThreadPoolPluginManager}
*/
@NonNull
ThreadPoolPluginManager getThreadPoolPluginManager();
/**
* Get thread-pool id
*
* @return thread-pool id
*/
String getThreadPoolId();
/**
* Get thread-pool executor.
*
* @return thread-pool executor
*/
ThreadPoolExecutor getThreadPoolExecutor();
// ======================== delegate methods ========================
/**
* Clear all.
*/
@Override
default void clear() {
getThreadPoolPluginManager().clear();
}
/**
* Register a {@link ThreadPoolPlugin}
*
* @param plugin aware
*/
@Override
default void register(ThreadPoolPlugin plugin) {
getThreadPoolPluginManager().register(plugin);
}
/**
* Register plugin if it's not registered.
*
* @param plugin plugin
* @return return true if successful register new plugin, false otherwise
*/
@Override
default boolean tryRegister(ThreadPoolPlugin plugin) {
return getThreadPoolPluginManager().tryRegister(plugin);
}
/**
* Whether the {@link ThreadPoolPlugin} has been registered.
*
* @param pluginId name
* @return ture if target has been registered, false otherwise
*/
@Override
default boolean isRegistered(String pluginId) {
return getThreadPoolPluginManager().isRegistered(pluginId);
}
/**
* Unregister {@link ThreadPoolPlugin}
*
* @param pluginId name
*/
@Override
default void unregister(String pluginId) {
getThreadPoolPluginManager().unregister(pluginId);
}
/**
* Get all registered plugins.
*
* @return plugins
*/
@Override
default Collection<ThreadPoolPlugin> getAllPlugins() {
return getThreadPoolPluginManager().getAllPlugins();
}
/**
* Get {@link ThreadPoolPlugin}
*
* @param pluginId target name
* @return {@link ThreadPoolPlugin}, null if unregister
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
@Override
default <A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId) {
return getThreadPoolPluginManager().getPlugin(pluginId);
}
/**
* Get execute aware list.
*
* @return {@link ExecuteAwarePlugin}
*/
@Override
default Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() {
return getThreadPoolPluginManager().getExecuteAwarePluginList();
}
/**
* Get rejected aware list.
*
* @return {@link RejectedAwarePlugin}
*/
@Override
default Collection<RejectedAwarePlugin> getRejectedAwarePluginList() {
return getThreadPoolPluginManager().getRejectedAwarePluginList();
}
/**
* Get shutdown aware list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
default Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() {
return getThreadPoolPluginManager().getShutdownAwarePluginList();
}
/**
* Get shutdown aware list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
default Collection<TaskAwarePlugin> getTaskAwarePluginList() {
return getThreadPoolPluginManager().getTaskAwarePluginList();
}
}

@ -29,8 +29,6 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
@ -43,7 +41,6 @@ import java.util.*;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
@ -190,6 +187,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
Boolean isAlarm = executorProperties.getAlarm();
Integer activeAlarm = executorProperties.getActiveAlarm();
Integer capacityAlarm = executorProperties.getCapacityAlarm();
// FIXME Compare using Objects.equals
if ((isAlarm != null && isAlarm != threadPoolNotifyAlarm.getAlarm())
|| (activeAlarm != null && activeAlarm != threadPoolNotifyAlarm.getActiveAlarm())
|| (capacityAlarm != null && capacityAlarm != threadPoolNotifyAlarm.getCapacityAlarm())) {
@ -257,18 +255,12 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
}
if (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) {
if (executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
}
}
if (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(properties.getRejectedHandler());
if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
AtomicLong rejectCount = dynamicExecutor.getRejectCount();
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount);
}
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {

@ -123,8 +123,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
DynamicThreadPoolExecutor actualDynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor();
TaskDecorator taskDecorator = actualDynamicThreadPoolExecutor.getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = actualDynamicThreadPoolExecutor.awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = actualDynamicThreadPoolExecutor.waitForTasksToCompleteOnShutdown;
long awaitTerminationMillis = actualDynamicThreadPoolExecutor.getAwaitTerminationMillis();
boolean waitForTasksToCompleteOnShutdown = actualDynamicThreadPoolExecutor.isWaitForTasksToCompleteOnShutdown();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
dynamicThreadPoolWrapper.setExecutor(newDynamicPoolExecutor);
}
@ -155,7 +155,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.setBlockingQueue(queueType)
.setExecuteTimeOut(10000L)
.setQueueCapacity(queueCapacity)
.setRejectedHandler(((DynamicThreadPoolExecutor) executor).getRedundancyHandler().getClass().getSimpleName())
.setRejectedHandler(executor.getRejectedExecutionHandler().getClass().getSimpleName())
.setThreadPoolId(threadPoolId);
return executorProperties;
}

@ -17,20 +17,18 @@
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -39,7 +37,6 @@ import java.util.Optional;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
@ -71,9 +68,9 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
boolean originalAllowCoreThreadTimeOut = executor.allowsCoreThreadTimeOut();
Long originalExecuteTimeOut = null;
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
if (executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof DynamicThreadPoolExecutor) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
rejectedExecutionHandler = dynamicExecutor.getRedundancyHandler();
rejectedExecutionHandler = dynamicExecutor.getRejectedExecutionHandler();
originalExecuteTimeOut = dynamicExecutor.getExecuteTimeOut();
}
changePoolInfo(executor, parameter);
@ -140,17 +137,11 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
executor.setKeepAliveTime(parameter.getKeepAliveTime(), TimeUnit.SECONDS);
}
Long executeTimeOut = Optional.ofNullable(parameter.getExecuteTimeOut()).orElse(0L);
if (executeTimeOut != null && executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(executeTimeOut);
}
if (parameter.getRejectedType() != null) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(parameter.getRejectedType());
if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
AtomicLong rejectCount = dynamicExecutor.getRejectCount();
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, parameter.getTpId(), rejectCount);
}
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (parameter.getAllowCoreThreadTimeOut() != null) {

@ -33,7 +33,8 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
@ -150,7 +151,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut()))
.build();
// Set dynamic thread pool enhancement parameters.
if (executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof DynamicThreadPoolExecutor) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()),
threadPoolParameterInfo.getLivenessAlarm(),
@ -158,8 +159,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) executor).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) executor).awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) executor).waitForTasksToCompleteOnShutdown;
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) executor).getAwaitTerminationMillis();
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) executor).isWaitForTasksToCompleteOnShutdown();
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
long executeTimeOut = Optional.ofNullable(threadPoolParameterInfo.getExecuteTimeOut())
.orElse(((DynamicThreadPoolExecutor) executor).getExecuteTimeOut());
@ -180,7 +181,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.isAlarm(false)
.activeAlarm(80)
.capacityAlarm(80)
.rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(((DynamicThreadPoolExecutor) executor).getRedundancyHandler().getClass().getSimpleName()))
.rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()))
.build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()
.dynamicThreadPoolRegisterParameter(parameterInfo)

Loading…
Cancel
Save