refactor: rename ThreadPoolPluginRegistry and add default method

pull/842/head
huangchengxing 3 years ago
parent f5996a8a34
commit ad84ecbf12

@ -18,8 +18,8 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginManager;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistrar;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin;
import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin;
import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin;
@ -29,7 +29,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.task.TaskDecorator;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
@ -78,7 +77,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) {
super(
threadPoolId, new DefaultThreadPoolPluginRegistry(),
threadPoolId, new DefaultThreadPoolPluginManager(),
corePoolSize, maximumPoolSize, keepAliveTime, unit,
blockingQueue, threadFactory, rejectedExecutionHandler
);
@ -92,37 +91,26 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
/**
* Invoked by the containing {@code BeanFactory} on destruction of a bean.
*
* @throws Exception in case of shutdown errors. Exceptions will get logged
* but not rethrown to allow other beans to release their resources as well.
*/
@Override
public void destroy() throws Exception {
getAndThen(
ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME,
ThreadPoolExecutorShutdownPlugin.class,
processor -> {
if (processor.isWaitForTasksToCompleteOnShutdown()) {
super.shutdown();
} else {
super.shutdownNow();
}
});
public void destroy() {
if (isWaitForTasksToCompleteOnShutdown()) {
super.shutdown();
} else {
super.shutdownNow();
}
}
public long getAwaitTerminationMillis() {
return getAndThen(
ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME,
ThreadPoolExecutorShutdownPlugin.class,
ThreadPoolExecutorShutdownPlugin::getAwaitTerminationMillis, -1L
);
return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
.map(ThreadPoolExecutorShutdownPlugin::getAwaitTerminationMillis)
.orElse(-1L);
}
public boolean isWaitForTasksToCompleteOnShutdown() {
return getAndThen(
ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME,
ThreadPoolExecutorShutdownPlugin.class,
ThreadPoolExecutorShutdownPlugin::isWaitForTasksToCompleteOnShutdown, false
);
return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
.map(ThreadPoolExecutorShutdownPlugin::isWaitForTasksToCompleteOnShutdown)
.orElse(false);
}
/**
@ -132,57 +120,42 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
*/
public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) {
getAndThen(
ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME,
ThreadPoolExecutorShutdownPlugin.class,
processor -> processor.setAwaitTerminationMillis(awaitTerminationMillis)
getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
.ifPresent(processor -> processor
.setAwaitTerminationMillis(awaitTerminationMillis)
.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown)
);
);
}
public Long getRejectCountNum() {
return getAndThen(
TaskRejectCountRecordPlugin.PLUGIN_NAME,
TaskRejectCountRecordPlugin.class,
TaskRejectCountRecordPlugin::getRejectCountNum, -1L
);
return getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class)
.map(TaskRejectCountRecordPlugin::getRejectCountNum)
.orElse(-1L);
}
public Long getExecuteTimeOut() {
return getAndThen(
TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME,
TaskTimeoutNotifyAlarmPlugin.class,
TaskTimeoutNotifyAlarmPlugin::getExecuteTimeOut, -1L
);
return getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class)
.map(TaskTimeoutNotifyAlarmPlugin::getExecuteTimeOut)
.orElse(-1L);
}
public void setExecuteTimeOut(Long executeTimeOut) {
getAndThen(
TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME,
TaskTimeoutNotifyAlarmPlugin.class,
processor -> processor.setExecuteTimeOut(executeTimeOut)
);
getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class)
.ifPresent(processor -> processor.setExecuteTimeOut(executeTimeOut));
}
public TaskDecorator getTaskDecorator() {
return getAndThen(
TaskDecoratorPlugin.PLUGIN_NAME,
TaskDecoratorPlugin.class,
processor -> CollectionUtil.getFirst(processor.getDecorators()), null
);
return getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class)
.map(processor -> CollectionUtil.getFirst(processor.getDecorators()))
.orElse(null);
}
public void setTaskDecorator(TaskDecorator taskDecorator) {
if (Objects.nonNull(taskDecorator)) {
getAndThen(
TaskDecoratorPlugin.PLUGIN_NAME,
TaskDecoratorPlugin.class,
processor -> {
processor.clearDecorators();
processor.addDecorator(taskDecorator);
}
);
}
getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class)
.ifPresent(processor -> {
processor.clearDecorators();
processor.addDecorator(taskDecorator);
});
}
public RejectedExecutionHandler getRedundancyHandler() {

@ -38,7 +38,7 @@ import java.util.concurrent.*;
* @author huangchengxing
*/
public class ExtensibleThreadPoolExecutor
extends ThreadPoolExecutor implements ThreadPoolPluginRegistryDelegate {
extends ThreadPoolExecutor implements ThreadPoolPluginManagerDelegate {
/**
* thread pool id
@ -51,7 +51,7 @@ public class ExtensibleThreadPoolExecutor
* action aware registry
*/
@Getter
private final ThreadPoolPluginRegistry threadPoolPluginRegistry;
private final ThreadPoolPluginManager threadPoolPluginManager;
/**
* handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper
@ -62,7 +62,7 @@ public class ExtensibleThreadPoolExecutor
* Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters.
*
* @param threadPoolId thread-pool id
* @param threadPoolPluginRegistry action aware registry
* @param threadPoolPluginManager action aware registry
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
@ -88,7 +88,7 @@ public class ExtensibleThreadPoolExecutor
*/
public ExtensibleThreadPoolExecutor(
@NonNull String threadPoolId,
@NonNull ThreadPoolPluginRegistry threadPoolPluginRegistry,
@NonNull ThreadPoolPluginManager threadPoolPluginManager,
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
@NonNull BlockingQueue<Runnable> workQueue,
@ -98,13 +98,13 @@ public class ExtensibleThreadPoolExecutor
// pool extended info
this.threadPoolId = threadPoolId;
this.threadPoolPluginRegistry = threadPoolPluginRegistry;
this.threadPoolPluginManager = threadPoolPluginManager;
// proxy handler to support Aware callback
while (handler instanceof RejectedAwareHandlerWrapper) {
handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
}
this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginRegistry, handler);
this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginManager, handler);
super.setRejectedExecutionHandler(handlerWrapper);
}
@ -118,7 +118,7 @@ public class ExtensibleThreadPoolExecutor
*/
@Override
protected void beforeExecute(Thread thread, Runnable runnable) {
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList();
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList();
executeAwarePluginList.forEach(aware -> aware.beforeExecute(thread, runnable));
}
@ -131,7 +131,7 @@ public class ExtensibleThreadPoolExecutor
*/
@Override
public void execute(@NonNull Runnable runnable) {
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList();
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList();
for (ExecuteAwarePlugin executeAwarePlugin : executeAwarePluginList) {
runnable = executeAwarePlugin.execute(runnable);
}
@ -149,7 +149,7 @@ public class ExtensibleThreadPoolExecutor
*/
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList();
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList();
executeAwarePluginList.forEach(aware -> aware.afterExecute(runnable, throwable));
}
@ -164,7 +164,7 @@ public class ExtensibleThreadPoolExecutor
*/
@Override
public void shutdown() {
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList();
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList();
shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this));
super.shutdown();
shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, Collections.emptyList()));
@ -181,7 +181,7 @@ public class ExtensibleThreadPoolExecutor
*/
@Override
public List<Runnable> shutdownNow() {
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList();
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList();
shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this));
List<Runnable> tasks = super.shutdownNow();
shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, tasks));
@ -196,7 +196,7 @@ public class ExtensibleThreadPoolExecutor
@Override
protected void terminated() {
super.terminated();
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList();
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList();
shutdownAwarePluginList.forEach(aware -> aware.afterTerminated(this));
}
@ -215,7 +215,7 @@ public class ExtensibleThreadPoolExecutor
*/
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginRegistry.getTaskAwareList();
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList();
for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
runnable = taskAwarePlugin.beforeTaskCreate(this, runnable, value);
}
@ -236,7 +236,7 @@ public class ExtensibleThreadPoolExecutor
*/
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginRegistry.getTaskAwareList();
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList();
for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
callable = taskAwarePlugin.beforeTaskCreate(this, callable);
}
@ -269,6 +269,16 @@ public class ExtensibleThreadPoolExecutor
return handlerWrapper.getHandler();
}
/**
* Get thread-pool executor.
*
* @return thread-pool executor
*/
@Override
public ThreadPoolExecutor getThreadPoolExecutor() {
return this;
}
/**
* Wrapper of original {@link RejectedExecutionHandler} of {@link ThreadPoolExecutor},
* It's used to support the {@link RejectedAwarePlugin} on the basis of the {@link RejectedExecutionHandler}.
@ -282,7 +292,7 @@ public class ExtensibleThreadPoolExecutor
/**
* thread-pool action aware registry
*/
private final ThreadPoolPluginRegistry registry;
private final ThreadPoolPluginManager registry;
/**
* original target
@ -300,7 +310,7 @@ public class ExtensibleThreadPoolExecutor
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
Collection<RejectedAwarePlugin> rejectedAwarePluginList = registry.getRejectedAwareList();
Collection<RejectedAwarePlugin> rejectedAwarePluginList = registry.getRejectedAwarePluginList();
rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor));
handler.rejectedExecution(r, executor);
}

@ -19,7 +19,6 @@ package cn.hippo4j.core.plugin;
import cn.hippo4j.common.toolkit.Assert;
import lombok.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.*;
import java.util.concurrent.locks.Lock;
@ -27,11 +26,11 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* The default implementation of {@link ThreadPoolPluginRegistry}.
* The default implementation of {@link ThreadPoolPluginManager}.
*
* @author huangchengxing
*/
public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry {
public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
/**
* lock of this instance
@ -183,14 +182,13 @@ public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry
* @param <A> plugin type
* @return {@link ThreadPoolPlugin}, null if unregister
*/
@Nullable
@Override
@SuppressWarnings("unchecked")
public <A extends ThreadPoolPlugin> A getPlugin(String pluginId) {
public <A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId) {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return (A) registeredPlugins.get(pluginId);
return (Optional<A>) Optional.ofNullable(registeredPlugins.get(pluginId));
} finally {
readLock.unlock();
}
@ -202,7 +200,7 @@ public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry
* @return {@link ExecuteAwarePlugin}
*/
@Override
public Collection<ExecuteAwarePlugin> getExecuteAwareList() {
public Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
@ -218,7 +216,7 @@ public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry
* @return {@link RejectedAwarePlugin}
*/
@Override
public Collection<RejectedAwarePlugin> getRejectedAwareList() {
public Collection<RejectedAwarePlugin> getRejectedAwarePluginList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
@ -234,7 +232,7 @@ public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry
* @return {@link ShutdownAwarePlugin}
*/
@Override
public Collection<ShutdownAwarePlugin> getShutdownAwareList() {
public Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
@ -250,7 +248,7 @@ public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry
* @return {@link ShutdownAwarePlugin}
*/
@Override
public Collection<TaskAwarePlugin> getTaskAwareList() {
public Collection<TaskAwarePlugin> getTaskAwarePluginList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {

@ -66,7 +66,7 @@ public class DefaultThreadPoolPluginRegistrar
* @param executor executor
*/
@Override
public void doRegister(ThreadPoolPluginRegistry registry, ExtensibleThreadPoolExecutor executor) {
public void doRegister(ThreadPoolPluginManager registry, ExtensibleThreadPoolExecutor executor) {
// callback when task execute
registry.register(new TaskDecoratorPlugin());
registry.register(new TaskTimeoutNotifyAlarmPlugin(executeTimeOut, executor));

@ -24,7 +24,7 @@ import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
*
* @author huangchengxing
* @see ExtensibleThreadPoolExecutor
* @see ThreadPoolPluginRegistry
* @see ThreadPoolPluginManager
* @see TaskAwarePlugin
* @see ExecuteAwarePlugin
* @see ShutdownAwarePlugin

@ -17,19 +17,16 @@
package cn.hippo4j.core.plugin;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Registry of {@link ThreadPoolPlugin}
* Manager of {@link ThreadPoolPlugin}.
*
* @author huangchengxing
*/
public interface ThreadPoolPluginRegistry {
public interface ThreadPoolPluginManager {
/**
* Clear all.
@ -73,74 +70,86 @@ public interface ThreadPoolPluginRegistry {
*
* @param pluginId plugin id
* @param <A> target aware type
* @return {@link ThreadPoolPlugin}, null if unregister
* @return {@link ThreadPoolPlugin}
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
@Nullable
<A extends ThreadPoolPlugin> A getPlugin(String pluginId);
<A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId);
/**
* Get execute aware plugin list.
*
* @return {@link ExecuteAwarePlugin}
*/
Collection<ExecuteAwarePlugin> getExecuteAwareList();
Collection<ExecuteAwarePlugin> getExecuteAwarePluginList();
/**
* Get rejected aware plugin list.
*
* @return {@link RejectedAwarePlugin}
*/
Collection<RejectedAwarePlugin> getRejectedAwareList();
Collection<RejectedAwarePlugin> getRejectedAwarePluginList();
/**
* Get shutdown aware plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
Collection<ShutdownAwarePlugin> getShutdownAwareList();
Collection<ShutdownAwarePlugin> getShutdownAwarePluginList();
/**
* Get shutdown aware plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
Collection<TaskAwarePlugin> getTaskAwareList();
Collection<TaskAwarePlugin> getTaskAwarePluginList();
// ==================== default methods ====================
/**
* Try to get target plugin and apply operation, do nothing if it's not present.
* Get plugin of type.
*
* @param pluginId plugin id
* @param targetType target type
* @param consumer operation for target plugin
* @param <A> plugin type
* @return this instance
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
* @param pluginType plugin type
* @return target plugin
*/
default <A extends ThreadPoolPlugin> ThreadPoolPluginRegistry getAndThen(
String pluginId, Class<A> targetType, Consumer<A> consumer) {
Optional.ofNullable(getPlugin(pluginId))
.map(targetType::cast)
.ifPresent(consumer);
return this;
default <A extends ThreadPoolPlugin> Optional<A> getPluginOfType(String pluginId, Class<A> pluginType) {
return getPlugin(pluginId)
.filter(pluginType::isInstance)
.map(pluginType::cast);
}
/**
* Try to get target plugin and return value of apply function, return default value if it's not present.
* Get all plugins of type.
*
* @param pluginId plugin id
* @param targetType target type
* @param function operation for target plugin
* @param defaultValue default value
* @param <A> plugin type
* @return value of apply function, default value if plugin is not present
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
* @param pluginType plugin type
* @return all plugins of type
*/
default <A extends ThreadPoolPlugin> Collection<A> getAllPluginsOfType(Class<A> pluginType) {
return getAllPlugins().stream()
.filter(pluginType::isInstance)
.map(pluginType::cast)
.collect(Collectors.toList());
}
/**
* Get {@link PluginRuntime} of all registered plugins.
*
* @return {@link PluginRuntime} of all registered plugins
*/
default Collection<PluginRuntime> getAllPluginRuntimes() {
return getAllPlugins().stream()
.map(ThreadPoolPlugin::getPluginRuntime)
.collect(Collectors.toList());
}
/**
* Get {@link PluginRuntime} of registered plugin.
*
* @return {@link PluginRuntime} of registered plugin
*/
default <A extends ThreadPoolPlugin, R> R getAndThen(String pluginId, Class<A> targetType, Function<A, R> function, R defaultValue) {
return Optional.ofNullable(getPlugin(pluginId))
.map(targetType::cast)
.map(function)
.orElse(defaultValue);
default Optional<PluginRuntime> getRuntime(String pluginId) {
return getPlugin(pluginId)
.map(ThreadPoolPlugin::getPluginRuntime);
}
}

@ -1,31 +1,48 @@
package cn.hippo4j.core.plugin;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Thread pool action aware registry delegate.
* Thread pool plugin manager delegate.
*
* @author huangchengxing
*/
public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegistry {
public interface ThreadPoolPluginManagerDelegate extends ThreadPoolPluginManager {
/**
* Get thread pool action aware registry.
*
* @return {@link ThreadPoolPluginRegistry}
* @return {@link ThreadPoolPluginManager}
*/
@NonNull
ThreadPoolPluginRegistry getThreadPoolPluginRegistry();
ThreadPoolPluginManager getThreadPoolPluginManager();
/**
* Get thread-pool id
*
* @return thread-pool id
*/
String getThreadPoolId();
/**
* Get thread-pool executor.
*
* @return thread-pool executor
*/
ThreadPoolExecutor getThreadPoolExecutor();
// ======================== delegate methods ========================
/**
* Clear all.
*/
@Override
default void clear() {
getThreadPoolPluginRegistry().clear();
getThreadPoolPluginManager().clear();
}
/**
@ -35,7 +52,7 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist
*/
@Override
default void register(ThreadPoolPlugin plugin) {
getThreadPoolPluginRegistry().register(plugin);
getThreadPoolPluginManager().register(plugin);
}
/**
@ -46,7 +63,7 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist
*/
@Override
default boolean isRegistered(String pluginId) {
return getThreadPoolPluginRegistry().isRegistered(pluginId);
return getThreadPoolPluginManager().isRegistered(pluginId);
}
/**
@ -56,7 +73,7 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist
*/
@Override
default void unregister(String pluginId) {
getThreadPoolPluginRegistry().unregister(pluginId);
getThreadPoolPluginManager().unregister(pluginId);
}
/**
@ -66,7 +83,7 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist
*/
@Override
default Collection<ThreadPoolPlugin> getAllPlugins() {
return getThreadPoolPluginRegistry().getAllPlugins();
return getThreadPoolPluginManager().getAllPlugins();
}
/**
@ -76,10 +93,9 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist
* @return {@link ThreadPoolPlugin}, null if unregister
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
@Nullable
@Override
default <A extends ThreadPoolPlugin> A getPlugin(String pluginId) {
return getThreadPoolPluginRegistry().getPlugin(pluginId);
default <A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId) {
return getThreadPoolPluginManager().getPlugin(pluginId);
}
/**
@ -88,8 +104,8 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist
* @return {@link ExecuteAwarePlugin}
*/
@Override
default Collection<ExecuteAwarePlugin> getExecuteAwareList() {
return getThreadPoolPluginRegistry().getExecuteAwareList();
default Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() {
return getThreadPoolPluginManager().getExecuteAwarePluginList();
}
/**
@ -98,8 +114,8 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist
* @return {@link RejectedAwarePlugin}
*/
@Override
default Collection<RejectedAwarePlugin> getRejectedAwareList() {
return getThreadPoolPluginRegistry().getRejectedAwareList();
default Collection<RejectedAwarePlugin> getRejectedAwarePluginList() {
return getThreadPoolPluginManager().getRejectedAwarePluginList();
}
/**
@ -108,8 +124,8 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist
* @return {@link ShutdownAwarePlugin}
*/
@Override
default Collection<ShutdownAwarePlugin> getShutdownAwareList() {
return getThreadPoolPluginRegistry().getShutdownAwareList();
default Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() {
return getThreadPoolPluginManager().getShutdownAwarePluginList();
}
/**
@ -118,7 +134,8 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist
* @return {@link ShutdownAwarePlugin}
*/
@Override
default Collection<TaskAwarePlugin> getTaskAwareList() {
return getThreadPoolPluginRegistry().getTaskAwareList();
default Collection<TaskAwarePlugin> getTaskAwarePluginList() {
return getThreadPoolPluginManager().getTaskAwarePluginList();
}
}

@ -23,6 +23,6 @@ public interface ThreadPoolPluginRegistrar {
* @param registry thread pool plugin registry
* @param executor executor
*/
void doRegister(ThreadPoolPluginRegistry registry, ExtensibleThreadPoolExecutor executor);
void doRegister(ThreadPoolPluginManager registry, ExtensibleThreadPoolExecutor executor);
}

@ -25,7 +25,7 @@ public class ExtensibleThreadPoolExecutorTest {
@Before
public void initExecutor() {
executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
"test", new DefaultThreadPoolPluginManager(),
5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, originalHandler
);

@ -5,17 +5,17 @@ import org.junit.Before;
import org.junit.Test;
/**
* test for {@link DefaultThreadPoolPluginRegistry}
* test for {@link DefaultThreadPoolPluginManager}
*
* @author huangchengxing
*/
public class DefaultThreadPoolPluginRegistryTest {
public class DefaultThreadPoolPluginManagerTest {
private DefaultThreadPoolPluginRegistry registry;
private DefaultThreadPoolPluginManager registry;
@Before
public void initRegistry() {
registry = new DefaultThreadPoolPluginRegistry();
registry = new DefaultThreadPoolPluginManager();
}
@Test
@ -24,17 +24,18 @@ public class DefaultThreadPoolPluginRegistryTest {
registry.register(taskAwarePlugin);
Assert.assertThrows(IllegalArgumentException.class, () -> registry.register(taskAwarePlugin));
Assert.assertTrue(registry.isRegistered(taskAwarePlugin.getId()));
Assert.assertEquals(1, registry.getTaskAwareList().size());
Assert.assertEquals(1, registry.getTaskAwarePluginList().size());
Assert.assertSame(taskAwarePlugin, registry.getPlugin(taskAwarePlugin.getId()));
registry.getAndThen(taskAwarePlugin.getId(), TestTaskAwarePlugin.class, plugin -> Assert.assertSame(plugin, taskAwarePlugin));
Assert.assertEquals(taskAwarePlugin.getId(), registry.getAndThen(taskAwarePlugin.getId(), TestTaskAwarePlugin.class, TestTaskAwarePlugin::getId, null));
registry.getPluginOfType(taskAwarePlugin.getId(), TestTaskAwarePlugin.class)
.ifPresent(plugin -> Assert.assertSame(plugin, taskAwarePlugin));
Assert.assertEquals(taskAwarePlugin.getId(), registry.getPluginOfType(taskAwarePlugin.getId(), TestTaskAwarePlugin.class).map(TestTaskAwarePlugin::getId).orElse(null));
registry.unregister(taskAwarePlugin.getId());
Assert.assertNull(registry.getPlugin(taskAwarePlugin.getId()));
ExecuteAwarePlugin executeAwarePlugin = new TestExecuteAwarePlugin();
registry.register(executeAwarePlugin);
Assert.assertTrue(registry.isRegistered(executeAwarePlugin.getId()));
Assert.assertEquals(1, registry.getExecuteAwareList().size());
Assert.assertEquals(1, registry.getExecuteAwarePluginList().size());
Assert.assertSame(executeAwarePlugin, registry.getPlugin(executeAwarePlugin.getId()));
registry.unregister(executeAwarePlugin.getId());
Assert.assertNull(registry.getPlugin(executeAwarePlugin.getId()));
@ -42,7 +43,7 @@ public class DefaultThreadPoolPluginRegistryTest {
RejectedAwarePlugin rejectedAwarePlugin = new TestRejectedAwarePlugin();
registry.register(rejectedAwarePlugin);
Assert.assertTrue(registry.isRegistered(rejectedAwarePlugin.getId()));
Assert.assertEquals(1, registry.getRejectedAwareList().size());
Assert.assertEquals(1, registry.getRejectedAwarePluginList().size());
Assert.assertSame(rejectedAwarePlugin, registry.getPlugin(rejectedAwarePlugin.getId()));
registry.unregister(rejectedAwarePlugin.getId());
Assert.assertNull(registry.getPlugin(rejectedAwarePlugin.getId()));
@ -50,7 +51,7 @@ public class DefaultThreadPoolPluginRegistryTest {
ShutdownAwarePlugin shutdownAwarePlugin = new TestShutdownAwarePlugin();
registry.register(shutdownAwarePlugin);
Assert.assertTrue(registry.isRegistered(shutdownAwarePlugin.getId()));
Assert.assertEquals(1, registry.getShutdownAwareList().size());
Assert.assertEquals(1, registry.getShutdownAwarePluginList().size());
Assert.assertSame(shutdownAwarePlugin, registry.getPlugin(shutdownAwarePlugin.getId()));
registry.unregister(shutdownAwarePlugin.getId());
Assert.assertNull(registry.getPlugin(shutdownAwarePlugin.getId()));

@ -28,7 +28,7 @@ public class PluginRuntimeTest {
@Test
public void testGetPluginRuntime() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
"test", new DefaultThreadPoolPluginManager(),
1, 1, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);

@ -2,7 +2,7 @@ package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginManager;
import org.junit.Assert;
import org.junit.Test;
@ -23,7 +23,7 @@ public class TaskDecoratorPluginTest {
@Test
public void testExecute() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
"test", new DefaultThreadPoolPluginManager(),
5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);

@ -2,7 +2,7 @@ package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginManager;
import org.junit.Assert;
import org.junit.Test;
@ -20,7 +20,7 @@ public class TaskRejectCountRecordPluginTest {
@Test
public void testExecute() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
"test", new DefaultThreadPoolPluginManager(),
1, 1, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);

@ -3,7 +3,7 @@ package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.SyncTimeRecorder;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginManager;
import org.junit.Assert;
import org.junit.Test;
@ -21,7 +21,7 @@ public class TaskTimeRecordPluginTest {
@Test
public void testExecute() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
"test", new DefaultThreadPoolPluginManager(),
3, 3, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);

@ -2,7 +2,7 @@ package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginManager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import org.junit.Assert;
import org.junit.Test;
@ -22,7 +22,7 @@ public class ThreadPoolExecutorShutdownPluginTest {
public ExtensibleThreadPoolExecutor getExecutor(ThreadPoolPlugin plugin) {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
"test", new DefaultThreadPoolPluginManager(),
2, 2, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);

Loading…
Cancel
Save