diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java index 2249fabc..71cc5bed 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java @@ -17,107 +17,226 @@ package cn.hippo4j.core.executor; -import cn.hippo4j.common.config.ApplicationContextHolder; -import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; -import cn.hippo4j.core.proxy.RejectedProxyUtil; -import cn.hippo4j.core.toolkit.SystemClock; +import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin; +import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin; +import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin; +import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginRegistrar; import lombok.Getter; import lombok.NonNull; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; import org.springframework.core.task.TaskDecorator; -import java.util.concurrent.*; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * Enhanced dynamic and monitored thread pool. */ -public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport { +@Slf4j +public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean { + /** + * wait for tasks to complete on shutdown + */ @Getter @Setter - private Long executeTimeOut; - - @Getter - @Setter - private TaskDecorator taskDecorator; - - @Getter - @Setter - private RejectedExecutionHandler redundancyHandler; - - @Getter - private final String threadPoolId; - - @Getter - private final AtomicLong rejectCount = new AtomicLong(); - - private final ThreadLocal startTimeThreadLocal = new ThreadLocal<>(); - - public DynamicThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - long executeTimeOut, - boolean waitForTasksToCompleteOnShutdown, - long awaitTerminationMillis, + public boolean waitForTasksToCompleteOnShutdown; + + /** + * Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters. + * + * @param threadPoolId thread-pool id + * @param executeTimeOut execute time out + * @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown + * @param awaitTerminationMillis await termination millis + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize the maximum number of threads to allow in the + * pool + * @param keepAliveTime when the number of threads is greater than + * the core, this is the maximum time that excess idle threads + * will wait for new tasks before terminating. + * @param unit the time unit for the {@code keepAliveTime} argument + * @param blockingQueue the queue to use for holding tasks before they are + * executed. This queue will hold only the {@code Runnable} + * tasks submitted by the {@code execute} method. + * @param threadFactory the factory to use when the executor creates a new thread + * @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached + * @throws IllegalArgumentException if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException if {@code workQueue} + * or {@code threadFactory} or {@code handler} is null + */ + public DynamicThreadPoolExecutor( + int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis, @NonNull BlockingQueue blockingQueue, @NonNull String threadPoolId, @NonNull ThreadFactory threadFactory, @NonNull RejectedExecutionHandler rejectedExecutionHandler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, blockingQueue, threadPoolId, threadFactory, rejectedExecutionHandler); - this.threadPoolId = threadPoolId; - this.executeTimeOut = executeTimeOut; - // Number of dynamic proxy denial policies. - RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount); - setRejectedExecutionHandler(rejectedProxy); - // Redundant fields to avoid reflecting the acquired fields when sending change information. - redundancyHandler = rejectedExecutionHandler; + super( + threadPoolId, new DefaultThreadPoolPluginManager(), + corePoolSize, maximumPoolSize, keepAliveTime, unit, + blockingQueue, threadFactory, rejectedExecutionHandler); + log.info("Initializing ExecutorService {}", threadPoolId); + this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown; + // init default plugins + new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis) + .doRegister(this); } + /** + * Invoked by the containing {@code BeanFactory} on destruction of a bean. + * + */ @Override - public void execute(@NonNull Runnable command) { - if (taskDecorator != null) { - command = taskDecorator.decorate(command); + public void destroy() { + if (isWaitForTasksToCompleteOnShutdown()) { + super.shutdown(); + } else { + super.shutdownNow(); } - super.execute(command); + getThreadPoolPluginManager().clear(); } - @Override - protected void beforeExecute(Thread t, Runnable r) { - if (executeTimeOut == null || executeTimeOut <= 0) { - return; - } - startTimeThreadLocal.set(SystemClock.now()); + /** + * Get await termination millis. + * + * @return await termination millis. + * @deprecated use {@link ThreadPoolExecutorShutdownPlugin} + */ + @Deprecated + public long getAwaitTerminationMillis() { + return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class) + .map(ThreadPoolExecutorShutdownPlugin::getAwaitTerminationMillis) + .orElse(-1L); } - @Override - protected void afterExecute(Runnable r, Throwable t) { - Long startTime; - if ((startTime = startTimeThreadLocal.get()) == null) { - return; - } - try { - long endTime = SystemClock.now(); - long executeTime; - boolean executeTimeAlarm = (executeTime = (endTime - startTime)) > executeTimeOut; - if (executeTimeAlarm && ApplicationContextHolder.getInstance() != null) { - ThreadPoolNotifyAlarmHandler notifyAlarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class); - if (notifyAlarmHandler != null) { - notifyAlarmHandler.asyncSendExecuteTimeOutAlarm(threadPoolId, executeTime, executeTimeOut, this); - } - } - } finally { - startTimeThreadLocal.remove(); - } + /** + * Set support param. + * + * @param awaitTerminationMillis await termination millis + * @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown + * @deprecated use {@link ThreadPoolExecutorShutdownPlugin} + */ + @Deprecated + public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) { + setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown); + getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class) + .ifPresent(processor -> processor.setAwaitTerminationMillis(awaitTerminationMillis)); } - @Override - protected ExecutorService initializeExecutor() { - return this; + /** + * Get reject count num. + * + * @return reject count num + * @deprecated use {@link TaskRejectCountRecordPlugin} + */ + @Deprecated + public Long getRejectCountNum() { + return getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class) + .map(TaskRejectCountRecordPlugin::getRejectCountNum) + .orElse(-1L); } - public Long getRejectCountNum() { - return rejectCount.get(); + /** + * Get reject count. + * + * @return reject count num + * @deprecated use {@link TaskRejectCountRecordPlugin} + */ + @Deprecated + public AtomicLong getRejectCount() { + return getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class) + .map(TaskRejectCountRecordPlugin::getRejectCount) + .orElse(new AtomicLong(0)); + } + + /** + * Get execute time out. + * + * @deprecated use {@link TaskTimeoutNotifyAlarmPlugin} + */ + @Deprecated + public Long getExecuteTimeOut() { + return getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class) + .map(TaskTimeoutNotifyAlarmPlugin::getExecuteTimeOut) + .orElse(-1L); + } + + /** + * Set execute time out. + * + * @param executeTimeOut execute time out + * @deprecated use {@link TaskTimeoutNotifyAlarmPlugin} + */ + @Deprecated + public void setExecuteTimeOut(Long executeTimeOut) { + getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class) + .ifPresent(processor -> processor.setExecuteTimeOut(executeTimeOut)); + } + + /** + * Get {@link TaskDecorator}. + * + * @deprecated use {@link TaskDecoratorPlugin} + */ + @Deprecated + public TaskDecorator getTaskDecorator() { + return getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class) + .map(processor -> CollectionUtil.getFirst(processor.getDecorators())) + .orElse(null); + } + + /** + * Set {@link TaskDecorator}. + * + * @param taskDecorator task decorator + * @deprecated use {@link TaskDecoratorPlugin} + */ + @Deprecated + public void setTaskDecorator(TaskDecorator taskDecorator) { + getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class) + .ifPresent(processor -> { + if (Objects.nonNull(taskDecorator)) { + processor.clearDecorators(); + processor.addDecorator(taskDecorator); + } + }); + } + + /** + * Get rejected execution handler. + * + * @deprecated use {@link DynamicThreadPoolExecutor#getRejectedExecutionHandler} + */ + @Deprecated + public RejectedExecutionHandler getRedundancyHandler() { + return getRejectedExecutionHandler(); + } + + /** + * Set rejected execution handler. + * + * @param handler handler + * @deprecated use {@link DynamicThreadPoolExecutor#setRejectedExecutionHandler} + */ + @Deprecated + public void setRedundancyHandler(RejectedExecutionHandler handler) { + setRejectedExecutionHandler(handler); } + } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java index a99b0b37..e5dab547 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java @@ -17,7 +17,6 @@ package cn.hippo4j.core.executor; -import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.executor.support.CommonDynamicThreadPool; import lombok.AllArgsConstructor; import lombok.Builder; @@ -68,8 +67,8 @@ public class DynamicThreadPoolWrapper implements DisposableBean { @Override public void destroy() throws Exception { - if (executor instanceof AbstractDynamicExecutorSupport) { - ((AbstractDynamicExecutorSupport) executor).destroy(); + if (executor instanceof DynamicThreadPoolExecutor) { + ((DynamicThreadPoolExecutor) executor).destroy(); } } } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java new file mode 100644 index 00000000..4bd96b16 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor; + +import cn.hippo4j.core.plugin.*; +import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager; +import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; +import lombok.Setter; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; + +/** + *

Extensible thread-pool executor.
+ * Support the callback extension points provided on the basis of {@link ThreadPoolExecutor}. + * Each extension point corresponds to a different {@link ThreadPoolPlugin} interface, + * users can customize plug-ins and implement one or more {@link ThreadPoolPlugin} interface + * to enable plugins to sense thread pool behavior and provide extended functions. + * + * @see ThreadPoolPluginManager + * @see ThreadPoolPlugin + */ +public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements ThreadPoolPluginSupport { + + /** + * thread pool id + */ + @Getter + @NonNull + private final String threadPoolId; + + /** + * action aware registry + */ + @Getter + private final ThreadPoolPluginManager threadPoolPluginManager; + + /** + * handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper + */ + private final RejectedAwareHandlerWrapper handlerWrapper; + + /** + * Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters. + * + * @param threadPoolId thread-pool id + * @param threadPoolPluginManager action aware registry + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize the maximum number of threads to allow in the + * pool + * @param keepAliveTime when the number of threads is greater than + * the core, this is the maximum time that excess idle threads + * will wait for new tasks before terminating. + * @param unit the time unit for the {@code keepAliveTime} argument + * @param workQueue the queue to use for holding tasks before they are + * executed. This queue will hold only the {@code Runnable} + * tasks submitted by the {@code execute} method. + * @param threadFactory the factory to use when the executor + * creates a new thread + * @param handler the handler to use when execution is blocked + * because the thread bounds and queue capacities are reached + * @throws IllegalArgumentException if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException if {@code workQueue} + * or {@code threadFactory} or {@code handler} is null + */ + public ExtensibleThreadPoolExecutor( + @NonNull String threadPoolId, + @NonNull ThreadPoolPluginManager threadPoolPluginManager, + int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + @NonNull BlockingQueue workQueue, + @NonNull ThreadFactory threadFactory, + @NonNull RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + + // pool extended info + this.threadPoolId = threadPoolId; + this.threadPoolPluginManager = threadPoolPluginManager; + + // proxy handler to support Aware callback + while (handler instanceof RejectedAwareHandlerWrapper) { + handler = ((RejectedAwareHandlerWrapper) handler).getHandler(); + } + this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginManager, handler); + super.setRejectedExecutionHandler(handlerWrapper); + } + + /** + * {@inheritDoc} + * + *

Before calling the parent class method, {@link ExecuteAwarePlugin#beforeExecute} will be called first. + * + * @param thread the thread that will run task {@code r} + * @param runnable the task that will be executed + */ + @Override + protected void beforeExecute(Thread thread, Runnable runnable) { + Collection executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList(); + executeAwarePluginList.forEach(aware -> aware.beforeExecute(thread, runnable)); + } + + /** + * {@inheritDoc} + * + *

Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskExecute} will be called first. + * + * @param runnable the task to execute + */ + @Override + public void execute(@NonNull Runnable runnable) { + Collection taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList(); + for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) { + runnable = taskAwarePlugin.beforeTaskExecute(runnable); + } + super.execute(runnable); + } + + /** + * {@inheritDoc} + * + *

After calling the superclass method, {@link ExecuteAwarePlugin#afterExecute} will be called last. + * + * @param runnable the runnable that has completed + * @param throwable the exception that caused termination, or null if + * execution completed normally + */ + @Override + protected void afterExecute(Runnable runnable, Throwable throwable) { + Collection executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList(); + executeAwarePluginList.forEach(aware -> aware.afterExecute(runnable, throwable)); + } + + /** + * {@inheritDoc} + * + *

Before calling the superclass method, + * {@link ShutdownAwarePlugin#beforeShutdown} will be called first. + * and then will be call {@link ShutdownAwarePlugin#afterShutdown} + * + * @throws SecurityException {@inheritDoc} + */ + @Override + public void shutdown() { + Collection shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList(); + shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this)); + super.shutdown(); + shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, Collections.emptyList())); + } + + /** + * {@inheritDoc} + * + *

Before calling the superclass method, + * {@link ShutdownAwarePlugin#beforeShutdown} will be called first. + * and then will be call {@link ShutdownAwarePlugin#afterShutdown} + * + * @throws SecurityException + */ + @Override + public List shutdownNow() { + Collection shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList(); + shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this)); + List tasks = super.shutdownNow(); + shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, tasks)); + return tasks; + } + + /** + * {@inheritDoc}. + * + *

Before calling the superclass method, {@link ShutdownAwarePlugin#afterTerminated} will be called first. + */ + @Override + protected void terminated() { + super.terminated(); + Collection shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList(); + shutdownAwarePluginList.forEach(aware -> aware.afterTerminated(this)); + } + + /** + * {@inheritDoc} + * + *

Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first. + * + * @param runnable the runnable task being wrapped + * @param value the default value for the returned future + * @return a {@code RunnableFuture} which, when run, will run the + * underlying runnable and which, as a {@code Future}, will yield + * the given value as its result and provide for cancellation of + * the underlying task + * @since 1.6 + */ + @Override + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + Collection taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList(); + for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) { + runnable = taskAwarePlugin.beforeTaskCreate(this, runnable, value); + } + return super.newTaskFor(runnable, value); + } + + /** + * {@inheritDoc} + * + *

Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first. + * + * @param callable the callable task being wrapped + * @return a {@code RunnableFuture} which, when run, will call the + * underlying callable and which, as a {@code Future}, will yield + * the callable's result as its result and provide for + * cancellation of the underlying task + * @since 1.6 + */ + @Override + protected RunnableFuture newTaskFor(Callable callable) { + Collection taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList(); + for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) { + callable = taskAwarePlugin.beforeTaskCreate(this, callable); + } + return super.newTaskFor(callable); + } + + /** + * Sets a new handler for unexecutable tasks. + * + * @param handler the new handler + * @throws NullPointerException if handler is null + * @see #getRejectedExecutionHandler + */ + @Override + public void setRejectedExecutionHandler(@NonNull RejectedExecutionHandler handler) { + while (handler instanceof RejectedAwareHandlerWrapper) { + handler = ((RejectedAwareHandlerWrapper) handler).getHandler(); + } + handlerWrapper.setHandler(handler); + } + + /** + * Returns the current handler for unexecutable tasks. + * + * @return the current handler + * @see #setRejectedExecutionHandler(RejectedExecutionHandler) + */ + @Override + public RejectedExecutionHandler getRejectedExecutionHandler() { + return handlerWrapper.getHandler(); + } + + /** + * Get thread-pool executor. + * + * @return thread-pool executor + */ + @Override + public ThreadPoolExecutor getThreadPoolExecutor() { + return this; + } + + /** + * Wrapper of original {@link RejectedExecutionHandler} of {@link ThreadPoolExecutor}, + * It's used to support the {@link RejectedAwarePlugin} on the basis of the {@link RejectedExecutionHandler}. + * + * @see RejectedAwarePlugin + */ + @AllArgsConstructor + private static class RejectedAwareHandlerWrapper implements RejectedExecutionHandler { + + /** + * thread-pool action aware registry + */ + private final ThreadPoolPluginManager registry; + + /** + * original target + */ + @NonNull + @Setter + @Getter + private RejectedExecutionHandler handler; + + /** + * Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task + * + * @param r the runnable task requested to be executed + * @param executor the executor attempting to execute this task + */ + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + Collection rejectedAwarePluginList = registry.getRejectedAwarePluginList(); + rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor)); + handler.rejectedExecution(r, executor); + } + + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java index 77237dd4..7b832473 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java @@ -22,13 +22,13 @@ import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.support.ThreadPoolBuilder; -import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.ExecutorTraceContextUtil; -import cn.hippo4j.message.service.Hippo4jSendMessageService; +import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.message.enums.NotifyTypeEnum; -import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.request.AlarmNotifyRequest; import cn.hippo4j.message.request.ChangeParameterNotifyRequest; +import cn.hippo4j.message.service.Hippo4jSendMessageService; +import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -211,9 +211,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner */ public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) { BlockingQueue blockingQueue = threadPoolExecutor.getQueue(); - RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor - ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler() - : threadPoolExecutor.getRejectedExecutionHandler(); + RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler(); long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum() : -1L; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java index ca9d7eaf..a0405a6e 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java @@ -23,10 +23,8 @@ import cn.hippo4j.common.toolkit.BeanUtil; import cn.hippo4j.common.toolkit.ByteConvertUtil; import cn.hippo4j.common.toolkit.MemoryUtil; import cn.hippo4j.common.toolkit.StringUtil; -import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.toolkit.inet.InetUtils; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -66,11 +64,7 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime { DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId); ThreadPoolExecutor pool = executorService.getExecutor(); String rejectedName; - if (pool instanceof AbstractDynamicExecutorSupport) { - rejectedName = ((DynamicThreadPoolExecutor) pool).getRedundancyHandler().getClass().getSimpleName(); - } else { - rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName(); - } + rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName(); poolRunStateInfo.setRejectedName(rejectedName); ManyThreadPoolRunStateInfo manyThreadPoolRunStateInfo = BeanUtil.convert(poolRunStateInfo, ManyThreadPoolRunStateInfo.class); manyThreadPoolRunStateInfo.setIdentify(CLIENT_IDENTIFICATION_VALUE); diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java index cff8087c..8dcf65fa 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java @@ -17,6 +17,7 @@ package cn.hippo4j.core.executor.support; +import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; @@ -25,7 +26,10 @@ import java.util.concurrent.*; /** * Dynamic executor configuration support. + * + * @deprecated use {@link ThreadPoolExecutorShutdownPlugin} to get thread-pool shutdown support */ +@Deprecated @Slf4j public abstract class AbstractDynamicExecutorSupport extends ThreadPoolExecutor implements InitializingBean, DisposableBean { diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java new file mode 100644 index 00000000..d6c8638a --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; + +/** + * Callback during task execution. + */ +public interface ExecuteAwarePlugin extends ThreadPoolPlugin { + + /** + * Callback before task execution. + * + * @param thread thread of executing task + * @param runnable task + * @see ExtensibleThreadPoolExecutor#beforeExecute + */ + default void beforeExecute(Thread thread, Runnable runnable) { + } + + /** + * Callback after task execution. + * + * @param runnable runnable + * @param throwable exception thrown during execution + * @see ExtensibleThreadPoolExecutor#afterExecute + */ + default void afterExecute(Runnable runnable, Throwable throwable) { + // do nothing + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/PluginRuntime.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/PluginRuntime.java new file mode 100644 index 00000000..a4177eb4 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/PluginRuntime.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plug in runtime information. + */ +@RequiredArgsConstructor +@Getter +public class PluginRuntime { + + /** + * plugin id + */ + private final String pluginId; + + /** + * runtime info + */ + private final List infoList = new ArrayList<>(); + + /** + * Add a runtime info item. + * + * @param name name + * @param value value + * @return runtime info item + */ + public PluginRuntime addInfo(String name, Object value) { + infoList.add(new Info(name, value)); + return this; + } + + @Getter + @RequiredArgsConstructor + public static class Info { + + private final String name; + private final Object value; + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java new file mode 100644 index 00000000..3df9629b --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Callback when task is rejected. + */ +public interface RejectedAwarePlugin extends ThreadPoolPlugin { + + /** + * Callback before task is rejected. + * + * @param runnable task + * @param executor executor + */ + default void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { + // do nothing + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java new file mode 100644 index 00000000..52396077 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; + +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Callback before thread-pool shutdown. + */ +public interface ShutdownAwarePlugin extends ThreadPoolPlugin { + + /** + * Callback before pool shutdown. + * + * @param executor executor + * @see ThreadPoolExecutor#shutdown() + * @see ThreadPoolExecutor#shutdownNow() + */ + default void beforeShutdown(ThreadPoolExecutor executor) { + // do nothing + } + + /** + * Callback after pool shutdown. + * + * @param executor executor + * @param remainingTasks remainingTasks, or empty if no tasks left or {@link ThreadPoolExecutor#shutdown()} called + * @see ThreadPoolExecutor#shutdown() + * @see ThreadPoolExecutor#shutdownNow() + */ + default void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) { + // do nothing + } + + /** + * Callback after pool terminated. + * + * @param executor executor + * @see ThreadPoolExecutor#terminated() + */ + default void afterTerminated(ExtensibleThreadPoolExecutor executor) { + // do nothing + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java new file mode 100644 index 00000000..33c56599 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; + +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Callback during task submit in thread-pool. + */ +public interface TaskAwarePlugin extends ThreadPoolPlugin { + + /** + * Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool. + * + * @param executor executor + * @param runnable original task + * @return Tasks that really need to be performed + * @see ThreadPoolExecutor#newTaskFor(Runnable, Object) + */ + default Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) { + return runnable; + } + + /** + * Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool. + * + * @param executor executor + * @param future original task + * @return Tasks that really need to be performed + * @see ThreadPoolExecutor#newTaskFor(Callable) + */ + default Callable beforeTaskCreate(ThreadPoolExecutor executor, Callable future) { + return future; + } + + /** + * Callback when task is execute. + * + * @param runnable runnable + * @return tasks to be execute + * @see ExtensibleThreadPoolExecutor#execute + */ + default Runnable beforeTaskExecute(Runnable runnable) { + return runnable; + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java new file mode 100644 index 00000000..76238432 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager; +import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport; + +/** + *

A marker superinterface indicating that + * an instance class is eligible to be sense and intercept + * some operations of the specific thread-pool instance. + * + *

Generally, any thread-pool that implements the {@link ThreadPoolPluginSupport} + * can be register multiple plugins by {@link ThreadPoolPluginSupport#register}, + * and the plugin will provide some extension function of original + * {@link java.util.concurrent.ThreadPoolExecutor} does not support. + * + *

During runtime, plugins can dynamically modify some configurable parameters + * and provide some runtime information by {@link #getPluginRuntime()}. + * When the thread-pool is destroyed, the plugin will also be destroyed. + * + * @see ExtensibleThreadPoolExecutor + * @see ThreadPoolPluginManager + * @see TaskAwarePlugin + * @see ExecuteAwarePlugin + * @see ShutdownAwarePlugin + * @see RejectedAwarePlugin + */ +public interface ThreadPoolPlugin { + + /** + * Get id. + * + * @return id + */ + String getId(); + + /** + * Callback when plugin register into manager + * + * @see ThreadPoolPluginManager#register + */ + default void start() { + // do nothing + } + + /** + * Callback when plugin unregister from manager + * + * @see ThreadPoolPluginManager#unregister + * @see ThreadPoolPluginManager#clear + */ + default void stop() { + // do nothing + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + default PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java new file mode 100644 index 00000000..99212e18 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.plugin.TaskAwarePlugin; +import lombok.Getter; +import lombok.NonNull; +import org.springframework.core.task.TaskDecorator; + +import java.util.ArrayList; +import java.util.List; + +/** + * Decorate tasks when they are submitted to thread-pool. + */ +public class TaskDecoratorPlugin implements TaskAwarePlugin { + + public static final String PLUGIN_NAME = "task-decorator-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * decorators + */ + @Getter + private final List decorators = new ArrayList<>(); + + /** + * Callback when task is executed. + * + * @param runnable runnable + * @return tasks to be execute + * @see ExtensibleThreadPoolExecutor#execute + */ + @Override + public Runnable beforeTaskExecute(Runnable runnable) { + for (TaskDecorator decorator : decorators) { + runnable = decorator.decorate(runnable); + } + return runnable; + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()) + .addInfo("decorators", decorators); + } + + /** + * Add a decorator + * + * @param decorator decorator + */ + public void addDecorator(@NonNull TaskDecorator decorator) { + decorators.remove(decorator); + decorators.add(decorator); + } + + /** + * Clear all decorators + * + */ + public void clearDecorators() { + decorators.clear(); + } + + /** + * Remove decorators + * + */ + public void removeDecorator(TaskDecorator decorator) { + decorators.remove(decorator); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java new file mode 100644 index 00000000..0f478911 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.plugin.RejectedAwarePlugin; +import lombok.Getter; +import lombok.Setter; + +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Record the number of tasks rejected by the thread pool. + */ +public class TaskRejectCountRecordPlugin implements RejectedAwarePlugin { + + public static final String PLUGIN_NAME = "task-reject-count-record-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * rejection count + */ + @Setter + @Getter + private AtomicLong rejectCount = new AtomicLong(0); + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()) + .addInfo("rejectCount", getRejectCountNum()); + } + + /** + * Record rejection count. + * + * @param r task + * @param executor executor + */ + @Override + public void beforeRejectedExecution(Runnable r, ThreadPoolExecutor executor) { + rejectCount.incrementAndGet(); + } + + /** + * Get reject count num + * + * @return reject count num + */ + public Long getRejectCountNum() { + return rejectCount.get(); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java new file mode 100644 index 00000000..6b865642 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import cn.hippo4j.core.plugin.RejectedAwarePlugin; + +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Send alert notification when a task is rejected. + */ +public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin { + + public static final String PLUGIN_NAME = "task-reject-notify-alarm-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * Callback before task is rejected. + * + * @param runnable task + * @param executor executor + */ + @Override + public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { + if (!(executor instanceof ExtensibleThreadPoolExecutor)) { + return; + } + String threadPoolId = ((ExtensibleThreadPoolExecutor) executor).getThreadPoolId(); + Optional.ofNullable(ApplicationContextHolder.getInstance()) + .map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class)) + .ifPresent(handler -> handler.asyncSendRejectedAlarm(threadPoolId)); + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java new file mode 100644 index 00000000..3ce5c5ad --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.ExecuteAwarePlugin; +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.toolkit.SystemClock; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Objects; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Record task execution time indicator. + * + * @see TaskTimeoutNotifyAlarmPlugin + */ +@RequiredArgsConstructor +public class TaskTimeRecordPlugin implements ExecuteAwarePlugin { + + public static final String PLUGIN_NAME = "task-time-record-plugin"; + + /** + * Lock instance. + */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * Total execution milli time of all tasks. + */ + private long totalTaskTimeMillis = 0L; + + /** + * Maximum task milli execution time, default -1. + */ + private long maxTaskTimeMillis = -1L; + + /** + * Minimal task milli execution time, default -1. + */ + private long minTaskTimeMillis = -1L; + + /** + * Count of completed task. + */ + private long taskCount = 0L; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * start times of executed tasks + */ + private final ThreadLocal startTimes = new ThreadLocal<>(); + + /** + * Record the time when the worker thread starts executing the task. + * + * @param thread thread of executing task + * @param runnable task + * @see ExtensibleThreadPoolExecutor#beforeExecute + */ + @Override + public void beforeExecute(Thread thread, Runnable runnable) { + startTimes.set(SystemClock.now()); + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + Summary summary = summarize(); + return new PluginRuntime(getId()) + .addInfo("taskCount", summary.getTaskCount()) + .addInfo("minTaskTime", summary.getMinTaskTimeMillis() + "ms") + .addInfo("maxTaskTime", summary.getMaxTaskTimeMillis() + "ms") + .addInfo("totalTaskTime", summary.getTotalTaskTimeMillis() + "ms") + .addInfo("avgTaskTime", summary.getAvgTaskTimeMillis() + "ms"); + } + + /** + * Record the total time for the worker thread to complete the task, and update the time record. + * + * @param runnable runnable + * @param throwable exception thrown during execution + */ + @Override + public void afterExecute(Runnable runnable, Throwable throwable) { + try { + Long startTime = startTimes.get(); + if (Objects.isNull(startTime)) { + return; + } + long executeTime = SystemClock.now() - startTime; + recordTaskTime(executeTime); + } finally { + startTimes.remove(); + } + } + + /** + * Refresh time indicators of the current instance. + * + * @param taskExecutionTime millisecond + */ + protected void recordTaskTime(long taskExecutionTime) { + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + if (taskCount == 0) { + maxTaskTimeMillis = taskExecutionTime; + minTaskTimeMillis = taskExecutionTime; + } else { + maxTaskTimeMillis = Math.max(taskExecutionTime, maxTaskTimeMillis); + minTaskTimeMillis = Math.min(taskExecutionTime, minTaskTimeMillis); + } + taskCount = taskCount + 1; + totalTaskTimeMillis += taskExecutionTime; + } finally { + writeLock.unlock(); + } + } + + /** + * Get the summary statistics of the instance at the current time. + * + * @return data snapshot + */ + public Summary summarize() { + Lock readLock = lock.readLock(); + Summary statistics; + readLock.lock(); + try { + statistics = new Summary( + this.totalTaskTimeMillis, + this.maxTaskTimeMillis, + this.minTaskTimeMillis, + this.taskCount); + } finally { + readLock.unlock(); + } + return statistics; + } + + /** + * Summary statistics of SyncTimeRecorder instance at a certain time. + */ + @Getter + @RequiredArgsConstructor + public static class Summary { + + /** + * Total execution nano time of all tasks. + */ + private final long totalTaskTimeMillis; + + /** + * Maximum task nano execution time. + */ + private final long maxTaskTimeMillis; + + /** + * Minimal task nano execution time. + */ + private final long minTaskTimeMillis; + + /** + * Count of completed task. + */ + private final long taskCount; + + /** + * Get the avg task time in milliseconds. + * + * @return avg task time + */ + public long getAvgTaskTimeMillis() { + long totalTaskCount = getTaskCount(); + return totalTaskCount > 0L ? getTotalTaskTimeMillis() / totalTaskCount : -1; + } + + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java new file mode 100644 index 00000000..12e522db --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Record task execution time indicator, + * and send alarm notification when the execution time exceeds the threshold. + */ +@AllArgsConstructor +public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin { + + public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin"; + + /** + * threadPoolId + */ + private final String threadPoolId; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + @Getter + @Setter + private Long executeTimeOut; + + /** + * thread-pool + */ + private final ThreadPoolExecutor threadPoolExecutor; + + /** + * Check whether the task execution time exceeds {@link #executeTimeOut}, + * if it exceeds this time, send an alarm notification. + * + * @param executeTime executeTime in nanosecond + */ + @Override + protected void recordTaskTime(long executeTime) { + super.recordTaskTime(executeTime); + if (executeTime <= executeTimeOut) { + return; + } + Optional.ofNullable(ApplicationContextHolder.getInstance()) + .map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class)) + .ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm( + threadPoolId, executeTime, executeTimeOut, threadPoolExecutor)); + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java new file mode 100644 index 00000000..114f2971 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.plugin.ShutdownAwarePlugin; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.*; + +/** + *

After the thread pool calls {@link ThreadPoolExecutor#shutdown()} or {@link ThreadPoolExecutor#shutdownNow()}.
+ * Cancel the remaining tasks in the pool, then wait for the thread pool to terminate until + * the blocked main thread has timed out or the thread pool has completely terminated. + */ +@Accessors(chain = true) +@Getter +@Slf4j +@AllArgsConstructor +public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin { + + public static final String PLUGIN_NAME = "thread-pool-executor-shutdown-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * await termination millis + */ + @Setter + public long awaitTerminationMillis; + + /** + * Callback before pool shutdown. + * + * @param executor executor + */ + @Override + public void beforeShutdown(ThreadPoolExecutor executor) { + if (executor instanceof ExtensibleThreadPoolExecutor) { + ExtensibleThreadPoolExecutor dynamicThreadPoolExecutor = (ExtensibleThreadPoolExecutor) executor; + String threadPoolId = dynamicThreadPoolExecutor.getThreadPoolId(); + if (log.isInfoEnabled()) { + log.info("Before shutting down ExecutorService {}", threadPoolId); + } + } + } + + /** + * Callback after pool shutdown.
+ * cancel the remaining tasks, + * then wait for pool to terminate according {@link #awaitTerminationMillis} if necessary. + * + * @param executor executor + * @param remainingTasks remainingTasks + */ + @Override + public void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) { + if (executor instanceof ExtensibleThreadPoolExecutor) { + ExtensibleThreadPoolExecutor pool = (ExtensibleThreadPoolExecutor) executor; + if (CollectionUtil.isNotEmpty(remainingTasks)) { + remainingTasks.forEach(this::cancelRemainingTask); + } + awaitTerminationIfNecessary(pool); + } + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()) + .addInfo("awaitTerminationMillis", awaitTerminationMillis); + } + + /** + * Cancel the given remaining task which never commended execution, + * as returned from {@link ExecutorService#shutdownNow()}. + * + * @param task the task to cancel (typically a {@link RunnableFuture}) + * @see RunnableFuture#cancel(boolean) + * @since 5.0.5 + */ + protected void cancelRemainingTask(Runnable task) { + if (task instanceof Future) { + ((Future) task).cancel(true); + } + } + + /** + * Wait for the executor to terminate, according to the value of {@link #awaitTerminationMillis}. + */ + private void awaitTerminationIfNecessary(ExtensibleThreadPoolExecutor executor) { + String threadPoolId = executor.getThreadPoolId(); + if (this.awaitTerminationMillis <= 0) { + return; + } + try { + boolean isTerminated = executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS); + if (!isTerminated && log.isWarnEnabled()) { + log.warn("Timed out while waiting for executor {} to terminate.", threadPoolId); + } else { + log.info("ExecutorService {} has been shutdowned.", threadPoolId); + } + } catch (InterruptedException ex) { + if (log.isWarnEnabled()) { + log.warn("Interrupted while waiting for executor {} to terminate.", threadPoolId); + } + Thread.currentThread().interrupt(); + } + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManager.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManager.java new file mode 100644 index 00000000..50b220d5 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManager.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.common.toolkit.Assert; +import cn.hippo4j.core.plugin.*; +import lombok.NonNull; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + *

The default implementation of {@link ThreadPoolPluginManager}. + * Provide basic {@link ThreadPoolPlugin} registration, logout and acquisition functions. + * Most APIs ensure limited thread-safe. + * + *

Usually registered to {@link cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor}, + * or bound to an {@link java.util.concurrent.ThreadPoolExecutor} instance through {@link ThreadPoolPluginSupport} + * to support its plugin based extension functions. + * + *

NOTE: + * When the list of plugins is obtained through the {@code getXXX} method of manager, the list is not immutable. + * This means that until actually start iterating over the list, + * registering or unregistering plugins through the manager will affect the results of the iteration. + * Therefore, we should try to ensure that get the latest plugin list from the manager before each use. + * + * @see cn.hippo4j.core.executor.DynamicThreadPoolExecutor + * @see cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor + */ +public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager { + + /** + * lock of this instance + */ + private final ReadWriteLock instanceLock = new ReentrantReadWriteLock(); + + /** + * Registered {@link ThreadPoolPlugin}. + */ + private final Map registeredPlugins = new ConcurrentHashMap<>(16); + + /** + * Registered {@link TaskAwarePlugin}. + */ + private final List taskAwarePluginList = new CopyOnWriteArrayList<>(); + + /** + * Registered {@link ExecuteAwarePlugin}. + */ + private final List executeAwarePluginList = new CopyOnWriteArrayList<>(); + + /** + * Registered {@link RejectedAwarePlugin}. + */ + private final List rejectedAwarePluginList = new CopyOnWriteArrayList<>(); + + /** + * Registered {@link ShutdownAwarePlugin}. + */ + private final List shutdownAwarePluginList = new CopyOnWriteArrayList<>(); + + /** + * Clear all. + */ + @Override + public synchronized void clear() { + Lock writeLock = instanceLock.writeLock(); + writeLock.lock(); + try { + Collection plugins = registeredPlugins.values(); + registeredPlugins.clear(); + taskAwarePluginList.clear(); + executeAwarePluginList.clear(); + rejectedAwarePluginList.clear(); + shutdownAwarePluginList.clear(); + plugins.forEach(ThreadPoolPlugin::stop); + } finally { + writeLock.unlock(); + } + } + + /** + * Register a {@link ThreadPoolPlugin} + * + * @param plugin plugin + * @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} already exists in the registry + * @see ThreadPoolPlugin#getId() + */ + @Override + public void register(@NonNull ThreadPoolPlugin plugin) { + Lock writeLock = instanceLock.writeLock(); + writeLock.lock(); + try { + String id = plugin.getId(); + Assert.isTrue(!isRegistered(id), "The plugin with id [" + id + "] has been registered"); + + // register plugin + registeredPlugins.put(id, plugin); + // quick index + if (plugin instanceof TaskAwarePlugin) { + taskAwarePluginList.add((TaskAwarePlugin) plugin); + } + if (plugin instanceof ExecuteAwarePlugin) { + executeAwarePluginList.add((ExecuteAwarePlugin) plugin); + } + if (plugin instanceof RejectedAwarePlugin) { + rejectedAwarePluginList.add((RejectedAwarePlugin) plugin); + } + if (plugin instanceof ShutdownAwarePlugin) { + shutdownAwarePluginList.add((ShutdownAwarePlugin) plugin); + } + plugin.start(); + } finally { + writeLock.unlock(); + } + } + + /** + * Register plugin if it's not registered. + * + * @param plugin plugin + * @return return true if successful register new plugin, false otherwise + */ + @Override + public boolean tryRegister(ThreadPoolPlugin plugin) { + Lock writeLock = instanceLock.writeLock(); + writeLock.lock(); + try { + if (registeredPlugins.containsKey(plugin.getId())) { + return false; + } + register(plugin); + return true; + } finally { + writeLock.unlock(); + } + } + + /** + * Unregister {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + */ + @Override + public void unregister(String pluginId) { + Lock writeLock = instanceLock.writeLock(); + writeLock.lock(); + try { + Optional.ofNullable(pluginId) + .map(registeredPlugins::remove) + .ifPresent(plugin -> { + // remove quick index if necessary + if (plugin instanceof TaskAwarePlugin) { + taskAwarePluginList.remove(plugin); + } + if (plugin instanceof ExecuteAwarePlugin) { + executeAwarePluginList.remove(plugin); + } + if (plugin instanceof RejectedAwarePlugin) { + rejectedAwarePluginList.remove(plugin); + } + if (plugin instanceof ShutdownAwarePlugin) { + shutdownAwarePluginList.remove(plugin); + } + plugin.stop(); + }); + } finally { + writeLock.unlock(); + } + } + + /** + * Get all registered plugins. + * + * @return plugins + * @apiNote Be sure to avoid directly modifying returned collection instances, + * otherwise, unexpected results may be obtained through the manager + */ + @Override + public Collection getAllPlugins() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return registeredPlugins.values(); + } finally { + readLock.unlock(); + } + } + + /** + * Whether the {@link ThreadPoolPlugin} has been registered. + * + * @param pluginId plugin id + * @return ture if target has been registered, false otherwise + */ + @Override + public boolean isRegistered(String pluginId) { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return registeredPlugins.containsKey(pluginId); + } finally { + readLock.unlock(); + } + } + + /** + * Get {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + * @param plugin type + * @return {@link ThreadPoolPlugin}, null if unregister + */ + @Override + @SuppressWarnings("unchecked") + public Optional getPlugin(String pluginId) { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return (Optional) Optional.ofNullable(registeredPlugins.get(pluginId)); + } finally { + readLock.unlock(); + } + } + + /** + * Get execute plugin list. + * + * @return {@link ExecuteAwarePlugin} + */ + @Override + public Collection getExecuteAwarePluginList() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return executeAwarePluginList; + } finally { + readLock.unlock(); + } + } + + /** + * Get rejected plugin list. + * + * @return {@link RejectedAwarePlugin} + * @apiNote Be sure to avoid directly modifying returned collection instances, + * otherwise, unexpected results may be obtained through the manager + */ + @Override + public Collection getRejectedAwarePluginList() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return rejectedAwarePluginList; + } finally { + readLock.unlock(); + } + } + + /** + * Get shutdown plugin list. + * + * @return {@link ShutdownAwarePlugin} + * @apiNote Be sure to avoid directly modifying returned collection instances, + * otherwise, unexpected results may be obtained through the manager + */ + @Override + public Collection getShutdownAwarePluginList() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return shutdownAwarePluginList; + } finally { + readLock.unlock(); + } + } + + /** + * Get shutdown plugin list. + * + * @return {@link ShutdownAwarePlugin} + * @apiNote Be sure to avoid directly modifying returned collection instances, + * otherwise, unexpected results may be obtained through the manager + */ + @Override + public Collection getTaskAwarePluginList() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return taskAwarePluginList; + } finally { + readLock.unlock(); + } + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java new file mode 100644 index 00000000..211c33a9 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.core.plugin.ThreadPoolPlugin; +import cn.hippo4j.core.plugin.impl.*; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; + +/** + * Register default {@link ThreadPoolPlugin}. + * + * @see TaskDecoratorPlugin + * @see TaskTimeoutNotifyAlarmPlugin + * @see TaskRejectCountRecordPlugin + * @see TaskRejectNotifyAlarmPlugin + * @see ThreadPoolExecutorShutdownPlugin + */ +@NoArgsConstructor +@AllArgsConstructor +public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistrar { + + public static final String REGISTRAR_NAME = "DefaultThreadPoolPluginRegistrar"; + + /** + * execute time out + */ + private long executeTimeOut; + + /** + * await termination millis + */ + private long awaitTerminationMillis; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return REGISTRAR_NAME; + } + + /** + * Create and register plugin for the specified thread-pool instance. + * + * @param support thread pool plugin manager delegate + */ + @Override + public void doRegister(ThreadPoolPluginSupport support) { + // callback when task execute + support.register(new TaskDecoratorPlugin()); + support.register(new TaskTimeoutNotifyAlarmPlugin(support.getThreadPoolId(), executeTimeOut, support.getThreadPoolExecutor())); + // callback when task rejected + support.register(new TaskRejectCountRecordPlugin()); + support.register(new TaskRejectNotifyAlarmPlugin()); + // callback when pool shutdown + support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis)); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/EmptyThreadPoolPluginManager.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/EmptyThreadPoolPluginManager.java new file mode 100644 index 00000000..dff975b4 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/EmptyThreadPoolPluginManager.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.core.plugin.*; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; + +/** + * Empty thread pool plugin manager. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager { + + /** + * default instance + */ + public static final EmptyThreadPoolPluginManager INSTANCE = new EmptyThreadPoolPluginManager(); + + /** + * Clear all. + */ + @Override + public void clear() { + // do nothing + } + + /** + * Get all registered plugins. + * + * @return plugins + */ + @Override + public Collection getAllPlugins() { + return Collections.emptyList(); + } + + /** + * Register a {@link ThreadPoolPlugin} + * + * @param plugin plugin + * @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} + * already exists in the registry + * @see ThreadPoolPlugin#getId() + */ + @Override + public void register(ThreadPoolPlugin plugin) { + // do nothing + } + + /** + * Register plugin if it's not registered. + * + * @param plugin plugin + * @return return true if successful register new plugin, false otherwise + */ + @Override + public boolean tryRegister(ThreadPoolPlugin plugin) { + return false; + } + + /** + * Whether the {@link ThreadPoolPlugin} has been registered. + * + * @param pluginId plugin id + * @return ture if target has been registered, false otherwise + */ + @Override + public boolean isRegistered(String pluginId) { + return false; + } + + /** + * Unregister {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + */ + @Override + public void unregister(String pluginId) { + // do nothing + } + + /** + * Get {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + * @return {@link ThreadPoolPlugin} + * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type + */ + @Override + public Optional getPlugin(String pluginId) { + return Optional.empty(); + } + + /** + * Get execute aware plugin list. + * + * @return {@link ExecuteAwarePlugin} + */ + @Override + public Collection getExecuteAwarePluginList() { + return Collections.emptyList(); + } + + /** + * Get rejected aware plugin list. + * + * @return {@link RejectedAwarePlugin} + */ + @Override + public Collection getRejectedAwarePluginList() { + return Collections.emptyList(); + } + + /** + * Get shutdown aware plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + public Collection getShutdownAwarePluginList() { + return Collections.emptyList(); + } + + /** + * Get shutdown aware plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + public Collection getTaskAwarePluginList() { + return Collections.emptyList(); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginManager.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginManager.java new file mode 100644 index 00000000..dd2a7777 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginManager.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.core.plugin.*; + +import java.util.Collection; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Manager of {@link ThreadPoolPlugin}. + * Bind with the specified thread-pool instance to register and manage plugins. + * when the thread pool is destroyed, please ensure that the manager will also be destroyed. + * + * @see DefaultThreadPoolPluginManager + */ +public interface ThreadPoolPluginManager { + + /** + * Get an empty manager. + * + * @return {@link EmptyThreadPoolPluginManager} + */ + static ThreadPoolPluginManager empty() { + return EmptyThreadPoolPluginManager.INSTANCE; + } + + /** + * Clear all. + */ + void clear(); + + /** + * Get all registered plugins. + * + * @return plugins + */ + Collection getAllPlugins(); + + /** + * Register a {@link ThreadPoolPlugin} + * + * @param plugin plugin + * @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} + * already exists in the registry + * @see ThreadPoolPlugin#getId() + */ + void register(ThreadPoolPlugin plugin); + + /** + * Register plugin if it's not registered. + * + * @param plugin plugin + * @return return true if successful register new plugin, false otherwise + */ + boolean tryRegister(ThreadPoolPlugin plugin); + + /** + * Whether the {@link ThreadPoolPlugin} has been registered. + * + * @param pluginId plugin id + * @return ture if target has been registered, false otherwise + */ + boolean isRegistered(String pluginId); + + /** + * Unregister {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + */ + void unregister(String pluginId); + + /** + * Get {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + * @param target aware type + * @return {@link ThreadPoolPlugin} + * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type + */ + Optional getPlugin(String pluginId); + + /** + * Get execute aware plugin list. + * + * @return {@link ExecuteAwarePlugin} + */ + Collection getExecuteAwarePluginList(); + + /** + * Get rejected aware plugin list. + * + * @return {@link RejectedAwarePlugin} + */ + Collection getRejectedAwarePluginList(); + + /** + * Get shutdown aware plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + Collection getShutdownAwarePluginList(); + + /** + * Get shutdown aware plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + Collection getTaskAwarePluginList(); + + // ==================== default methods ==================== + + /** + * Get plugin of type. + * + * @param pluginId plugin id + * @param pluginType plugin type + * @return target plugin + */ + default Optional getPluginOfType(String pluginId, Class pluginType) { + return getPlugin(pluginId) + .filter(pluginType::isInstance) + .map(pluginType::cast); + } + + /** + * Get all plugins of type. + * + * @param pluginType plugin type + * @return all plugins of type + */ + default Collection getAllPluginsOfType(Class pluginType) { + return getAllPlugins().stream() + .filter(pluginType::isInstance) + .map(pluginType::cast) + .collect(Collectors.toList()); + } + + /** + * Get {@link PluginRuntime} of all registered plugins. + * + * @return {@link PluginRuntime} of all registered plugins + */ + default Collection getAllPluginRuntimes() { + return getAllPlugins().stream() + .map(ThreadPoolPlugin::getPluginRuntime) + .collect(Collectors.toList()); + } + + /** + * Get {@link PluginRuntime} of registered plugin. + * + * @return {@link PluginRuntime} of registered plugin + */ + default Optional getRuntime(String pluginId) { + return getPlugin(pluginId) + .map(ThreadPoolPlugin::getPluginRuntime); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginRegistrar.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginRegistrar.java new file mode 100644 index 00000000..23576c36 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginRegistrar.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.core.plugin.ThreadPoolPlugin; + +/** + * Registrar of {@link ThreadPoolPlugin}. + */ +public interface ThreadPoolPluginRegistrar { + + /** + * Get id. + * In spring container, the obtained id will be used as the alias of the bean name. + * + * @return id + */ + String getId(); + + /** + * Create and register plugin for the specified thread-pool instance. + * + * @param support thread pool plugin manager delegate + */ + void doRegister(ThreadPoolPluginSupport support); + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginSupport.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginSupport.java new file mode 100644 index 00000000..c3bd4187 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginSupport.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.core.plugin.*; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Used to support the binding of {@link ThreadPoolPluginManager} and {@link ThreadPoolExecutor}. + */ +public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager { + + /** + * Get thread pool action aware registry. + * + * @return {@link ThreadPoolPluginManager} + */ + @NonNull + ThreadPoolPluginManager getThreadPoolPluginManager(); + + /** + * Get thread-pool id + * + * @return thread-pool id + */ + String getThreadPoolId(); + + /** + * Get thread-pool executor. + * + * @return thread-pool executor + */ + ThreadPoolExecutor getThreadPoolExecutor(); + + // ======================== delegate methods ======================== + + /** + * Clear all. + */ + @Override + default void clear() { + getThreadPoolPluginManager().clear(); + } + + /** + * Register a {@link ThreadPoolPlugin} + * + * @param plugin aware + */ + @Override + default void register(ThreadPoolPlugin plugin) { + getThreadPoolPluginManager().register(plugin); + } + + /** + * Register plugin if it's not registered. + * + * @param plugin plugin + * @return return true if successful register new plugin, false otherwise + */ + @Override + default boolean tryRegister(ThreadPoolPlugin plugin) { + return getThreadPoolPluginManager().tryRegister(plugin); + } + + /** + * Whether the {@link ThreadPoolPlugin} has been registered. + * + * @param pluginId name + * @return ture if target has been registered, false otherwise + */ + @Override + default boolean isRegistered(String pluginId) { + return getThreadPoolPluginManager().isRegistered(pluginId); + } + + /** + * Unregister {@link ThreadPoolPlugin} + * + * @param pluginId name + */ + @Override + default void unregister(String pluginId) { + getThreadPoolPluginManager().unregister(pluginId); + } + + /** + * Get all registered plugins. + * + * @return plugins + */ + @Override + default Collection getAllPlugins() { + return getThreadPoolPluginManager().getAllPlugins(); + } + + /** + * Get {@link ThreadPoolPlugin} + * + * @param pluginId target name + * @return {@link ThreadPoolPlugin}, null if unregister + * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type + */ + @Override + default Optional getPlugin(String pluginId) { + return getThreadPoolPluginManager().getPlugin(pluginId); + } + + /** + * Get execute aware list. + * + * @return {@link ExecuteAwarePlugin} + */ + @Override + default Collection getExecuteAwarePluginList() { + return getThreadPoolPluginManager().getExecuteAwarePluginList(); + } + + /** + * Get rejected aware list. + * + * @return {@link RejectedAwarePlugin} + */ + @Override + default Collection getRejectedAwarePluginList() { + return getThreadPoolPluginManager().getRejectedAwarePluginList(); + } + + /** + * Get shutdown aware list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + default Collection getShutdownAwarePluginList() { + return getThreadPoolPluginManager().getShutdownAwarePluginList(); + } + + /** + * Get shutdown aware list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + default Collection getTaskAwarePluginList() { + return getThreadPoolPluginManager().getTaskAwarePluginList(); + } + +} diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java index 52a13cd7..8f1e4a3b 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java @@ -29,8 +29,6 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; -import cn.hippo4j.core.proxy.RejectedProxyUtil; import cn.hippo4j.message.dto.NotifyConfigDTO; import cn.hippo4j.message.request.ChangeParameterNotifyRequest; import cn.hippo4j.message.service.Hippo4jBaseSendMessageService; @@ -43,7 +41,6 @@ import java.util.*; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; @@ -190,6 +187,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener