diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/SyncTimeRecorder.java b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/SyncTimeRecorder.java new file mode 100644 index 00000000..35891fa1 --- /dev/null +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/SyncTimeRecorder.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.common.toolkit; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Thread safe time recorder, + * used to record multiple time periods and count various time indicators. + * + * @author huangchengxing + */ +public class SyncTimeRecorder { + + /** + * Lock instance. + */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * Total execution milli time of all tasks. + */ + private long totalTaskTime = 0L; + + /** + * Maximum task milli execution time, default -1. + */ + private long maxTaskTime = -1L; + + /** + * Minimal task milli execution time, default -1. + */ + private long minTaskTime = -1L; + + /** + * Count of completed task. + */ + private long taskCount = 0L; + + /** + * Get the summary statistics of the instance at the current time. + * + * @return data snapshot + */ + public Summary summarize() { + Lock readLock = lock.readLock(); + Summary statistics; + readLock.lock(); + try { + statistics = new Summary( + this.totalTaskTime, + this.maxTaskTime, + this.minTaskTime, + this.taskCount); + } finally { + readLock.unlock(); + } + return statistics; + } + + /** + * Refresh time indicators of the current instance. + * + * @param taskExecutionTime millisecond + */ + public final void refreshTime(long taskExecutionTime) { + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + if (taskCount == 0) { + maxTaskTime = taskExecutionTime; + minTaskTime = taskExecutionTime; + } else { + maxTaskTime = Math.max(taskExecutionTime, maxTaskTime); + minTaskTime = Math.min(taskExecutionTime, minTaskTime); + } + taskCount = taskCount + 1; + totalTaskTime += taskExecutionTime; + } finally { + writeLock.unlock(); + } + } + + /** + * Summary statistics of SyncTimeRecorder instance at a certain time. + */ + @Getter + @RequiredArgsConstructor + public static class Summary { + + /** + * Total execution nano time of all tasks. + */ + private final long totalTaskTime; + + /** + * Maximum task nano execution time. + */ + private final long maxTaskTime; + + /** + * Minimal task nano execution time. + */ + private final long minTaskTime; + + /** + * Count of completed task. + */ + private final long taskCount; + + /** + * Get the avg task time in milliseconds. + * + * @return avg task time + */ + public long getAvgTaskTimeMillis() { + return getTotalTaskTime() / getTaskCount(); + } + + } + +} diff --git a/hippo4j-core/pom.xml b/hippo4j-core/pom.xml index bd891a49..e9e05f10 100644 --- a/hippo4j-core/pom.xml +++ b/hippo4j-core/pom.xml @@ -10,6 +10,11 @@ hippo4j-core + + org.springframework.boot + spring-boot-starter-test + test + org.projectlombok lombok diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java index 2249fabc..f9530c16 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java @@ -17,107 +17,180 @@ package cn.hippo4j.core.executor; -import cn.hippo4j.common.config.ApplicationContextHolder; -import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; -import cn.hippo4j.core.proxy.RejectedProxyUtil; -import cn.hippo4j.core.toolkit.SystemClock; -import lombok.Getter; +import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistrar; +import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry; +import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin; +import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin; +import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin; +import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin; import lombok.NonNull; -import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; import org.springframework.core.task.TaskDecorator; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; /** * Enhanced dynamic and monitored thread pool. */ -public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport { - - @Getter - @Setter - private Long executeTimeOut; - - @Getter - @Setter - private TaskDecorator taskDecorator; - - @Getter - @Setter - private RejectedExecutionHandler redundancyHandler; - - @Getter - private final String threadPoolId; - - @Getter - private final AtomicLong rejectCount = new AtomicLong(); - - private final ThreadLocal startTimeThreadLocal = new ThreadLocal<>(); - - public DynamicThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - long executeTimeOut, - boolean waitForTasksToCompleteOnShutdown, - long awaitTerminationMillis, - @NonNull BlockingQueue blockingQueue, - @NonNull String threadPoolId, - @NonNull ThreadFactory threadFactory, - @NonNull RejectedExecutionHandler rejectedExecutionHandler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, blockingQueue, threadPoolId, threadFactory, rejectedExecutionHandler); - this.threadPoolId = threadPoolId; - this.executeTimeOut = executeTimeOut; - // Number of dynamic proxy denial policies. - RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount); - setRejectedExecutionHandler(rejectedProxy); - // Redundant fields to avoid reflecting the acquired fields when sending change information. - redundancyHandler = rejectedExecutionHandler; +@Slf4j +public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean { + + /** + * Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters. + * + * @param threadPoolId thread-pool id + * @param executeTimeOut execute time out + * @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown + * @param awaitTerminationMillis await termination millis + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize the maximum number of threads to allow in the + * pool + * @param keepAliveTime when the number of threads is greater than + * the core, this is the maximum time that excess idle threads + * will wait for new tasks before terminating. + * @param unit the time unit for the {@code keepAliveTime} argument + * @param blockingQueue the queue to use for holding tasks before they are + * executed. This queue will hold only the {@code Runnable} + * tasks submitted by the {@code execute} method. + * @param threadFactory the factory to use when the executor creates a new thread + * @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached + * @throws IllegalArgumentException if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException if {@code workQueue} + * or {@code threadFactory} or {@code handler} is null + */ + public DynamicThreadPoolExecutor( + int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis, + @NonNull BlockingQueue blockingQueue, + @NonNull String threadPoolId, + @NonNull ThreadFactory threadFactory, + @NonNull RejectedExecutionHandler rejectedExecutionHandler) { + super( + threadPoolId, new DefaultThreadPoolPluginRegistry(), + corePoolSize, maximumPoolSize, keepAliveTime, unit, + blockingQueue, threadFactory, rejectedExecutionHandler + ); + log.info("Initializing ExecutorService" + threadPoolId); + + // init default aware processor + new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis, waitForTasksToCompleteOnShutdown) + .doRegister(this, this); } + /** + * Invoked by the containing {@code BeanFactory} on destruction of a bean. + * + * @throws Exception in case of shutdown errors. Exceptions will get logged + * but not rethrown to allow other beans to release their resources as well. + */ @Override - public void execute(@NonNull Runnable command) { - if (taskDecorator != null) { - command = taskDecorator.decorate(command); - } - super.execute(command); + public void destroy() throws Exception { + getAndThen( + ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, + ThreadPoolExecutorShutdownPlugin.class, + processor -> { + if (processor.isWaitForTasksToCompleteOnShutdown()) { + super.shutdown(); + } else { + super.shutdownNow(); + } + }); } - @Override - protected void beforeExecute(Thread t, Runnable r) { - if (executeTimeOut == null || executeTimeOut <= 0) { - return; - } - startTimeThreadLocal.set(SystemClock.now()); + public long getAwaitTerminationMillis() { + return getAndThen( + ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, + ThreadPoolExecutorShutdownPlugin.class, + ThreadPoolExecutorShutdownPlugin::getAwaitTerminationMillis, -1L + ); } - @Override - protected void afterExecute(Runnable r, Throwable t) { - Long startTime; - if ((startTime = startTimeThreadLocal.get()) == null) { - return; - } - try { - long endTime = SystemClock.now(); - long executeTime; - boolean executeTimeAlarm = (executeTime = (endTime - startTime)) > executeTimeOut; - if (executeTimeAlarm && ApplicationContextHolder.getInstance() != null) { - ThreadPoolNotifyAlarmHandler notifyAlarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class); - if (notifyAlarmHandler != null) { - notifyAlarmHandler.asyncSendExecuteTimeOutAlarm(threadPoolId, executeTime, executeTimeOut, this); + public boolean isWaitForTasksToCompleteOnShutdown() { + return getAndThen( + ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, + ThreadPoolExecutorShutdownPlugin.class, + ThreadPoolExecutorShutdownPlugin::isWaitForTasksToCompleteOnShutdown, false + ); + } + + /** + * Set support param. + * + * @param awaitTerminationMillis await termination millis + * @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown + */ + public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) { + getAndThen( + ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, + ThreadPoolExecutorShutdownPlugin.class, + processor -> processor.setAwaitTerminationMillis(awaitTerminationMillis) + .setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown) + ); + } + + public Long getRejectCountNum() { + return getAndThen( + TaskRejectCountRecordPlugin.PLUGIN_NAME, + TaskRejectCountRecordPlugin.class, + TaskRejectCountRecordPlugin::getRejectCountNum, -1L + ); + } + + public Long getExecuteTimeOut() { + return getAndThen( + TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, + TaskTimeoutNotifyAlarmPlugin.class, + TaskTimeoutNotifyAlarmPlugin::getExecuteTimeOut, -1L + ); + } + + public void setExecuteTimeOut(Long executeTimeOut) { + getAndThen( + TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, + TaskTimeoutNotifyAlarmPlugin.class, + processor -> processor.setExecuteTimeOut(executeTimeOut) + ); + } + + public TaskDecorator getTaskDecorator() { + return getAndThen( + TaskDecoratorPlugin.PLUGIN_NAME, + TaskDecoratorPlugin.class, + processor -> CollectionUtil.getFirst(processor.getDecorators()), null + ); + } + + public void setTaskDecorator(TaskDecorator taskDecorator) { + if (Objects.nonNull(taskDecorator)) { + getAndThen( + TaskDecoratorPlugin.PLUGIN_NAME, + TaskDecoratorPlugin.class, + processor -> { + processor.clearDecorators(); + processor.addDecorator(taskDecorator); } - } - } finally { - startTimeThreadLocal.remove(); + ); } } - @Override - protected ExecutorService initializeExecutor() { - return this; + public RejectedExecutionHandler getRedundancyHandler() { + return getRejectedExecutionHandler(); } - public Long getRejectCountNum() { - return rejectCount.get(); + public void getRedundancyHandler(RejectedExecutionHandler handler) { + setRejectedExecutionHandler(handler); } + } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java index a8827549..2e2abc4e 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java @@ -17,7 +17,6 @@ package cn.hippo4j.core.executor; -import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.executor.support.CommonDynamicThreadPool; import lombok.AllArgsConstructor; import lombok.Builder; @@ -67,8 +66,8 @@ public class DynamicThreadPoolWrapper implements DisposableBean { @Override public void destroy() throws Exception { - if (executor != null && executor instanceof AbstractDynamicExecutorSupport) { - ((AbstractDynamicExecutorSupport) executor).destroy(); + if (executor != null && executor instanceof DynamicThreadPoolExecutor) { + ((DynamicThreadPoolExecutor) executor).destroy(); } } } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java new file mode 100644 index 00000000..b1b130ef --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor; + +import cn.hippo4j.core.plugin.*; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; +import lombok.Setter; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; + +/** + *

