Merge remote-tracking branch 'origin/develop' into develop

pull/993/head
chen.ma 2 years ago
commit bc6c06e435

@ -17,7 +17,11 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.core.plugin.*;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
import cn.hippo4j.core.plugin.TaskAwarePlugin;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport;
import lombok.AllArgsConstructor;
@ -28,7 +32,14 @@ import lombok.Setter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* <p>Extensible thread-pool executor. <br />
@ -96,13 +107,10 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// Pool extended info.
// pool extended info.
this.threadPoolId = threadPoolId;
this.threadPoolPluginManager = threadPoolPluginManager;
// Proxy handler to support Aware callback.
while (handler instanceof RejectedAwareHandlerWrapper) {
handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
}
// proxy handler to support callback, repeated packaging of the same rejection policy should be avoided here.
this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginManager, handler);
super.setRejectedExecutionHandler(handlerWrapper);
}
@ -124,7 +132,9 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
/**
* {@inheritDoc}
*
* <p><b>Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskExecute} will be called first.
* <p>Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskExecute} will be called first. <br />
* If the task becomes null after being processed by the {@link TaskAwarePlugin#beforeTaskExecute},
* the task will not be submitted.
*
* @param runnable the task to execute
*/
@ -133,6 +143,9 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList();
for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
runnable = taskAwarePlugin.beforeTaskExecute(runnable);
if (Objects.isNull(runnable)) {
return;
}
}
super.execute(runnable);
}
@ -251,9 +264,6 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
*/
@Override
public void setRejectedExecutionHandler(@NonNull RejectedExecutionHandler handler) {
while (handler instanceof RejectedAwareHandlerWrapper) {
handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
}
handlerWrapper.setHandler(handler);
}

@ -17,6 +17,9 @@
package cn.hippo4j.core.plugin;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
@ -30,9 +33,13 @@ public interface TaskAwarePlugin extends ThreadPoolPlugin {
*
* @param executor executor
* @param runnable original task
* @return Tasks that really need to be performed
* @param value the default value for the returned future
* @param <V> value type
* @return Tasks that really need to be performed, if the return task is null,
* terminate the execution of the next plugin immediately.
* @see ThreadPoolExecutor#newTaskFor(Runnable, Object)
*/
@Nullable
default <V> Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) {
return runnable;
}
@ -42,9 +49,12 @@ public interface TaskAwarePlugin extends ThreadPoolPlugin {
*
* @param executor executor
* @param future original task
* @return Tasks that really need to be performed
* @param <V> value type
* @return Tasks that really need to be performed, if the return task is null,
* terminate the execution of the next plugin immediately.
* @see ThreadPoolExecutor#newTaskFor(Callable)
*/
@Nullable
default <V> Callable<V> beforeTaskCreate(ThreadPoolExecutor executor, Callable<V> future) {
return future;
}
@ -53,10 +63,12 @@ public interface TaskAwarePlugin extends ThreadPoolPlugin {
* Callback when task is execute.
*
* @param runnable runnable
* @return tasks to be execute
* @return task to be executed, if the return task is null,
* terminate the execution of the next plug-in immediately.
* @see ThreadPoolExecutor#execute
*/
default Runnable beforeTaskExecute(Runnable runnable) {
@Nullable
default Runnable beforeTaskExecute(@NonNull Runnable runnable) {
return runnable;
}
}

@ -48,7 +48,7 @@ public class TaskDecoratorPlugin implements TaskAwarePlugin {
* @see ExtensibleThreadPoolExecutor#execute
*/
@Override
public Runnable beforeTaskExecute(Runnable runnable) {
public Runnable beforeTaskExecute(@NonNull Runnable runnable) {
for (TaskDecorator decorator : decorators) {
runnable = decorator.decorate(runnable);
}

@ -25,6 +25,8 @@ import cn.hippo4j.core.plugin.TaskAwarePlugin;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager;
import lombok.Getter;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -101,6 +103,13 @@ public class ExtensibleThreadPoolExecutorTest {
});
ThreadUtil.sleep(500L);
Assert.assertEquals(2, plugin.getInvokeCount().get());
// no task will be executed because it has been replaced with null
executor.register(new TestTaskToNullAwarePlugin());
executor.execute(() -> {
});
ThreadUtil.sleep(500L);
Assert.assertEquals(2, plugin.getInvokeCount().get());
}
@Test
@ -148,6 +157,14 @@ public class ExtensibleThreadPoolExecutorTest {
}
}
private final static class TestTaskToNullAwarePlugin implements TaskAwarePlugin {
@Override
public @Nullable Runnable beforeTaskExecute(@NonNull Runnable runnable) {
return null;
}
}
@Getter
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
@ -164,7 +181,7 @@ public class ExtensibleThreadPoolExecutorTest {
return TaskAwarePlugin.super.beforeTaskCreate(executor, future);
}
@Override
public Runnable beforeTaskExecute(Runnable runnable) {
public Runnable beforeTaskExecute(@NonNull Runnable runnable) {
invokeCount.incrementAndGet();
return TaskAwarePlugin.super.beforeTaskExecute(runnable);
}

Loading…
Cancel
Save