diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java index 87386a28..3401dcd0 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java @@ -17,7 +17,11 @@ package cn.hippo4j.core.executor; -import cn.hippo4j.core.plugin.*; +import cn.hippo4j.core.plugin.ExecuteAwarePlugin; +import cn.hippo4j.core.plugin.RejectedAwarePlugin; +import cn.hippo4j.core.plugin.ShutdownAwarePlugin; +import cn.hippo4j.core.plugin.TaskAwarePlugin; +import cn.hippo4j.core.plugin.ThreadPoolPlugin; import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager; import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport; import lombok.AllArgsConstructor; @@ -28,7 +32,14 @@ import lombok.Setter; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.concurrent.*; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** *

Extensible thread-pool executor.
@@ -96,13 +107,10 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements @NonNull ThreadFactory threadFactory, @NonNull RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); - // Pool extended info. + // pool extended info. this.threadPoolId = threadPoolId; this.threadPoolPluginManager = threadPoolPluginManager; - // Proxy handler to support Aware callback. - while (handler instanceof RejectedAwareHandlerWrapper) { - handler = ((RejectedAwareHandlerWrapper) handler).getHandler(); - } + // proxy handler to support callback, repeated packaging of the same rejection policy should be avoided here. this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginManager, handler); super.setRejectedExecutionHandler(handlerWrapper); } @@ -124,7 +132,9 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements /** * {@inheritDoc} * - *

Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskExecute} will be called first. + *

Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskExecute} will be called first.
+ * If the task becomes null after being processed by the {@link TaskAwarePlugin#beforeTaskExecute}, + * the task will not be submitted. * * @param runnable the task to execute */ @@ -133,6 +143,9 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements Collection taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList(); for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) { runnable = taskAwarePlugin.beforeTaskExecute(runnable); + if (Objects.isNull(runnable)) { + return; + } } super.execute(runnable); } @@ -251,9 +264,6 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements */ @Override public void setRejectedExecutionHandler(@NonNull RejectedExecutionHandler handler) { - while (handler instanceof RejectedAwareHandlerWrapper) { - handler = ((RejectedAwareHandlerWrapper) handler).getHandler(); - } handlerWrapper.setHandler(handler); } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java index cc41fbc0..c846a74e 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java @@ -17,6 +17,9 @@ package cn.hippo4j.core.plugin; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + import java.util.concurrent.Callable; import java.util.concurrent.ThreadPoolExecutor; @@ -30,9 +33,13 @@ public interface TaskAwarePlugin extends ThreadPoolPlugin { * * @param executor executor * @param runnable original task - * @return Tasks that really need to be performed + * @param value the default value for the returned future + * @param value type + * @return Tasks that really need to be performed, if the return task is null, + * terminate the execution of the next plugin immediately. * @see ThreadPoolExecutor#newTaskFor(Runnable, Object) */ + @Nullable default Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) { return runnable; } @@ -42,9 +49,12 @@ public interface TaskAwarePlugin extends ThreadPoolPlugin { * * @param executor executor * @param future original task - * @return Tasks that really need to be performed + * @param value type + * @return Tasks that really need to be performed, if the return task is null, + * terminate the execution of the next plugin immediately. * @see ThreadPoolExecutor#newTaskFor(Callable) */ + @Nullable default Callable beforeTaskCreate(ThreadPoolExecutor executor, Callable future) { return future; } @@ -53,10 +63,12 @@ public interface TaskAwarePlugin extends ThreadPoolPlugin { * Callback when task is execute. * * @param runnable runnable - * @return tasks to be execute + * @return task to be executed, if the return task is null, + * terminate the execution of the next plug-in immediately. * @see ThreadPoolExecutor#execute */ - default Runnable beforeTaskExecute(Runnable runnable) { + @Nullable + default Runnable beforeTaskExecute(@NonNull Runnable runnable) { return runnable; } } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java index bfe1eac0..749121f2 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java @@ -48,7 +48,7 @@ public class TaskDecoratorPlugin implements TaskAwarePlugin { * @see ExtensibleThreadPoolExecutor#execute */ @Override - public Runnable beforeTaskExecute(Runnable runnable) { + public Runnable beforeTaskExecute(@NonNull Runnable runnable) { for (TaskDecorator decorator : decorators) { runnable = decorator.decorate(runnable); } diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java index 2db35edc..219ab297 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java @@ -25,6 +25,8 @@ import cn.hippo4j.core.plugin.TaskAwarePlugin; import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager; import lombok.Getter; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -101,6 +103,13 @@ public class ExtensibleThreadPoolExecutorTest { }); ThreadUtil.sleep(500L); Assert.assertEquals(2, plugin.getInvokeCount().get()); + + // no task will be executed because it has been replaced with null + executor.register(new TestTaskToNullAwarePlugin()); + executor.execute(() -> { + }); + ThreadUtil.sleep(500L); + Assert.assertEquals(2, plugin.getInvokeCount().get()); } @Test @@ -148,6 +157,14 @@ public class ExtensibleThreadPoolExecutorTest { } } + private final static class TestTaskToNullAwarePlugin implements TaskAwarePlugin { + + @Override + public @Nullable Runnable beforeTaskExecute(@NonNull Runnable runnable) { + return null; + } + } + @Getter private final static class TestTaskAwarePlugin implements TaskAwarePlugin { @@ -164,7 +181,7 @@ public class ExtensibleThreadPoolExecutorTest { return TaskAwarePlugin.super.beforeTaskCreate(executor, future); } @Override - public Runnable beforeTaskExecute(Runnable runnable) { + public Runnable beforeTaskExecute(@NonNull Runnable runnable) { invokeCount.incrementAndGet(); return TaskAwarePlugin.super.beforeTaskExecute(runnable); }