diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java index f9530c16..8e7ceb1a 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java @@ -18,8 +18,8 @@ package cn.hippo4j.core.executor; import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.core.plugin.DefaultThreadPoolPluginManager; import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistrar; -import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry; import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin; import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin; import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin; @@ -29,7 +29,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.core.task.TaskDecorator; -import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; @@ -78,7 +77,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl @NonNull ThreadFactory threadFactory, @NonNull RejectedExecutionHandler rejectedExecutionHandler) { super( - threadPoolId, new DefaultThreadPoolPluginRegistry(), + threadPoolId, new DefaultThreadPoolPluginManager(), corePoolSize, maximumPoolSize, keepAliveTime, unit, blockingQueue, threadFactory, rejectedExecutionHandler ); @@ -92,37 +91,26 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl /** * Invoked by the containing {@code BeanFactory} on destruction of a bean. * - * @throws Exception in case of shutdown errors. Exceptions will get logged - * but not rethrown to allow other beans to release their resources as well. */ @Override - public void destroy() throws Exception { - getAndThen( - ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, - ThreadPoolExecutorShutdownPlugin.class, - processor -> { - if (processor.isWaitForTasksToCompleteOnShutdown()) { - super.shutdown(); - } else { - super.shutdownNow(); - } - }); + public void destroy() { + if (isWaitForTasksToCompleteOnShutdown()) { + super.shutdown(); + } else { + super.shutdownNow(); + } } public long getAwaitTerminationMillis() { - return getAndThen( - ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, - ThreadPoolExecutorShutdownPlugin.class, - ThreadPoolExecutorShutdownPlugin::getAwaitTerminationMillis, -1L - ); + return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class) + .map(ThreadPoolExecutorShutdownPlugin::getAwaitTerminationMillis) + .orElse(-1L); } public boolean isWaitForTasksToCompleteOnShutdown() { - return getAndThen( - ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, - ThreadPoolExecutorShutdownPlugin.class, - ThreadPoolExecutorShutdownPlugin::isWaitForTasksToCompleteOnShutdown, false - ); + return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class) + .map(ThreadPoolExecutorShutdownPlugin::isWaitForTasksToCompleteOnShutdown) + .orElse(false); } /** @@ -132,57 +120,42 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl * @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown */ public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) { - getAndThen( - ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, - ThreadPoolExecutorShutdownPlugin.class, - processor -> processor.setAwaitTerminationMillis(awaitTerminationMillis) + getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class) + .ifPresent(processor -> processor + .setAwaitTerminationMillis(awaitTerminationMillis) .setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown) - ); + ); } public Long getRejectCountNum() { - return getAndThen( - TaskRejectCountRecordPlugin.PLUGIN_NAME, - TaskRejectCountRecordPlugin.class, - TaskRejectCountRecordPlugin::getRejectCountNum, -1L - ); + return getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class) + .map(TaskRejectCountRecordPlugin::getRejectCountNum) + .orElse(-1L); } public Long getExecuteTimeOut() { - return getAndThen( - TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, - TaskTimeoutNotifyAlarmPlugin.class, - TaskTimeoutNotifyAlarmPlugin::getExecuteTimeOut, -1L - ); + return getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class) + .map(TaskTimeoutNotifyAlarmPlugin::getExecuteTimeOut) + .orElse(-1L); } public void setExecuteTimeOut(Long executeTimeOut) { - getAndThen( - TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, - TaskTimeoutNotifyAlarmPlugin.class, - processor -> processor.setExecuteTimeOut(executeTimeOut) - ); + getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class) + .ifPresent(processor -> processor.setExecuteTimeOut(executeTimeOut)); } public TaskDecorator getTaskDecorator() { - return getAndThen( - TaskDecoratorPlugin.PLUGIN_NAME, - TaskDecoratorPlugin.class, - processor -> CollectionUtil.getFirst(processor.getDecorators()), null - ); + return getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class) + .map(processor -> CollectionUtil.getFirst(processor.getDecorators())) + .orElse(null); } public void setTaskDecorator(TaskDecorator taskDecorator) { - if (Objects.nonNull(taskDecorator)) { - getAndThen( - TaskDecoratorPlugin.PLUGIN_NAME, - TaskDecoratorPlugin.class, - processor -> { - processor.clearDecorators(); - processor.addDecorator(taskDecorator); - } - ); - } + getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class) + .ifPresent(processor -> { + processor.clearDecorators(); + processor.addDecorator(taskDecorator); + }); } public RejectedExecutionHandler getRedundancyHandler() { diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java index b1b130ef..ce4454f7 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java @@ -38,7 +38,7 @@ import java.util.concurrent.*; * @author huangchengxing */ public class ExtensibleThreadPoolExecutor - extends ThreadPoolExecutor implements ThreadPoolPluginRegistryDelegate { + extends ThreadPoolExecutor implements ThreadPoolPluginManagerDelegate { /** * thread pool id @@ -51,7 +51,7 @@ public class ExtensibleThreadPoolExecutor * action aware registry */ @Getter - private final ThreadPoolPluginRegistry threadPoolPluginRegistry; + private final ThreadPoolPluginManager threadPoolPluginManager; /** * handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper @@ -62,7 +62,7 @@ public class ExtensibleThreadPoolExecutor * Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters. * * @param threadPoolId thread-pool id - * @param threadPoolPluginRegistry action aware registry + * @param threadPoolPluginManager action aware registry * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the @@ -88,7 +88,7 @@ public class ExtensibleThreadPoolExecutor */ public ExtensibleThreadPoolExecutor( @NonNull String threadPoolId, - @NonNull ThreadPoolPluginRegistry threadPoolPluginRegistry, + @NonNull ThreadPoolPluginManager threadPoolPluginManager, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, @NonNull BlockingQueue workQueue, @@ -98,13 +98,13 @@ public class ExtensibleThreadPoolExecutor // pool extended info this.threadPoolId = threadPoolId; - this.threadPoolPluginRegistry = threadPoolPluginRegistry; + this.threadPoolPluginManager = threadPoolPluginManager; // proxy handler to support Aware callback while (handler instanceof RejectedAwareHandlerWrapper) { handler = ((RejectedAwareHandlerWrapper) handler).getHandler(); } - this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginRegistry, handler); + this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginManager, handler); super.setRejectedExecutionHandler(handlerWrapper); } @@ -118,7 +118,7 @@ public class ExtensibleThreadPoolExecutor */ @Override protected void beforeExecute(Thread thread, Runnable runnable) { - Collection executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList(); + Collection executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList(); executeAwarePluginList.forEach(aware -> aware.beforeExecute(thread, runnable)); } @@ -131,7 +131,7 @@ public class ExtensibleThreadPoolExecutor */ @Override public void execute(@NonNull Runnable runnable) { - Collection executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList(); + Collection executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList(); for (ExecuteAwarePlugin executeAwarePlugin : executeAwarePluginList) { runnable = executeAwarePlugin.execute(runnable); } @@ -149,7 +149,7 @@ public class ExtensibleThreadPoolExecutor */ @Override protected void afterExecute(Runnable runnable, Throwable throwable) { - Collection executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList(); + Collection executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList(); executeAwarePluginList.forEach(aware -> aware.afterExecute(runnable, throwable)); } @@ -164,7 +164,7 @@ public class ExtensibleThreadPoolExecutor */ @Override public void shutdown() { - Collection shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList(); + Collection shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList(); shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this)); super.shutdown(); shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, Collections.emptyList())); @@ -181,7 +181,7 @@ public class ExtensibleThreadPoolExecutor */ @Override public List shutdownNow() { - Collection shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList(); + Collection shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList(); shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this)); List tasks = super.shutdownNow(); shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, tasks)); @@ -196,7 +196,7 @@ public class ExtensibleThreadPoolExecutor @Override protected void terminated() { super.terminated(); - Collection shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList(); + Collection shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList(); shutdownAwarePluginList.forEach(aware -> aware.afterTerminated(this)); } @@ -215,7 +215,7 @@ public class ExtensibleThreadPoolExecutor */ @Override protected RunnableFuture newTaskFor(Runnable runnable, T value) { - Collection taskAwarePluginList = threadPoolPluginRegistry.getTaskAwareList(); + Collection taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList(); for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) { runnable = taskAwarePlugin.beforeTaskCreate(this, runnable, value); } @@ -236,7 +236,7 @@ public class ExtensibleThreadPoolExecutor */ @Override protected RunnableFuture newTaskFor(Callable callable) { - Collection taskAwarePluginList = threadPoolPluginRegistry.getTaskAwareList(); + Collection taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList(); for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) { callable = taskAwarePlugin.beforeTaskCreate(this, callable); } @@ -269,6 +269,16 @@ public class ExtensibleThreadPoolExecutor return handlerWrapper.getHandler(); } + /** + * Get thread-pool executor. + * + * @return thread-pool executor + */ + @Override + public ThreadPoolExecutor getThreadPoolExecutor() { + return this; + } + /** * Wrapper of original {@link RejectedExecutionHandler} of {@link ThreadPoolExecutor}, * It's used to support the {@link RejectedAwarePlugin} on the basis of the {@link RejectedExecutionHandler}. @@ -282,7 +292,7 @@ public class ExtensibleThreadPoolExecutor /** * thread-pool action aware registry */ - private final ThreadPoolPluginRegistry registry; + private final ThreadPoolPluginManager registry; /** * original target @@ -300,7 +310,7 @@ public class ExtensibleThreadPoolExecutor */ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - Collection rejectedAwarePluginList = registry.getRejectedAwareList(); + Collection rejectedAwarePluginList = registry.getRejectedAwarePluginList(); rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor)); handler.rejectedExecution(r, executor); } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistry.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginManager.java similarity index 92% rename from hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistry.java rename to hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginManager.java index 71fc74c2..e62f619e 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistry.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginManager.java @@ -19,7 +19,6 @@ package cn.hippo4j.core.plugin; import cn.hippo4j.common.toolkit.Assert; import lombok.NonNull; -import org.checkerframework.checker.nullness.qual.Nullable; import java.util.*; import java.util.concurrent.locks.Lock; @@ -27,11 +26,11 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** - * The default implementation of {@link ThreadPoolPluginRegistry}. + * The default implementation of {@link ThreadPoolPluginManager}. * * @author huangchengxing */ -public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry { +public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager { /** * lock of this instance @@ -183,14 +182,13 @@ public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry * @param plugin type * @return {@link ThreadPoolPlugin}, null if unregister */ - @Nullable @Override @SuppressWarnings("unchecked") - public A getPlugin(String pluginId) { + public Optional getPlugin(String pluginId) { Lock readLock = instanceLock.readLock(); readLock.lock(); try { - return (A) registeredPlugins.get(pluginId); + return (Optional) Optional.ofNullable(registeredPlugins.get(pluginId)); } finally { readLock.unlock(); } @@ -202,7 +200,7 @@ public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry * @return {@link ExecuteAwarePlugin} */ @Override - public Collection getExecuteAwareList() { + public Collection getExecuteAwarePluginList() { Lock readLock = instanceLock.readLock(); readLock.lock(); try { @@ -218,7 +216,7 @@ public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry * @return {@link RejectedAwarePlugin} */ @Override - public Collection getRejectedAwareList() { + public Collection getRejectedAwarePluginList() { Lock readLock = instanceLock.readLock(); readLock.lock(); try { @@ -234,7 +232,7 @@ public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry * @return {@link ShutdownAwarePlugin} */ @Override - public Collection getShutdownAwareList() { + public Collection getShutdownAwarePluginList() { Lock readLock = instanceLock.readLock(); readLock.lock(); try { @@ -250,7 +248,7 @@ public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry * @return {@link ShutdownAwarePlugin} */ @Override - public Collection getTaskAwareList() { + public Collection getTaskAwarePluginList() { Lock readLock = instanceLock.readLock(); readLock.lock(); try { diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistrar.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistrar.java index 786cb6d8..76a89fae 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistrar.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistrar.java @@ -66,7 +66,7 @@ public class DefaultThreadPoolPluginRegistrar * @param executor executor */ @Override - public void doRegister(ThreadPoolPluginRegistry registry, ExtensibleThreadPoolExecutor executor) { + public void doRegister(ThreadPoolPluginManager registry, ExtensibleThreadPoolExecutor executor) { // callback when task execute registry.register(new TaskDecoratorPlugin()); registry.register(new TaskTimeoutNotifyAlarmPlugin(executeTimeOut, executor)); diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java index 6caf27e2..03df5d99 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java @@ -24,7 +24,7 @@ import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; * * @author huangchengxing * @see ExtensibleThreadPoolExecutor - * @see ThreadPoolPluginRegistry + * @see ThreadPoolPluginManager * @see TaskAwarePlugin * @see ExecuteAwarePlugin * @see ShutdownAwarePlugin diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistry.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginManager.java similarity index 56% rename from hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistry.java rename to hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginManager.java index 70ca1a94..595c44d8 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistry.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginManager.java @@ -17,19 +17,16 @@ package cn.hippo4j.core.plugin; -import org.checkerframework.checker.nullness.qual.Nullable; - import java.util.Collection; import java.util.Optional; -import java.util.function.Consumer; -import java.util.function.Function; +import java.util.stream.Collectors; /** - * Registry of {@link ThreadPoolPlugin} + * Manager of {@link ThreadPoolPlugin}. * * @author huangchengxing */ -public interface ThreadPoolPluginRegistry { +public interface ThreadPoolPluginManager { /** * Clear all. @@ -73,74 +70,86 @@ public interface ThreadPoolPluginRegistry { * * @param pluginId plugin id * @param target aware type - * @return {@link ThreadPoolPlugin}, null if unregister + * @return {@link ThreadPoolPlugin} * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type */ - @Nullable - A getPlugin(String pluginId); + Optional getPlugin(String pluginId); /** * Get execute aware plugin list. * * @return {@link ExecuteAwarePlugin} */ - Collection getExecuteAwareList(); + Collection getExecuteAwarePluginList(); /** * Get rejected aware plugin list. * * @return {@link RejectedAwarePlugin} */ - Collection getRejectedAwareList(); + Collection getRejectedAwarePluginList(); /** * Get shutdown aware plugin list. * * @return {@link ShutdownAwarePlugin} */ - Collection getShutdownAwareList(); + Collection getShutdownAwarePluginList(); /** * Get shutdown aware plugin list. * * @return {@link ShutdownAwarePlugin} */ - Collection getTaskAwareList(); + Collection getTaskAwarePluginList(); + + // ==================== default methods ==================== /** - * Try to get target plugin and apply operation, do nothing if it's not present. + * Get plugin of type. * * @param pluginId plugin id - * @param targetType target type - * @param consumer operation for target plugin - * @param plugin type - * @return this instance - * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type + * @param pluginType plugin type + * @return target plugin */ - default ThreadPoolPluginRegistry getAndThen( - String pluginId, Class targetType, Consumer consumer) { - Optional.ofNullable(getPlugin(pluginId)) - .map(targetType::cast) - .ifPresent(consumer); - return this; + default Optional getPluginOfType(String pluginId, Class pluginType) { + return getPlugin(pluginId) + .filter(pluginType::isInstance) + .map(pluginType::cast); } /** - * Try to get target plugin and return value of apply function, return default value if it's not present. + * Get all plugins of type. * - * @param pluginId plugin id - * @param targetType target type - * @param function operation for target plugin - * @param defaultValue default value - * @param plugin type - * @return value of apply function, default value if plugin is not present - * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type + * @param pluginType plugin type + * @return all plugins of type + */ + default Collection getAllPluginsOfType(Class pluginType) { + return getAllPlugins().stream() + .filter(pluginType::isInstance) + .map(pluginType::cast) + .collect(Collectors.toList()); + } + + /** + * Get {@link PluginRuntime} of all registered plugins. + * + * @return {@link PluginRuntime} of all registered plugins + */ + default Collection getAllPluginRuntimes() { + return getAllPlugins().stream() + .map(ThreadPoolPlugin::getPluginRuntime) + .collect(Collectors.toList()); + } + + /** + * Get {@link PluginRuntime} of registered plugin. + * + * @return {@link PluginRuntime} of registered plugin */ - default R getAndThen(String pluginId, Class targetType, Function function, R defaultValue) { - return Optional.ofNullable(getPlugin(pluginId)) - .map(targetType::cast) - .map(function) - .orElse(defaultValue); + default Optional getRuntime(String pluginId) { + return getPlugin(pluginId) + .map(ThreadPoolPlugin::getPluginRuntime); } } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistryDelegate.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginManagerDelegate.java similarity index 52% rename from hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistryDelegate.java rename to hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginManagerDelegate.java index c7392686..0a19a415 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistryDelegate.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginManagerDelegate.java @@ -1,31 +1,48 @@ package cn.hippo4j.core.plugin; import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.Nullable; import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; /** - * Thread pool action aware registry delegate. + * Thread pool plugin manager delegate. * * @author huangchengxing */ -public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegistry { +public interface ThreadPoolPluginManagerDelegate extends ThreadPoolPluginManager { /** * Get thread pool action aware registry. * - * @return {@link ThreadPoolPluginRegistry} + * @return {@link ThreadPoolPluginManager} */ @NonNull - ThreadPoolPluginRegistry getThreadPoolPluginRegistry(); + ThreadPoolPluginManager getThreadPoolPluginManager(); + + /** + * Get thread-pool id + * + * @return thread-pool id + */ + String getThreadPoolId(); + + /** + * Get thread-pool executor. + * + * @return thread-pool executor + */ + ThreadPoolExecutor getThreadPoolExecutor(); + + // ======================== delegate methods ======================== /** * Clear all. */ @Override default void clear() { - getThreadPoolPluginRegistry().clear(); + getThreadPoolPluginManager().clear(); } /** @@ -35,7 +52,7 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist */ @Override default void register(ThreadPoolPlugin plugin) { - getThreadPoolPluginRegistry().register(plugin); + getThreadPoolPluginManager().register(plugin); } /** @@ -46,7 +63,7 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist */ @Override default boolean isRegistered(String pluginId) { - return getThreadPoolPluginRegistry().isRegistered(pluginId); + return getThreadPoolPluginManager().isRegistered(pluginId); } /** @@ -56,7 +73,7 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist */ @Override default void unregister(String pluginId) { - getThreadPoolPluginRegistry().unregister(pluginId); + getThreadPoolPluginManager().unregister(pluginId); } /** @@ -66,7 +83,7 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist */ @Override default Collection getAllPlugins() { - return getThreadPoolPluginRegistry().getAllPlugins(); + return getThreadPoolPluginManager().getAllPlugins(); } /** @@ -76,10 +93,9 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist * @return {@link ThreadPoolPlugin}, null if unregister * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type */ - @Nullable @Override - default A getPlugin(String pluginId) { - return getThreadPoolPluginRegistry().getPlugin(pluginId); + default Optional getPlugin(String pluginId) { + return getThreadPoolPluginManager().getPlugin(pluginId); } /** @@ -88,8 +104,8 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist * @return {@link ExecuteAwarePlugin} */ @Override - default Collection getExecuteAwareList() { - return getThreadPoolPluginRegistry().getExecuteAwareList(); + default Collection getExecuteAwarePluginList() { + return getThreadPoolPluginManager().getExecuteAwarePluginList(); } /** @@ -98,8 +114,8 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist * @return {@link RejectedAwarePlugin} */ @Override - default Collection getRejectedAwareList() { - return getThreadPoolPluginRegistry().getRejectedAwareList(); + default Collection getRejectedAwarePluginList() { + return getThreadPoolPluginManager().getRejectedAwarePluginList(); } /** @@ -108,8 +124,8 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist * @return {@link ShutdownAwarePlugin} */ @Override - default Collection getShutdownAwareList() { - return getThreadPoolPluginRegistry().getShutdownAwareList(); + default Collection getShutdownAwarePluginList() { + return getThreadPoolPluginManager().getShutdownAwarePluginList(); } /** @@ -118,7 +134,8 @@ public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegist * @return {@link ShutdownAwarePlugin} */ @Override - default Collection getTaskAwareList() { - return getThreadPoolPluginRegistry().getTaskAwareList(); + default Collection getTaskAwarePluginList() { + return getThreadPoolPluginManager().getTaskAwarePluginList(); } + } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistrar.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistrar.java index 91e32c02..e51e68b1 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistrar.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistrar.java @@ -23,6 +23,6 @@ public interface ThreadPoolPluginRegistrar { * @param registry thread pool plugin registry * @param executor executor */ - void doRegister(ThreadPoolPluginRegistry registry, ExtensibleThreadPoolExecutor executor); + void doRegister(ThreadPoolPluginManager registry, ExtensibleThreadPoolExecutor executor); } diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java index 8582fffd..5390350a 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java @@ -25,7 +25,7 @@ public class ExtensibleThreadPoolExecutorTest { @Before public void initExecutor() { executor = new ExtensibleThreadPoolExecutor( - "test", new DefaultThreadPoolPluginRegistry(), + "test", new DefaultThreadPoolPluginManager(), 5, 5, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Thread::new, originalHandler ); diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistryTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginManagerTest.java similarity index 80% rename from hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistryTest.java rename to hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginManagerTest.java index 176eece0..1a043f05 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistryTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginManagerTest.java @@ -5,17 +5,17 @@ import org.junit.Before; import org.junit.Test; /** - * test for {@link DefaultThreadPoolPluginRegistry} + * test for {@link DefaultThreadPoolPluginManager} * * @author huangchengxing */ -public class DefaultThreadPoolPluginRegistryTest { +public class DefaultThreadPoolPluginManagerTest { - private DefaultThreadPoolPluginRegistry registry; + private DefaultThreadPoolPluginManager registry; @Before public void initRegistry() { - registry = new DefaultThreadPoolPluginRegistry(); + registry = new DefaultThreadPoolPluginManager(); } @Test @@ -24,17 +24,18 @@ public class DefaultThreadPoolPluginRegistryTest { registry.register(taskAwarePlugin); Assert.assertThrows(IllegalArgumentException.class, () -> registry.register(taskAwarePlugin)); Assert.assertTrue(registry.isRegistered(taskAwarePlugin.getId())); - Assert.assertEquals(1, registry.getTaskAwareList().size()); + Assert.assertEquals(1, registry.getTaskAwarePluginList().size()); Assert.assertSame(taskAwarePlugin, registry.getPlugin(taskAwarePlugin.getId())); - registry.getAndThen(taskAwarePlugin.getId(), TestTaskAwarePlugin.class, plugin -> Assert.assertSame(plugin, taskAwarePlugin)); - Assert.assertEquals(taskAwarePlugin.getId(), registry.getAndThen(taskAwarePlugin.getId(), TestTaskAwarePlugin.class, TestTaskAwarePlugin::getId, null)); + registry.getPluginOfType(taskAwarePlugin.getId(), TestTaskAwarePlugin.class) + .ifPresent(plugin -> Assert.assertSame(plugin, taskAwarePlugin)); + Assert.assertEquals(taskAwarePlugin.getId(), registry.getPluginOfType(taskAwarePlugin.getId(), TestTaskAwarePlugin.class).map(TestTaskAwarePlugin::getId).orElse(null)); registry.unregister(taskAwarePlugin.getId()); Assert.assertNull(registry.getPlugin(taskAwarePlugin.getId())); ExecuteAwarePlugin executeAwarePlugin = new TestExecuteAwarePlugin(); registry.register(executeAwarePlugin); Assert.assertTrue(registry.isRegistered(executeAwarePlugin.getId())); - Assert.assertEquals(1, registry.getExecuteAwareList().size()); + Assert.assertEquals(1, registry.getExecuteAwarePluginList().size()); Assert.assertSame(executeAwarePlugin, registry.getPlugin(executeAwarePlugin.getId())); registry.unregister(executeAwarePlugin.getId()); Assert.assertNull(registry.getPlugin(executeAwarePlugin.getId())); @@ -42,7 +43,7 @@ public class DefaultThreadPoolPluginRegistryTest { RejectedAwarePlugin rejectedAwarePlugin = new TestRejectedAwarePlugin(); registry.register(rejectedAwarePlugin); Assert.assertTrue(registry.isRegistered(rejectedAwarePlugin.getId())); - Assert.assertEquals(1, registry.getRejectedAwareList().size()); + Assert.assertEquals(1, registry.getRejectedAwarePluginList().size()); Assert.assertSame(rejectedAwarePlugin, registry.getPlugin(rejectedAwarePlugin.getId())); registry.unregister(rejectedAwarePlugin.getId()); Assert.assertNull(registry.getPlugin(rejectedAwarePlugin.getId())); @@ -50,7 +51,7 @@ public class DefaultThreadPoolPluginRegistryTest { ShutdownAwarePlugin shutdownAwarePlugin = new TestShutdownAwarePlugin(); registry.register(shutdownAwarePlugin); Assert.assertTrue(registry.isRegistered(shutdownAwarePlugin.getId())); - Assert.assertEquals(1, registry.getShutdownAwareList().size()); + Assert.assertEquals(1, registry.getShutdownAwarePluginList().size()); Assert.assertSame(shutdownAwarePlugin, registry.getPlugin(shutdownAwarePlugin.getId())); registry.unregister(shutdownAwarePlugin.getId()); Assert.assertNull(registry.getPlugin(shutdownAwarePlugin.getId())); diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/PluginRuntimeTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/PluginRuntimeTest.java index c4a58e87..1ec7c4dc 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/PluginRuntimeTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/PluginRuntimeTest.java @@ -28,7 +28,7 @@ public class PluginRuntimeTest { @Test public void testGetPluginRuntime() { ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( - "test", new DefaultThreadPoolPluginRegistry(), + "test", new DefaultThreadPoolPluginManager(), 1, 1, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy() ); diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java index fb488d7b..bde3abf8 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java @@ -2,7 +2,7 @@ package cn.hippo4j.core.plugin.impl; import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; -import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry; +import cn.hippo4j.core.plugin.DefaultThreadPoolPluginManager; import org.junit.Assert; import org.junit.Test; @@ -23,7 +23,7 @@ public class TaskDecoratorPluginTest { @Test public void testExecute() { ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( - "test", new DefaultThreadPoolPluginRegistry(), + "test", new DefaultThreadPoolPluginManager(), 5, 5, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy() ); diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java index 834e0017..bb1e6d08 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java @@ -2,7 +2,7 @@ package cn.hippo4j.core.plugin.impl; import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; -import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry; +import cn.hippo4j.core.plugin.DefaultThreadPoolPluginManager; import org.junit.Assert; import org.junit.Test; @@ -20,7 +20,7 @@ public class TaskRejectCountRecordPluginTest { @Test public void testExecute() { ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( - "test", new DefaultThreadPoolPluginRegistry(), + "test", new DefaultThreadPoolPluginManager(), 1, 1, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy() ); diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java index 6c945fe0..d98d5c32 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java @@ -3,7 +3,7 @@ package cn.hippo4j.core.plugin.impl; import cn.hippo4j.common.toolkit.SyncTimeRecorder; import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; -import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry; +import cn.hippo4j.core.plugin.DefaultThreadPoolPluginManager; import org.junit.Assert; import org.junit.Test; @@ -21,7 +21,7 @@ public class TaskTimeRecordPluginTest { @Test public void testExecute() { ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( - "test", new DefaultThreadPoolPluginRegistry(), + "test", new DefaultThreadPoolPluginManager(), 3, 3, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy() ); diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java index 123348bc..ed766a9f 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java @@ -2,7 +2,7 @@ package cn.hippo4j.core.plugin.impl; import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; -import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry; +import cn.hippo4j.core.plugin.DefaultThreadPoolPluginManager; import cn.hippo4j.core.plugin.ThreadPoolPlugin; import org.junit.Assert; import org.junit.Test; @@ -22,7 +22,7 @@ public class ThreadPoolExecutorShutdownPluginTest { public ExtensibleThreadPoolExecutor getExecutor(ThreadPoolPlugin plugin) { ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( - "test", new DefaultThreadPoolPluginRegistry(), + "test", new DefaultThreadPoolPluginManager(), 2, 2, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy() );