Extensible thread-pool executor.
+ * Support the callback extension points provided on the basis of {@link ThreadPoolExecutor}. + * Each extension point corresponds to a different {@link ThreadPoolPlugin} interface, + * users can customize plug-ins and implement one or more {@link ThreadPoolPlugin} interface + * to enable plugins to sense thread pool behavior and provide extended functions. + * + * @author huangchengxing + */ +public class ExtensibleThreadPoolExecutor + extends ThreadPoolExecutor implements ThreadPoolPluginRegistryDelegate { + + /** + * thread pool id + */ + @Getter + @NonNull + private final String threadPoolId; + + /** + * action aware registry + */ + @Getter + private final ThreadPoolPluginRegistry threadPoolPluginRegistry; + + /** + * handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper + */ + private final RejectedAwareHandlerWrapper handlerWrapper; + + /** + * Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters. + * + * @param threadPoolId thread-pool id + * @param threadPoolPluginRegistry action aware registry + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize the maximum number of threads to allow in the + * pool + * @param keepAliveTime when the number of threads is greater than + * the core, this is the maximum time that excess idle threads + * will wait for new tasks before terminating. + * @param unit the time unit for the {@code keepAliveTime} argument + * @param workQueue the queue to use for holding tasks before they are + * executed. This queue will hold only the {@code Runnable} + * tasks submitted by the {@code execute} method. + * @param threadFactory the factory to use when the executor + * creates a new thread + * @param handler the handler to use when execution is blocked + * because the thread bounds and queue capacities are reached + * @throws IllegalArgumentException if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException if {@code workQueue} + * or {@code threadFactory} or {@code handler} is null + */ + public ExtensibleThreadPoolExecutor( + @NonNull String threadPoolId, + @NonNull ThreadPoolPluginRegistry threadPoolPluginRegistry, + int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + @NonNull BlockingQueue workQueue, + @NonNull ThreadFactory threadFactory, + @NonNull RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + + // pool extended info + this.threadPoolId = threadPoolId; + this.threadPoolPluginRegistry = threadPoolPluginRegistry; + + // proxy handler to support Aware callback + while (handler instanceof RejectedAwareHandlerWrapper) { + handler = ((RejectedAwareHandlerWrapper) handler).getHandler(); + } + this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginRegistry, handler); + super.setRejectedExecutionHandler(handlerWrapper); + } + + /** + * {@inheritDoc} + * + *

Before calling the parent class method, {@link ExecuteAwarePlugin#beforeExecute} will be called first. + * + * @param thread the thread that will run task {@code r} + * @param runnable the task that will be executed + */ + @Override + protected void beforeExecute(Thread thread, Runnable runnable) { + Collection executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList(); + executeAwarePluginList.forEach(aware -> aware.beforeExecute(thread, runnable)); + } + + /** + * {@inheritDoc} + * + *

Before calling the parent class method, {@link ExecuteAwarePlugin#execute} will be called first. + * + * @param runnable the task to execute + */ + @Override + public void execute(@NonNull Runnable runnable) { + Collection executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList(); + for (ExecuteAwarePlugin executeAwarePlugin : executeAwarePluginList) { + runnable = executeAwarePlugin.execute(runnable); + } + super.execute(runnable); + } + + /** + * {@inheritDoc} + * + *

After calling the parent class method, {@link ExecuteAwarePlugin#afterExecute} will be called last. + * + * @param runnable the runnable that has completed + * @param throwable the exception that caused termination, or null if + * execution completed normally + */ + @Override + protected void afterExecute(Runnable runnable, Throwable throwable) { + Collection executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList(); + executeAwarePluginList.forEach(aware -> aware.afterExecute(runnable, throwable)); + } + + /** + * {@inheritDoc} + * + *

Before calling the parent class method, + * {@link ShutdownAwarePlugin#beforeShutdown} will be called first. + * and then will be call {@link ShutdownAwarePlugin#afterShutdown} + * + * @throws SecurityException {@inheritDoc} + */ + @Override + public void shutdown() { + Collection shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList(); + shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this)); + super.shutdown(); + shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, Collections.emptyList())); + } + + /** + * {@inheritDoc} + * + *

Before calling the parent class method, + * {@link ShutdownAwarePlugin#beforeShutdown} will be called first. + * and then will be call {@link ShutdownAwarePlugin#afterShutdown} + * + * @throws SecurityException + */ + @Override + public List shutdownNow() { + Collection shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList(); + shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this)); + List tasks = super.shutdownNow(); + shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, tasks)); + return tasks; + } + + /** + * {@inheritDoc}. + * + *

Before calling the parent class method, {@link ShutdownAwarePlugin#afterTerminated} will be called first. + */ + @Override + protected void terminated() { + super.terminated(); + Collection shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList(); + shutdownAwarePluginList.forEach(aware -> aware.afterTerminated(this)); + } + + /** + * {@inheritDoc} + * + *

Before calling the parent class method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first. + * + * @param runnable the runnable task being wrapped + * @param value the default value for the returned future + * @return a {@code RunnableFuture} which, when run, will run the + * underlying runnable and which, as a {@code Future}, will yield + * the given value as its result and provide for cancellation of + * the underlying task + * @since 1.6 + */ + @Override + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + Collection taskAwarePluginList = threadPoolPluginRegistry.getTaskAwareList(); + for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) { + runnable = taskAwarePlugin.beforeTaskCreate(this, runnable, value); + } + return super.newTaskFor(runnable, value); + } + + /** + * {@inheritDoc} + * + *

Before calling the parent class method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first. + * + * @param callable the callable task being wrapped + * @return a {@code RunnableFuture} which, when run, will call the + * underlying callable and which, as a {@code Future}, will yield + * the callable's result as its result and provide for + * cancellation of the underlying task + * @since 1.6 + */ + @Override + protected RunnableFuture newTaskFor(Callable callable) { + Collection taskAwarePluginList = threadPoolPluginRegistry.getTaskAwareList(); + for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) { + callable = taskAwarePlugin.beforeTaskCreate(this, callable); + } + return super.newTaskFor(callable); + } + + /** + * Sets a new handler for unexecutable tasks. + * + * @param handler the new handler + * @throws NullPointerException if handler is null + * @see #getRejectedExecutionHandler + */ + @Override + public void setRejectedExecutionHandler(@NonNull RejectedExecutionHandler handler) { + while (handler instanceof RejectedAwareHandlerWrapper) { + handler = ((RejectedAwareHandlerWrapper) handler).getHandler(); + } + handlerWrapper.setHandler(handler); + } + + /** + * Returns the current handler for unexecutable tasks. + * + * @return the current handler + * @see #setRejectedExecutionHandler(RejectedExecutionHandler) + */ + @Override + public RejectedExecutionHandler getRejectedExecutionHandler() { + return handlerWrapper.getHandler(); + } + + /** + * Wrapper of original {@link RejectedExecutionHandler} of {@link ThreadPoolExecutor}, + * It's used to support the {@link RejectedAwarePlugin} on the basis of the {@link RejectedExecutionHandler}. + * + * @author huangchengxing + * @see RejectedAwarePlugin + */ + @AllArgsConstructor + private static class RejectedAwareHandlerWrapper implements RejectedExecutionHandler { + + /** + * thread-pool action aware registry + */ + private final ThreadPoolPluginRegistry registry; + + /** + * original target + */ + @NonNull + @Setter + @Getter + private RejectedExecutionHandler handler; + + /** + * Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task + * + * @param r the runnable task requested to be executed + * @param executor the executor attempting to execute this task + */ + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + Collection rejectedAwarePluginList = registry.getRejectedAwareList(); + rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor)); + handler.rejectedExecution(r, executor); + } + + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java index 77237dd4..7b832473 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java @@ -22,13 +22,13 @@ import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.support.ThreadPoolBuilder; -import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.ExecutorTraceContextUtil; -import cn.hippo4j.message.service.Hippo4jSendMessageService; +import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.message.enums.NotifyTypeEnum; -import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.request.AlarmNotifyRequest; import cn.hippo4j.message.request.ChangeParameterNotifyRequest; +import cn.hippo4j.message.service.Hippo4jSendMessageService; +import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -211,9 +211,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner */ public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) { BlockingQueue blockingQueue = threadPoolExecutor.getQueue(); - RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor - ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler() - : threadPoolExecutor.getRejectedExecutionHandler(); + RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler(); long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum() : -1L; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java index 62b9b1ab..81f17859 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java @@ -21,10 +21,8 @@ import cn.hippo4j.common.model.ManyThreadPoolRunStateInfo; import cn.hippo4j.common.model.ThreadPoolRunStateInfo; import cn.hippo4j.common.toolkit.BeanUtil; import cn.hippo4j.common.toolkit.ByteConvertUtil; -import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.toolkit.inet.InetUtils; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -70,11 +68,7 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime { DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId); ThreadPoolExecutor pool = executorService.getExecutor(); String rejectedName; - if (pool instanceof AbstractDynamicExecutorSupport) { - rejectedName = ((DynamicThreadPoolExecutor) pool).getRedundancyHandler().getClass().getSimpleName(); - } else { - rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName(); - } + rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName(); poolRunStateInfo.setRejectedName(rejectedName); ManyThreadPoolRunStateInfo manyThreadPoolRunStateInfo = BeanUtil.convert(poolRunStateInfo, ManyThreadPoolRunStateInfo.class); manyThreadPoolRunStateInfo.setIdentify(CLIENT_IDENTIFICATION_VALUE); diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java index cff8087c..d9aae7d5 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java @@ -17,6 +17,7 @@ package cn.hippo4j.core.executor.support; +import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; @@ -25,7 +26,10 @@ import java.util.concurrent.*; /** * Dynamic executor configuration support. + * + * @deprecated used {@link ThreadPoolExecutorShutdownPlugin} to get thread pool shutdown support */ +@Deprecated @Slf4j public abstract class AbstractDynamicExecutorSupport extends ThreadPoolExecutor implements InitializingBean, DisposableBean { diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistrar.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistrar.java new file mode 100644 index 00000000..786cb6d8 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistrar.java @@ -0,0 +1,105 @@ +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.impl.*; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactoryUtils; +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.core.AliasRegistry; + +import java.util.Objects; + +/** + * Register default {@link ThreadPoolPlugin}. + * + * @author huangchengxing + * @see TaskDecoratorPlugin + * @see TaskTimeoutNotifyAlarmPlugin + * @see TaskRejectCountRecordPlugin + * @see TaskRejectNotifyAlarmPlugin + * @see ThreadPoolExecutorShutdownPlugin + */ +@RequiredArgsConstructor +public class DefaultThreadPoolPluginRegistrar + implements ThreadPoolPluginRegistrar, ApplicationContextAware, BeanNameAware { + + public static final String REGISTRAR_NAME = "DefaultThreadPoolPluginRegistrar"; + + /** + * alias registry + */ + private AliasRegistry aliasRegistry; + + /** + * execute time out + */ + private final long executeTimeOut; + + /** + * await termination millis + */ + private final long awaitTerminationMillis; + + /** + * wait for tasks to complete on shutdown + */ + private final boolean waitForTasksToCompleteOnShutdown; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return REGISTRAR_NAME; + } + + /** + * Create and register plugin for the specified thread-pool instance + * + * @param registry thread pool plugin registry + * @param executor executor + */ + @Override + public void doRegister(ThreadPoolPluginRegistry registry, ExtensibleThreadPoolExecutor executor) { + // callback when task execute + registry.register(new TaskDecoratorPlugin()); + registry.register(new TaskTimeoutNotifyAlarmPlugin(executeTimeOut, executor)); + // callback when task rejected + registry.register(new TaskRejectCountRecordPlugin()); + registry.register(new TaskRejectNotifyAlarmPlugin()); + // callback when pool shutdown + registry.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis, waitForTasksToCompleteOnShutdown)); + } + + /** + * Set the name of the bean in the bean factory that created this bean. + *

Invoked after population of normal bean properties but before an + * init callback such as {@link InitializingBean#afterPropertiesSet()} + * or a custom init-method. + * + * @param name the name of the bean in the factory. + * Note that this name is the actual bean name used in the factory, which may + * differ from the originally specified name: in particular for inner bean + * names, the actual bean name might have been made unique through appending + * "#..." suffixes. Use the {@link BeanFactoryUtils#originalBeanName(String)} + * method to extract the original bean name (without suffix), if desired. + */ + @Override + public void setBeanName(String name) { + if (Objects.nonNull(aliasRegistry)) { + aliasRegistry.registerAlias(name, getId()); + } + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.aliasRegistry = applicationContext.getBean(AliasRegistry.class); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistry.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistry.java new file mode 100644 index 00000000..71fc74c2 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistry.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.common.toolkit.Assert; +import lombok.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * The default implementation of {@link ThreadPoolPluginRegistry}. + * + * @author huangchengxing + */ +public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry { + + /** + * lock of this instance + */ + private final ReadWriteLock instanceLock = new ReentrantReadWriteLock(); + + /** + * Registered {@link ThreadPoolPlugin}. + */ + private final Map registeredPlugins = new HashMap<>(16); + + /** + * Registered {@link TaskAwarePlugin}. + */ + private final List taskAwarePluginList = new ArrayList<>(); + + /** + * Registered {@link ExecuteAwarePlugin}. + */ + private final List executeAwarePluginList = new ArrayList<>(); + + /** + * Registered {@link RejectedAwarePlugin}. + */ + private final List rejectedAwarePluginList = new ArrayList<>(); + + /** + * Registered {@link ShutdownAwarePlugin}. + */ + private final List shutdownAwarePluginList = new ArrayList<>(); + + /** + * Clear all. + */ + @Override + public synchronized void clear() { + Lock writeLock = instanceLock.writeLock(); + writeLock.lock(); + try { + registeredPlugins.clear(); + taskAwarePluginList.clear(); + executeAwarePluginList.clear(); + rejectedAwarePluginList.clear(); + shutdownAwarePluginList.clear(); + } finally { + writeLock.unlock(); + } + } + + /** + * Register a {@link ThreadPoolPlugin} + * + * @param plugin plugin + * @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} already exists in the registry + * @see ThreadPoolPlugin#getId() + */ + @Override + public void register(@NonNull ThreadPoolPlugin plugin) { + Lock writeLock = instanceLock.writeLock(); + writeLock.lock(); + try { + String id = plugin.getId(); + Assert.isTrue(!isRegistered(id), "The plug-in with id [" + id + "] has been registered"); + + // register plugin + registeredPlugins.put(id, plugin); + // quick index + if (plugin instanceof TaskAwarePlugin) { + taskAwarePluginList.add((TaskAwarePlugin)plugin); + } + if (plugin instanceof ExecuteAwarePlugin) { + executeAwarePluginList.add((ExecuteAwarePlugin)plugin); + } + if (plugin instanceof RejectedAwarePlugin) { + rejectedAwarePluginList.add((RejectedAwarePlugin)plugin); + } + if (plugin instanceof ShutdownAwarePlugin) { + shutdownAwarePluginList.add((ShutdownAwarePlugin)plugin); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Unregister {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + */ + @Override + public void unregister(String pluginId) { + Lock writeLock = instanceLock.writeLock(); + writeLock.lock(); + try { + Optional.ofNullable(pluginId) + .map(registeredPlugins::remove) + .ifPresent(old -> { + // remove quick index if necessary + if (old instanceof TaskAwarePlugin) { + taskAwarePluginList.remove(old); + } + if (old instanceof ExecuteAwarePlugin) { + executeAwarePluginList.remove(old); + } + if (old instanceof RejectedAwarePlugin) { + rejectedAwarePluginList.remove(old); + } + if (old instanceof ShutdownAwarePlugin) { + shutdownAwarePluginList.remove(old); + } + }); + } finally { + writeLock.unlock(); + } + } + + @Override + public Collection getAllPlugins() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return registeredPlugins.values(); + } finally { + readLock.unlock(); + } + } + + /** + * Whether the {@link ThreadPoolPlugin} has been registered. + * + * @param pluginId plugin id + * @return ture if target has been registered, false otherwise + */ + @Override + public boolean isRegistered(String pluginId) { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return registeredPlugins.containsKey(pluginId); + } finally { + readLock.unlock(); + } + } + + /** + * Get {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + * @param plugin type + * @return {@link ThreadPoolPlugin}, null if unregister + */ + @Nullable + @Override + @SuppressWarnings("unchecked") + public A getPlugin(String pluginId) { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return (A) registeredPlugins.get(pluginId); + } finally { + readLock.unlock(); + } + } + + /** + * Get execute plugin list. + * + * @return {@link ExecuteAwarePlugin} + */ + @Override + public Collection getExecuteAwareList() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return executeAwarePluginList; + } finally { + readLock.unlock(); + } + } + + /** + * Get rejected plugin list. + * + * @return {@link RejectedAwarePlugin} + */ + @Override + public Collection getRejectedAwareList() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return rejectedAwarePluginList; + } finally { + readLock.unlock(); + } + } + + /** + * Get shutdown plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + public Collection getShutdownAwareList() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return shutdownAwarePluginList; + } finally { + readLock.unlock(); + } + } + + /** + * Get shutdown plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + public Collection getTaskAwareList() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return taskAwarePluginList; + } finally { + readLock.unlock(); + } + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java new file mode 100644 index 00000000..413df1f5 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; + +/** + * Callback during task execution. + * + * @author huangchengxing + */ +public interface ExecuteAwarePlugin extends ThreadPoolPlugin { + + /** + * Callback before task execution. + * + * @param thread thread of executing task + * @param runnable task + * @see ExtensibleThreadPoolExecutor#beforeExecute + */ + default void beforeExecute(Thread thread, Runnable runnable) { + } + + /** + * Callback when task is executed. + * + * @param runnable runnable + * @return tasks to be execute + * @see ExtensibleThreadPoolExecutor#execute + */ + default Runnable execute(Runnable runnable) { + return runnable; + } + + /** + * Callback after task execution. + * + * @param runnable runnable + * @param throwable exception thrown during execution + * @see ExtensibleThreadPoolExecutor#afterExecute + */ + default void afterExecute(Runnable runnable, Throwable throwable) { + // do nothing + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/PluginRuntime.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/PluginRuntime.java new file mode 100644 index 00000000..284d94c4 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/PluginRuntime.java @@ -0,0 +1,47 @@ +package cn.hippo4j.core.plugin; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plug in runtime information. + * + * @author huangchengxing + */ +@RequiredArgsConstructor +@Getter +public class PluginRuntime { + + /** + * plugin id + */ + private final String pluginId; + + /** + * runtime info items + */ + private final List items = new ArrayList<>(); + + /** + * Add a runtime info item. + * + * @param name name + * @param value value + * @return runtime info item + */ + public PluginRuntime addItem(String name, Object value) { + items.add(new Item(name, value)); + return this; + } + + @Getter + @RequiredArgsConstructor + public static class Item { + private final String name; + private final Object value; + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java new file mode 100644 index 00000000..75bbaa60 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Callback when task is rejected. + * + * @author huangchengxing + */ +public interface RejectedAwarePlugin extends ThreadPoolPlugin { + + /** + * Callback before task is rejected. + * + * @param runnable task + * @param executor executor + */ + default void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { + // do nothing + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java new file mode 100644 index 00000000..75f99be2 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; + +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Callback before thread-pool shutdown. + * + * @author huangchengxing + */ +public interface ShutdownAwarePlugin extends ThreadPoolPlugin { + + /** + * Callback before pool shutdown. + * + * @param executor executor + * @see ThreadPoolExecutor#shutdown() + * @see ThreadPoolExecutor#shutdownNow() + */ + default void beforeShutdown(ThreadPoolExecutor executor) { + // do nothing + } + + /** + * Callback after pool shutdown. + * + * @param executor executor + * @param remainingTasks remainingTasks, or empty if no tasks left or {@link ThreadPoolExecutor#shutdown()} called + * @see ThreadPoolExecutor#shutdown() + * @see ThreadPoolExecutor#shutdownNow() + */ + default void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) { + // do nothing + } + + /** + * Callback after pool terminated. + * + * @param executor executor + * @see ThreadPoolExecutor#terminated() + */ + default void afterTerminated(ExtensibleThreadPoolExecutor executor) { + // do nothing + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java new file mode 100644 index 00000000..3d4969e1 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Callback during task submit in thread-pool. + * + * @author huangchengxing + */ +public interface TaskAwarePlugin extends ThreadPoolPlugin { + + /** + * Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool. + * + * @param executor executor + * @param runnable original task + * @return Tasks that really need to be performed + * @see ThreadPoolExecutor#newTaskFor(Runnable, Object) + */ + default Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) { + return runnable; + } + + /** + * Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool. + * + * @param executor executor + * @param future original task + * @return Tasks that really need to be performed + * @see ThreadPoolExecutor#newTaskFor(Callable) + */ + default Callable beforeTaskCreate(ThreadPoolExecutor executor, Callable future) { + return future; + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java new file mode 100644 index 00000000..6caf27e2 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; + +/** + * Thread pool action aware. + * + * @author huangchengxing + * @see ExtensibleThreadPoolExecutor + * @see ThreadPoolPluginRegistry + * @see TaskAwarePlugin + * @see ExecuteAwarePlugin + * @see ShutdownAwarePlugin + * @see RejectedAwarePlugin + */ +public interface ThreadPoolPlugin { + + /** + * Get id. + * + * @return id + */ + String getId(); + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + default PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistrar.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistrar.java new file mode 100644 index 00000000..91e32c02 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistrar.java @@ -0,0 +1,28 @@ +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; + +/** + * Factory of {@link ThreadPoolPlugin}. + * + * @author huangchengxing + */ +public interface ThreadPoolPluginRegistrar { + + /** + * Get id. + * In spring container, the obtained id will be used as the alias of the bean name. + * + * @return id + */ + String getId(); + + /** + * Create and register plugin for the specified thread-pool instance + * + * @param registry thread pool plugin registry + * @param executor executor + */ + void doRegister(ThreadPoolPluginRegistry registry, ExtensibleThreadPoolExecutor executor); + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistry.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistry.java new file mode 100644 index 00000000..70ca1a94 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistry.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.Collection; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Registry of {@link ThreadPoolPlugin} + * + * @author huangchengxing + */ +public interface ThreadPoolPluginRegistry { + + /** + * Clear all. + */ + void clear(); + + /** + * Get all registered plugins. + * + * @return plugins + */ + Collection getAllPlugins(); + + /** + * Register a {@link ThreadPoolPlugin} + * + * @param plugin plugin + * @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} + * already exists in the registry + * @see ThreadPoolPlugin#getId() + */ + void register(ThreadPoolPlugin plugin); + + /** + * Whether the {@link ThreadPoolPlugin} has been registered. + * + * @param pluginId plugin id + * @return ture if target has been registered, false otherwise + */ + boolean isRegistered(String pluginId); + + /** + * Unregister {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + */ + void unregister(String pluginId); + + /** + * Get {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + * @param target aware type + * @return {@link ThreadPoolPlugin}, null if unregister + * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type + */ + @Nullable + A getPlugin(String pluginId); + + /** + * Get execute aware plugin list. + * + * @return {@link ExecuteAwarePlugin} + */ + Collection getExecuteAwareList(); + + /** + * Get rejected aware plugin list. + * + * @return {@link RejectedAwarePlugin} + */ + Collection getRejectedAwareList(); + + /** + * Get shutdown aware plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + Collection getShutdownAwareList(); + + /** + * Get shutdown aware plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + Collection getTaskAwareList(); + + /** + * Try to get target plugin and apply operation, do nothing if it's not present. + * + * @param pluginId plugin id + * @param targetType target type + * @param consumer operation for target plugin + * @param plugin type + * @return this instance + * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type + */ + default ThreadPoolPluginRegistry getAndThen( + String pluginId, Class targetType, Consumer consumer) { + Optional.ofNullable(getPlugin(pluginId)) + .map(targetType::cast) + .ifPresent(consumer); + return this; + } + + /** + * Try to get target plugin and return value of apply function, return default value if it's not present. + * + * @param pluginId plugin id + * @param targetType target type + * @param function operation for target plugin + * @param defaultValue default value + * @param plugin type + * @return value of apply function, default value if plugin is not present + * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type + */ + default R getAndThen(String pluginId, Class targetType, Function function, R defaultValue) { + return Optional.ofNullable(getPlugin(pluginId)) + .map(targetType::cast) + .map(function) + .orElse(defaultValue); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistryDelegate.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistryDelegate.java new file mode 100644 index 00000000..c7392686 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistryDelegate.java @@ -0,0 +1,124 @@ +package cn.hippo4j.core.plugin; + +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.Collection; + +/** + * Thread pool action aware registry delegate. + * + * @author huangchengxing + */ +public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegistry { + + /** + * Get thread pool action aware registry. + * + * @return {@link ThreadPoolPluginRegistry} + */ + @NonNull + ThreadPoolPluginRegistry getThreadPoolPluginRegistry(); + + /** + * Clear all. + */ + @Override + default void clear() { + getThreadPoolPluginRegistry().clear(); + } + + /** + * Register a {@link ThreadPoolPlugin} + * + * @param plugin aware + */ + @Override + default void register(ThreadPoolPlugin plugin) { + getThreadPoolPluginRegistry().register(plugin); + } + + /** + * Whether the {@link ThreadPoolPlugin} has been registered. + * + * @param pluginId name + * @return ture if target has been registered, false otherwise + */ + @Override + default boolean isRegistered(String pluginId) { + return getThreadPoolPluginRegistry().isRegistered(pluginId); + } + + /** + * Unregister {@link ThreadPoolPlugin} + * + * @param pluginId name + */ + @Override + default void unregister(String pluginId) { + getThreadPoolPluginRegistry().unregister(pluginId); + } + + /** + * Get all registered plugins. + * + * @return plugins + */ + @Override + default Collection getAllPlugins() { + return getThreadPoolPluginRegistry().getAllPlugins(); + } + + /** + * Get {@link ThreadPoolPlugin} + * + * @param pluginId target name + * @return {@link ThreadPoolPlugin}, null if unregister + * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type + */ + @Nullable + @Override + default A getPlugin(String pluginId) { + return getThreadPoolPluginRegistry().getPlugin(pluginId); + } + + /** + * Get execute aware list. + * + * @return {@link ExecuteAwarePlugin} + */ + @Override + default Collection getExecuteAwareList() { + return getThreadPoolPluginRegistry().getExecuteAwareList(); + } + + /** + * Get rejected aware list. + * + * @return {@link RejectedAwarePlugin} + */ + @Override + default Collection getRejectedAwareList() { + return getThreadPoolPluginRegistry().getRejectedAwareList(); + } + + /** + * Get shutdown aware list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + default Collection getShutdownAwareList() { + return getThreadPoolPluginRegistry().getShutdownAwareList(); + } + + /** + * Get shutdown aware list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + default Collection getTaskAwareList() { + return getThreadPoolPluginRegistry().getTaskAwareList(); + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java new file mode 100644 index 00000000..62584a6b --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.ExecuteAwarePlugin; +import cn.hippo4j.core.plugin.PluginRuntime; +import lombok.Getter; +import lombok.NonNull; +import org.springframework.core.task.TaskDecorator; + +import java.util.ArrayList; +import java.util.List; + +/** + * Decorate tasks when they are submitted to thread-pool. + * + * @author huangchengxing + */ +public class TaskDecoratorPlugin implements ExecuteAwarePlugin { + + public static final String PLUGIN_NAME = "task-decorator-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * decorators + */ + @Getter + private final List decorators = new ArrayList<>(); + + /** + * Callback when task is executed. + * + * @param runnable runnable + * @return tasks to be execute + * @see ExtensibleThreadPoolExecutor#execute + */ + @Override + public Runnable execute(Runnable runnable) { + for (TaskDecorator decorator : decorators) { + runnable = decorator.decorate(runnable); + } + return runnable; + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()) + .addItem("decorators", decorators); + } + + /** + * Add a decorator + * + * @param decorator decorator + */ + public void addDecorator(@NonNull TaskDecorator decorator) { + decorators.remove(decorator); + decorators.add(decorator); + } + + /** + * Clear all decorators + * + */ + public void clearDecorators() { + decorators.clear(); + } + + /** + * Remove decorators + * + */ + public void removeDecorator(TaskDecorator decorator) { + decorators.remove(decorator); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java new file mode 100644 index 00000000..4abf7fda --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.plugin.RejectedAwarePlugin; +import lombok.Getter; +import lombok.Setter; + +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Record the number of tasks rejected by the thread pool. + * + * @author huangchengxing + */ +public class TaskRejectCountRecordPlugin implements RejectedAwarePlugin { + + public static final String PLUGIN_NAME = "task-reject-count-record-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * rejection count + */ + @Setter + @Getter + private AtomicLong rejectCount = new AtomicLong(0); + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()) + .addItem("rejectCount", getRejectCountNum()); + } + + /** + * Record rejection count. + * + * @param r task + * @param executor executor + */ + @Override + public void beforeRejectedExecution(Runnable r, ThreadPoolExecutor executor) { + rejectCount.incrementAndGet(); + } + + /** + * Get reject count num + * + * @return reject count num + */ + public Long getRejectCountNum() { + return rejectCount.get(); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java new file mode 100644 index 00000000..d4f9eebb --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import cn.hippo4j.core.plugin.RejectedAwarePlugin; + +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Send alert notification when a task is rejected. + * + * @author huangchengxing + */ +public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin { + + public static final String PLUGIN_NAME = "task-reject-notify-alarm-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * Callback before task is rejected. + * + * @param runnable task + * @param executor executor + */ + @Override + public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { + if (!(executor instanceof ExtensibleThreadPoolExecutor)) { + return; + } + String threadPoolId = ((ExtensibleThreadPoolExecutor) executor).getThreadPoolId(); + Optional.ofNullable(ApplicationContextHolder.getInstance()) + .map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class)) + .ifPresent(handler -> handler.asyncSendRejectedAlarm(threadPoolId)); + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java new file mode 100644 index 00000000..a1ec8f85 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.SyncTimeRecorder; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.ExecuteAwarePlugin; +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.toolkit.SystemClock; +import lombok.RequiredArgsConstructor; + +import java.util.Objects; + +/** + * Record task execution time indicator. + * + * @author huangchengxing + * @see TaskTimeoutNotifyAlarmPlugin + */ +@RequiredArgsConstructor +public class TaskTimeRecordPlugin extends SyncTimeRecorder implements ExecuteAwarePlugin { + + public static final String PLUGIN_NAME = "task-time-record-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * start times of executed tasks + */ + private final ThreadLocal startTimes = new ThreadLocal<>(); + + /** + * Record the time when the worker thread starts executing the task. + * + * @param thread thread of executing task + * @param runnable task + * @see ExtensibleThreadPoolExecutor#beforeExecute + */ + @Override + public void beforeExecute(Thread thread, Runnable runnable) { + startTimes.set(SystemClock.now()); + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + Summary summary = summarize(); + return new PluginRuntime(getId()) + .addItem("taskCount", summary.getTaskCount()) + .addItem("minTaskTime", summary.getMinTaskTime()) + .addItem("maxTaskTime", summary.getMaxTaskTime()) + .addItem("totalTaskTime", summary.getTotalTaskTime()) + .addItem("avgTaskTime", summary.getAvgTaskTimeMillis()); + } + + /** + * Record the total time for the worker thread to complete the task, and update the time record. + * + * @param runnable runnable + * @param throwable exception thrown during execution + */ + @Override + public void afterExecute(Runnable runnable, Throwable throwable) { + try { + Long startTime = startTimes.get(); + if (Objects.isNull(startTime)) { + return; + } + long executeTime = SystemClock.now() - startTime; + refreshTime(executeTime); + afterRefreshTime(executeTime); + } finally { + startTimes.remove(); + } + } + + /** + * The callback function provided to the subclass, which is called after {@link #refreshTime} + * + * @param executeTime executeTime + */ + protected void afterRefreshTime(long executeTime) { + // do nothing + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java new file mode 100644 index 00000000..f605aa06 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +import java.util.Optional; + +/** + * Record task execution time indicator, + * and send alarm notification when the execution time exceeds the threshold. + * + * @author huangchengxing + */ +@AllArgsConstructor +public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin { + + public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + @Getter + @Setter + private Long executeTimeOut; + + /** + * thread-pool + */ + private final ExtensibleThreadPoolExecutor threadPoolExecutor; + + /** + * Check whether the task execution time exceeds {@link #executeTimeOut}, + * if it exceeds this time, send an alarm notification. + * + * @param executeTime executeTime in nanosecond + */ + @Override + public void afterRefreshTime(long executeTime) { + if (executeTime <= executeTimeOut) { + return; + } + Optional.ofNullable(ApplicationContextHolder.getInstance()) + .map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class)) + .ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm( + threadPoolExecutor.getThreadPoolId(), executeTime, executeTimeOut, threadPoolExecutor)); + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java new file mode 100644 index 00000000..dd64fadf --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.plugin.ShutdownAwarePlugin; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.*; + +/** + * After the thread pool calls {@link ThreadPoolExecutor#shutdown()} or {@link ThreadPoolExecutor#shutdownNow()}, + * if necessary, cancel the remaining tasks in the pool, + * and wait for the thread pool to terminate until + * the blocked main thread has timed out or the thread pool has completely terminated. + * + * @author huangchengxing + */ +@Accessors(chain = true) +@Getter +@Slf4j +@AllArgsConstructor +public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin { + + public static final String PLUGIN_NAME = "thread-pool-executor-shutdown-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * await termination millis + */ + @Setter + public long awaitTerminationMillis; + + /** + * wait for tasks to complete on shutdown + */ + @Setter + public boolean waitForTasksToCompleteOnShutdown; + + /** + * Callback before pool shutdown. + * + * @param executor executor + */ + @Override + public void beforeShutdown(ThreadPoolExecutor executor) { + if (executor instanceof ExtensibleThreadPoolExecutor) { + ExtensibleThreadPoolExecutor dynamicThreadPoolExecutor = (ExtensibleThreadPoolExecutor) executor; + String threadPoolId = dynamicThreadPoolExecutor.getThreadPoolId(); + if (log.isInfoEnabled()) { + log.info("Before shutting down ExecutorService" + (threadPoolId != null ? " '" + threadPoolId + "'" : "")); + } + } + } + + /** + * Callback after pool shutdown. + * if {@link #waitForTasksToCompleteOnShutdown} return {@code true}, + * cancel the remaining tasks, + * then wait for pool to terminate according {@link #awaitTerminationMillis} if necessary. + * + * @param executor executor + * @param remainingTasks remainingTasks + */ + @Override + public void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) { + if (executor instanceof ExtensibleThreadPoolExecutor) { + ExtensibleThreadPoolExecutor pool = (ExtensibleThreadPoolExecutor) executor; + if (!waitForTasksToCompleteOnShutdown && CollectionUtil.isNotEmpty(remainingTasks)) { + remainingTasks.forEach(this::cancelRemainingTask); + } + awaitTerminationIfNecessary(pool); + } + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()) + .addItem("awaitTerminationMillis", awaitTerminationMillis) + .addItem("waitForTasksToCompleteOnShutdown", waitForTasksToCompleteOnShutdown); + } + + /** + * Cancel the given remaining task which never commended execution, + * as returned from {@link ExecutorService#shutdownNow()}. + * + * @param task the task to cancel (typically a {@link RunnableFuture}) + * @see RunnableFuture#cancel(boolean) + * @since 5.0.5 + */ + protected void cancelRemainingTask(Runnable task) { + if (task instanceof Future) { + ((Future) task).cancel(true); + } + } + + /** + * Wait for the executor to terminate, according to the value of {@link #awaitTerminationMillis}. + */ + private void awaitTerminationIfNecessary(ExtensibleThreadPoolExecutor executor) { + String threadPoolId = executor.getThreadPoolId(); + if (this.awaitTerminationMillis <= 0) { + return; + } + try { + boolean isTerminated = executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS); + if (!isTerminated && log.isWarnEnabled()) { + log.warn("Timed out while waiting for executor" + " '" + threadPoolId + "'" + " to terminate."); + } + } catch (InterruptedException ex) { + if (log.isWarnEnabled()) { + log.warn("Interrupted while waiting for executor" + " '" + threadPoolId + "'" + " to terminate."); + } + Thread.currentThread().interrupt(); + } + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java new file mode 100644 index 00000000..fde32550 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java @@ -0,0 +1,9 @@ +package cn.hippo4j.core.executor; + +/** + * test for {@link DynamicThreadPoolExecutor} + * + * @author huangchengxing + */ +public class DynamicThreadPoolExecutorTest { +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java new file mode 100644 index 00000000..8582fffd --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java @@ -0,0 +1,167 @@ +package cn.hippo4j.core.executor; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.plugin.*; +import lombok.Getter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * test for {@link ExtensibleThreadPoolExecutor} + * + * @author huangchengxing + */ +public class ExtensibleThreadPoolExecutorTest { + + private final RejectedExecutionHandler originalHandler = new ThreadPoolExecutor.DiscardPolicy(); + + private ExtensibleThreadPoolExecutor executor; + + @Before + public void initExecutor() { + executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginRegistry(), + 5, 5, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, originalHandler + ); + } + + @Test + public void testGetOrSetRejectedHandler() { + RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); + executor.setRejectedExecutionHandler(handler); + Assert.assertSame(handler, executor.getRejectedExecutionHandler()); + } + + @Test + public void testInvokeTaskAwarePlugin() { + TestTaskAwarePlugin plugin = new TestTaskAwarePlugin(); + executor.register(plugin); + executor.submit(() -> {}); + executor.submit(() -> true); + executor.submit(() -> {}, false); + Assert.assertEquals(3, plugin.getInvokeCount().get()); + } + + @Test + public void testInvokeExecuteAwarePlugin() { + TestExecuteAwarePlugin plugin = new TestExecuteAwarePlugin(); + executor.register(plugin); + executor.execute(() -> {}); + ThreadUtil.sleep(500L); + Assert.assertEquals(3, plugin.getInvokeCount().get()); + } + + @Test + public void testInvokeRejectedAwarePlugin() { + executor.setCorePoolSize(1); + executor.setMaximumPoolSize(1); + + TestRejectedAwarePlugin plugin = new TestRejectedAwarePlugin(); + executor.register(plugin); + // blocking pool and queue + executor.submit(() -> ThreadUtil.sleep(500L)); + executor.submit(() -> ThreadUtil.sleep(500L)); + // reject 3 tasks + executor.submit(() -> {}); + executor.submit(() -> {}); + executor.submit(() -> {}); + + ThreadUtil.sleep(500L); + Assert.assertEquals(3, plugin.getInvokeCount().get()); + } + + @Test + public void testInvokeTestShutdownAwarePluginWhenShutdown() throws InterruptedException { + TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin(); + executor.register(plugin); + executor.shutdown(); + if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) { + Assert.assertEquals(3, plugin.getInvokeCount().get()); + } + } + + @Test + public void testInvokeTestShutdownAwarePluginWhenShutdownNow() throws InterruptedException { + TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin(); + executor.register(plugin); + executor.shutdownNow(); + if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) { + Assert.assertEquals(3, plugin.getInvokeCount().get()); + } + } + + @Getter + private final static class TestTaskAwarePlugin implements TaskAwarePlugin { + private final AtomicInteger invokeCount = new AtomicInteger(0); + private final String id = "TestTaskAwarePlugin"; + @Override + public Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) { + invokeCount.incrementAndGet(); + return TaskAwarePlugin.super.beforeTaskCreate(executor, runnable, value); + } + @Override + public Callable beforeTaskCreate(ThreadPoolExecutor executor, Callable future) { + invokeCount.incrementAndGet(); + return TaskAwarePlugin.super.beforeTaskCreate(executor, future); + } + } + + @Getter + private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin { + private final AtomicInteger invokeCount = new AtomicInteger(0); + private final String id = "TestExecuteAwarePlugin"; + @Override + public void beforeExecute(Thread thread, Runnable runnable) { + invokeCount.incrementAndGet(); + ExecuteAwarePlugin.super.beforeExecute(thread, runnable); + } + @Override + public Runnable execute(Runnable runnable) { + invokeCount.incrementAndGet(); + return ExecuteAwarePlugin.super.execute(runnable); + } + @Override + public void afterExecute(Runnable runnable, Throwable throwable) { + invokeCount.incrementAndGet(); + ExecuteAwarePlugin.super.afterExecute(runnable, throwable); + } + } + + @Getter + private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin { + private final AtomicInteger invokeCount = new AtomicInteger(0); + private final String id = "TestRejectedAwarePlugin"; + @Override + public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { + invokeCount.incrementAndGet(); + } + } + + @Getter + private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin { + private final AtomicInteger invokeCount = new AtomicInteger(0); + private final String id = "TestShutdownAwarePlugin"; + @Override + public void beforeShutdown(ThreadPoolExecutor executor) { + invokeCount.incrementAndGet(); + ShutdownAwarePlugin.super.beforeShutdown(executor); + } + @Override + public void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) { + invokeCount.incrementAndGet(); + ShutdownAwarePlugin.super.afterShutdown(executor, remainingTasks); + } + @Override + public void afterTerminated(ExtensibleThreadPoolExecutor executor) { + invokeCount.incrementAndGet(); + ShutdownAwarePlugin.super.afterTerminated(executor); + } + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistryTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistryTest.java new file mode 100644 index 00000000..176eece0 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistryTest.java @@ -0,0 +1,108 @@ +package cn.hippo4j.core.plugin; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * test for {@link DefaultThreadPoolPluginRegistry} + * + * @author huangchengxing + */ +public class DefaultThreadPoolPluginRegistryTest { + + private DefaultThreadPoolPluginRegistry registry; + + @Before + public void initRegistry() { + registry = new DefaultThreadPoolPluginRegistry(); + } + + @Test + public void testRegister() { + TaskAwarePlugin taskAwarePlugin = new TestTaskAwarePlugin(); + registry.register(taskAwarePlugin); + Assert.assertThrows(IllegalArgumentException.class, () -> registry.register(taskAwarePlugin)); + Assert.assertTrue(registry.isRegistered(taskAwarePlugin.getId())); + Assert.assertEquals(1, registry.getTaskAwareList().size()); + Assert.assertSame(taskAwarePlugin, registry.getPlugin(taskAwarePlugin.getId())); + registry.getAndThen(taskAwarePlugin.getId(), TestTaskAwarePlugin.class, plugin -> Assert.assertSame(plugin, taskAwarePlugin)); + Assert.assertEquals(taskAwarePlugin.getId(), registry.getAndThen(taskAwarePlugin.getId(), TestTaskAwarePlugin.class, TestTaskAwarePlugin::getId, null)); + registry.unregister(taskAwarePlugin.getId()); + Assert.assertNull(registry.getPlugin(taskAwarePlugin.getId())); + + ExecuteAwarePlugin executeAwarePlugin = new TestExecuteAwarePlugin(); + registry.register(executeAwarePlugin); + Assert.assertTrue(registry.isRegistered(executeAwarePlugin.getId())); + Assert.assertEquals(1, registry.getExecuteAwareList().size()); + Assert.assertSame(executeAwarePlugin, registry.getPlugin(executeAwarePlugin.getId())); + registry.unregister(executeAwarePlugin.getId()); + Assert.assertNull(registry.getPlugin(executeAwarePlugin.getId())); + + RejectedAwarePlugin rejectedAwarePlugin = new TestRejectedAwarePlugin(); + registry.register(rejectedAwarePlugin); + Assert.assertTrue(registry.isRegistered(rejectedAwarePlugin.getId())); + Assert.assertEquals(1, registry.getRejectedAwareList().size()); + Assert.assertSame(rejectedAwarePlugin, registry.getPlugin(rejectedAwarePlugin.getId())); + registry.unregister(rejectedAwarePlugin.getId()); + Assert.assertNull(registry.getPlugin(rejectedAwarePlugin.getId())); + + ShutdownAwarePlugin shutdownAwarePlugin = new TestShutdownAwarePlugin(); + registry.register(shutdownAwarePlugin); + Assert.assertTrue(registry.isRegistered(shutdownAwarePlugin.getId())); + Assert.assertEquals(1, registry.getShutdownAwareList().size()); + Assert.assertSame(shutdownAwarePlugin, registry.getPlugin(shutdownAwarePlugin.getId())); + registry.unregister(shutdownAwarePlugin.getId()); + Assert.assertNull(registry.getPlugin(shutdownAwarePlugin.getId())); + } + + private final static class TestTaskAwarePlugin implements TaskAwarePlugin { + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return "TestTaskAwarePlugin"; + } + } + + private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin { + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return "TestExecuteAwarePlugin"; + } + } + + private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin { + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return "TestRejectedAwarePlugin"; + } + } + + private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin { + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return "TestShutdownAwarePlugin"; + } + } + + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/PluginRuntimeTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/PluginRuntimeTest.java new file mode 100644 index 00000000..c4a58e87 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/PluginRuntimeTest.java @@ -0,0 +1,59 @@ +package cn.hippo4j.core.plugin; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin; +import cn.hippo4j.core.plugin.impl.TaskTimeRecordPlugin; +import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * test {@link ThreadPoolPlugin}'s info to json + * + * @author huangchengxing + */ +public class PluginRuntimeTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @SneakyThrows + @Test + public void testGetPluginRuntime() { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginRegistry(), + 1, 1, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy() + ); + + // TaskRejectCountRecordPlugin + TaskRejectCountRecordPlugin taskRejectCountRecordPlugin = new TaskRejectCountRecordPlugin(); + executor.register(taskRejectCountRecordPlugin); + + // TaskRejectCountRecordPlugin + TaskTimeRecordPlugin taskTimeRecordPlugin = new TaskTimeRecordPlugin(); + executor.register(taskTimeRecordPlugin); + + // ThreadPoolExecutorShutdownPlugin + ThreadPoolExecutorShutdownPlugin executorShutdownPlugin = new ThreadPoolExecutorShutdownPlugin(2000L, true); + executor.register(executorShutdownPlugin); + + executor.submit(() -> ThreadUtil.sleep(100L)); + executor.submit(() -> ThreadUtil.sleep(300L)); + executor.submit(() -> ThreadUtil.sleep(200L)); + + ThreadUtil.sleep(1000L); + List runtimeList = executor.getAllPlugins().stream() + .map(ThreadPoolPlugin::getPluginRuntime) + .collect(Collectors.toList()); + System.out.println(objectMapper.writeValueAsString(runtimeList)); + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java new file mode 100644 index 00000000..fb488d7b --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java @@ -0,0 +1,45 @@ +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * test for {@link TaskDecoratorPlugin} + * + * @author huangchengxing + */ +public class TaskDecoratorPluginTest { + + private final AtomicInteger taskExecuteCount = new AtomicInteger(0); + + @Test + public void testExecute() { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginRegistry(), + 5, 5, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy() + ); + TaskDecoratorPlugin plugin = new TaskDecoratorPlugin(); + plugin.addDecorator(runnable -> () -> { + taskExecuteCount.incrementAndGet(); + runnable.run(); + }); + plugin.addDecorator(runnable -> () -> { + taskExecuteCount.incrementAndGet(); + runnable.run(); + }); + executor.register(plugin); + executor.execute(() -> {}); + ThreadUtil.sleep(500L); + Assert.assertEquals(2, taskExecuteCount.get()); + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java new file mode 100644 index 00000000..834e0017 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java @@ -0,0 +1,38 @@ +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * test for {@link TaskRejectCountRecordPlugin} + * + * @author huangchengxing + */ +public class TaskRejectCountRecordPluginTest { + + @Test + public void testExecute() { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginRegistry(), + 1, 1, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy() + ); + + TaskRejectCountRecordPlugin plugin = new TaskRejectCountRecordPlugin(); + executor.register(plugin); + executor.submit(() -> ThreadUtil.sleep(500L)); + executor.submit(() -> ThreadUtil.sleep(500L)); + executor.submit(() -> ThreadUtil.sleep(500L)); + + ThreadUtil.sleep(500L); + Assert.assertEquals((Long)1L, plugin.getRejectCountNum()); + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java new file mode 100644 index 00000000..9f997cb4 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java @@ -0,0 +1,9 @@ +package cn.hippo4j.core.plugin.impl; + +/** + * test for {@link TaskRejectNotifyAlarmPlugin} + * + * @author huangchengxing + */ +public class TaskRejectNotifyAlarmPluginTest { +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java new file mode 100644 index 00000000..6c945fe0 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java @@ -0,0 +1,42 @@ +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.SyncTimeRecorder; +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * test for {@link TaskTimeRecordPlugin} + * + * @author huangchengxing + */ +public class TaskTimeRecordPluginTest { + + @Test + public void testExecute() { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginRegistry(), + 3, 3, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy() + ); + + TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin(); + executor.register(plugin); + executor.submit(() -> ThreadUtil.sleep(100L)); + executor.submit(() -> ThreadUtil.sleep(300L)); + executor.submit(() -> ThreadUtil.sleep(200L)); + + ThreadUtil.sleep(1000L); + SyncTimeRecorder.Summary summary = plugin.summarize(); + Assert.assertEquals(1, summary.getMinTaskTime() / 100L); + Assert.assertEquals(3, summary.getMaxTaskTime() / 100L); + Assert.assertEquals(2, summary.getAvgTaskTimeMillis() / 100L); + Assert.assertEquals(6, summary.getTotalTaskTime() / 100L); + } +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java new file mode 100644 index 00000000..123348bc --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java @@ -0,0 +1,71 @@ +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry; +import cn.hippo4j.core.plugin.ThreadPoolPlugin; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * test for {@link ThreadPoolExecutorShutdownPlugin} + * + * @author huangchengxing + */ +public class ThreadPoolExecutorShutdownPluginTest { + + public ExtensibleThreadPoolExecutor getExecutor(ThreadPoolPlugin plugin) { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginRegistry(), + 2, 2, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy() + ); + executor.register(plugin); + return executor; + } + + private static Callable getCallable(AtomicInteger completedCount) { + return () -> { + ThreadUtil.sleep(1000L); + return completedCount.incrementAndGet(); + }; + } + + @Test + public void testExecuteShutdownWhenWaitTaskCompleted() { + ExtensibleThreadPoolExecutor executor = getExecutor( + new ThreadPoolExecutorShutdownPlugin(2000L, true) + ); + + AtomicInteger completedCount = new AtomicInteger(0); + Callable future1 = getCallable(completedCount); + Callable future2 = getCallable(completedCount); + executor.submit(future1); + executor.submit(future2); + + executor.shutdown(); + Assert.assertEquals(2, completedCount.get()); + } + + @Test + public void testExecuteShutdownWhenNotWaitTaskCompleted() { + ExtensibleThreadPoolExecutor executor = getExecutor( + new ThreadPoolExecutorShutdownPlugin(-1L, true) + ); + + AtomicInteger completedCount = new AtomicInteger(0); + Callable future1 = getCallable(completedCount); + Callable future2 = getCallable(completedCount); + executor.submit(future1); + executor.submit(future2); + + executor.shutdown(); + Assert.assertEquals(0, completedCount.get()); + } +} \ No newline at end of file diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TimeoutNotifyAlarmTaskTimeRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TimeoutNotifyAlarmTaskTimeRecordPluginTest.java new file mode 100644 index 00000000..e37c4680 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TimeoutNotifyAlarmTaskTimeRecordPluginTest.java @@ -0,0 +1,9 @@ +package cn.hippo4j.core.plugin.impl; + +/** + * test for {@link TaskTimeoutNotifyAlarmPlugin} + * + * @author huangchengxing + */ +public class TimeoutNotifyAlarmTaskTimeRecordPluginTest { +} diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java index e570caa4..f2569c7e 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java @@ -29,8 +29,6 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; -import cn.hippo4j.core.proxy.RejectedProxyUtil; import cn.hippo4j.message.dto.NotifyConfigDTO; import cn.hippo4j.message.request.ChangeParameterNotifyRequest; import cn.hippo4j.message.service.Hippo4jBaseSendMessageService; @@ -43,7 +41,6 @@ import java.util.*; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; @@ -182,6 +179,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener