feat: Support to enable or disable plugins that have been registered in the manager (#975)

pull/983/head
黄成兴 3 years ago committed by GitHub
parent c763d5cb04
commit 7f25d5309a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -29,6 +29,7 @@ import lombok.NonNull;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.core.task.TaskDecorator; import org.springframework.core.task.TaskDecorator;
import java.util.Objects; import java.util.Objects;
@ -88,7 +89,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
@NonNull ThreadFactory threadFactory, @NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) { @NonNull RejectedExecutionHandler rejectedExecutionHandler) {
super( super(
threadPoolId, new DefaultThreadPoolPluginManager().setEnableSort(true), threadPoolId, new DefaultThreadPoolPluginManager().setPluginComparator(AnnotationAwareOrderComparator.INSTANCE),
corePoolSize, maximumPoolSize, keepAliveTime, unit, corePoolSize, maximumPoolSize, keepAliveTime, unit,
blockingQueue, threadFactory, rejectedExecutionHandler); blockingQueue, threadFactory, rejectedExecutionHandler);
log.info("Initializing ExecutorService {}", threadPoolId); log.info("Initializing ExecutorService {}", threadPoolId);

@ -26,17 +26,23 @@ import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import lombok.Getter; import lombok.Getter;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.core.annotation.AnnotationAwareOrderComparator; import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -49,72 +55,107 @@ import java.util.stream.Collectors;
* or bound to an {@link java.util.concurrent.ThreadPoolExecutor} instance through {@link ThreadPoolPluginSupport} * or bound to an {@link java.util.concurrent.ThreadPoolExecutor} instance through {@link ThreadPoolPluginSupport}
* to support its plugin based extension functions. * to support its plugin based extension functions.
* *
* <p>When {@link #isEnableSort()} is true, plugins can be obtained in batches * <h3>Order of plugin</h3>
* in the order specified by {@link AnnotationAwareOrderComparator}.<br /> * <p>By default, plugins are installed in the order in which they are registered.
* When the sorting function is enabled through {@link #setEnableSort} for the first time, * When {@link #isEnableSort()} is true, plugins can be obtained in batches
* in the order specified by {@link #pluginComparator}(if not null).<br />
* When the sorting function is enabled through {@link #setPluginComparator} for the first time,
* all registered plugins will be sorted, * all registered plugins will be sorted,
* Later, whenever a new plug-in is registered, all plug-ins will be reordered again. * Later, whenever a new plug-in is registered, all plug-ins will be reordered again.
* *
* <p><b>NOTE:</b> * <h3>Status of the plugin</h3>
* When the list of plugins is obtained through the {@code getXXX} method of manager, the list is not immutable. * <p>The plugins registered in the container can be divided into two states: <em>enabled</em> and <em>disabled</em>
* Plugins that are <em>disabled</em> cannot be obtained through the following methods
* <ul>
* <li>{@link #getTaskAwarePluginList}</li>
* <li>{@link #getExecuteAwarePluginList}</li>
* <li>{@link #getRejectedAwarePluginList}</li>
* <li>{@link #getShutdownAwarePluginList}</li>
* </ul>
* Generally, plugins in disabled status will not be used by {@link ThreadPoolPluginSupport}.
* users can switch the status of plugins through {@link #enable} and {@link #disable} methods.
*
* <h3>Thread-safe operation support</h3>
* <p>When the list of plugins is obtained through the {@code getXXX} method of manager, the list is not immutable.
* This means that until actually start iterating over the list, * This means that until actually start iterating over the list,
* registering or unregistering plugins through the manager will affect the results of the iteration. * registering or unregistering plugins through the manager will affect the results of the iteration.
* Therefore, we should try to ensure that <b>get the latest plugin list from the manager before each use</b>. * Therefore, we should try to ensure that <b>get the latest plugin list from the manager before each use</b>.
* *
* @see cn.hippo4j.core.executor.DynamicThreadPoolExecutor * @see cn.hippo4j.core.executor.DynamicThreadPoolExecutor
* @see cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor * @see cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor
* @see AnnotationAwareOrderComparator
*/ */
public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager { public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
/** /**
* Lock of this instance * Lock of this instance
*/ */
private final ReadWriteLockSupport mainLock = new ReadWriteLockSupport(new ReentrantReadWriteLock()); private final ReadWriteLockSupport mainLock;
/** /**
* Registered {@link ThreadPoolPlugin} * All registered {@link ThreadPoolPlugin}
*/ */
private final Map<String, ThreadPoolPlugin> registeredPlugins = new ConcurrentHashMap<>(16); private final Map<String, ThreadPoolPlugin> registeredPlugins = new ConcurrentHashMap<>(16);
/** /**
* Registered {@link TaskAwarePlugin} * Disabled plugins.
*/
private final Set<String> disabledPlugins = Collections.newSetFromMap(new ConcurrentHashMap<>(16));
/**
* Index of enabled {@link TaskAwarePlugin}
*/
private final QuickIndex<TaskAwarePlugin> taskAwarePluginList = new QuickIndex<>(TaskAwarePlugin.class);
/**
* Index of enabled {@link ExecuteAwarePlugin}
*/ */
private final List<TaskAwarePlugin> taskAwarePluginList = new CopyOnWriteArrayList<>(); private final QuickIndex<ExecuteAwarePlugin> executeAwarePluginList = new QuickIndex<>(ExecuteAwarePlugin.class);
/** /**
* Registered {@link ExecuteAwarePlugin} * Index of enabled {@link RejectedAwarePlugin}
*/ */
private final List<ExecuteAwarePlugin> executeAwarePluginList = new CopyOnWriteArrayList<>(); private final QuickIndex<RejectedAwarePlugin> rejectedAwarePluginList = new QuickIndex<>(RejectedAwarePlugin.class);
/** /**
* Registered {@link RejectedAwarePlugin} * Index of enabled {@link ShutdownAwarePlugin}
*/ */
private final List<RejectedAwarePlugin> rejectedAwarePluginList = new CopyOnWriteArrayList<>(); private final QuickIndex<ShutdownAwarePlugin> shutdownAwarePluginList = new QuickIndex<>(ShutdownAwarePlugin.class);
/** /**
* Registered {@link ShutdownAwarePlugin} * Comparator of {@link ThreadPoolPlugin}.
*/ */
private final List<ShutdownAwarePlugin> shutdownAwarePluginList = new CopyOnWriteArrayList<>(); private Comparator<Object> pluginComparator;
/** /**
* Enable sort. * Create a {@link DefaultThreadPoolPluginManager},
* By default, plugins are not sorted,
* and thread safety is implemented based on {@link ReentrantReadWriteLock}.
*/ */
@Getter public DefaultThreadPoolPluginManager() {
private boolean enableSort = false; this(new ReentrantReadWriteLock(), null);
}
/**
* Create a {@link DefaultThreadPoolPluginManager}.
*
* @param mainLock main lock
* @param pluginComparator comparator of plugin
*/
public DefaultThreadPoolPluginManager(
@NonNull ReadWriteLock mainLock, @Nullable Comparator<Object> pluginComparator) {
this.pluginComparator = pluginComparator;
this.mainLock = new ReadWriteLockSupport(mainLock);
}
/** /**
* Clear all. * Clear all.
*/ */
@Override @Override
public synchronized void clear() { public void clear() {
mainLock.runWithWriteLock(() -> { mainLock.runWithWriteLock(() -> {
Collection<ThreadPoolPlugin> plugins = registeredPlugins.values(); Collection<ThreadPoolPlugin> plugins = new ArrayList<>(registeredPlugins.values());
registeredPlugins.clear(); registeredPlugins.clear();
taskAwarePluginList.clear(); forQuickIndexes(QuickIndex::clear);
executeAwarePluginList.clear();
rejectedAwarePluginList.clear();
shutdownAwarePluginList.clear();
plugins.forEach(ThreadPoolPlugin::stop); plugins.forEach(ThreadPoolPlugin::stop);
}); });
} }
@ -126,7 +167,6 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} already exists in the registry * @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} already exists in the registry
* @see ThreadPoolPlugin#getId() * @see ThreadPoolPlugin#getId()
* @see #isEnableSort * @see #isEnableSort
* @see AnnotationAwareOrderComparator#sort(List)
*/ */
@Override @Override
public void register(@NonNull ThreadPoolPlugin plugin) { public void register(@NonNull ThreadPoolPlugin plugin) {
@ -134,30 +174,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
String id = plugin.getId(); String id = plugin.getId();
Assert.isTrue(!isRegistered(id), "The plugin with id [" + id + "] has been registered"); Assert.isTrue(!isRegistered(id), "The plugin with id [" + id + "] has been registered");
registeredPlugins.put(id, plugin); registeredPlugins.put(id, plugin);
if (plugin instanceof TaskAwarePlugin) { forQuickIndexes(quickIndex -> quickIndex.addIfPossible(plugin));
taskAwarePluginList.add((TaskAwarePlugin) plugin);
if (enableSort) {
AnnotationAwareOrderComparator.sort(taskAwarePluginList);
}
}
if (plugin instanceof ExecuteAwarePlugin) {
executeAwarePluginList.add((ExecuteAwarePlugin) plugin);
if (enableSort) {
AnnotationAwareOrderComparator.sort(executeAwarePluginList);
}
}
if (plugin instanceof RejectedAwarePlugin) {
rejectedAwarePluginList.add((RejectedAwarePlugin) plugin);
if (enableSort) {
AnnotationAwareOrderComparator.sort(rejectedAwarePluginList);
}
}
if (plugin instanceof ShutdownAwarePlugin) {
shutdownAwarePluginList.add((ShutdownAwarePlugin) plugin);
if (enableSort) {
AnnotationAwareOrderComparator.sort(shutdownAwarePluginList);
}
}
plugin.start(); plugin.start();
}); });
} }
@ -190,22 +207,69 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
() -> Optional.ofNullable(pluginId) () -> Optional.ofNullable(pluginId)
.map(registeredPlugins::remove) .map(registeredPlugins::remove)
.ifPresent(plugin -> { .ifPresent(plugin -> {
if (plugin instanceof TaskAwarePlugin) { disabledPlugins.remove(pluginId);
taskAwarePluginList.remove(plugin); forQuickIndexes(quickIndex -> quickIndex.removeIfPossible(plugin));
}
if (plugin instanceof ExecuteAwarePlugin) {
executeAwarePluginList.remove(plugin);
}
if (plugin instanceof RejectedAwarePlugin) {
rejectedAwarePluginList.remove(plugin);
}
if (plugin instanceof ShutdownAwarePlugin) {
shutdownAwarePluginList.remove(plugin);
}
plugin.stop(); plugin.stop();
})); }));
} }
/**
* Get id of disabled plugins.
*
* @return id of disabled plugins
*/
@Override
public Set<String> getAllDisabledPluginIds() {
return disabledPlugins;
}
/**
* Whether the plugin has been disabled.
*
* @param pluginId plugin id
* @return true if plugin has been disabled, false otherwise
*/
@Override
public boolean isDisabled(String pluginId) {
return disabledPlugins.contains(pluginId);
}
/**
* Enable plugin, make plugins will allow access through quick indexes.
*
* @param pluginId plugin id
* @return true if plugin already registered or enabled, false otherwise
*/
@Override
public boolean enable(String pluginId) {
return mainLock.applyWithReadLock(() -> {
ThreadPoolPlugin plugin = registeredPlugins.get(pluginId);
if (Objects.isNull(plugin) || !disabledPlugins.remove(pluginId)) {
return false;
}
forQuickIndexes(quickIndex -> quickIndex.addIfPossible(plugin));
return true;
});
}
/**
* Disable plugin, make plugins will not allow access through quick indexes.
*
* @param pluginId plugin id
* @return true if plugin already registered or disabled, false otherwise
*/
@Override
public boolean disable(String pluginId) {
return mainLock.applyWithReadLock(() -> {
ThreadPoolPlugin plugin = registeredPlugins.get(pluginId);
if (Objects.isNull(plugin) || !disabledPlugins.add(pluginId)) {
return false;
}
forQuickIndexes(quickIndex -> quickIndex.removeIfPossible(plugin));
return true;
});
}
/** /**
* Get all registered plugins. * Get all registered plugins.
* *
@ -217,9 +281,9 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
public Collection<ThreadPoolPlugin> getAllPlugins() { public Collection<ThreadPoolPlugin> getAllPlugins() {
return mainLock.applyWithReadLock(() -> { return mainLock.applyWithReadLock(() -> {
// sort if necessary // sort if necessary
if (enableSort) { if (isEnableSort()) {
return registeredPlugins.values().stream() return registeredPlugins.values().stream()
.sorted(AnnotationAwareOrderComparator.INSTANCE) .sorted(pluginComparator)
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
return registeredPlugins.values(); return registeredPlugins.values();
@ -252,49 +316,68 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
} }
/** /**
* Get execute plugin list. * Get all enabled {@link ExecuteAwarePlugin}.
* *
* @return {@link ExecuteAwarePlugin} * @return {@link ExecuteAwarePlugin}
* @apiNote Be sure to avoid directly modifying returned collection instances,
* otherwise, unexpected results may be obtained through the manager
* @see #enable
* @see #disable
*/ */
@Override @Override
public Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() { public Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() {
return mainLock.applyWithReadLock(() -> executeAwarePluginList); return mainLock.applyWithReadLock(executeAwarePluginList::getPlugins);
} }
/** /**
* Get rejected plugin list. * Get all enabled {@link RejectedAwarePlugin}.
* *
* @return {@link RejectedAwarePlugin} * @return {@link RejectedAwarePlugin}
* @apiNote Be sure to avoid directly modifying returned collection instances, * @apiNote Be sure to avoid directly modifying returned collection instances,
* otherwise, unexpected results may be obtained through the manager * otherwise, unexpected results may be obtained through the manager
* @see #enable
* @see #disable
*/ */
@Override @Override
public Collection<RejectedAwarePlugin> getRejectedAwarePluginList() { public Collection<RejectedAwarePlugin> getRejectedAwarePluginList() {
return mainLock.applyWithReadLock(() -> rejectedAwarePluginList); return mainLock.applyWithReadLock(rejectedAwarePluginList::getPlugins);
} }
/** /**
* Get shutdown plugin list. * Get all enabled {@link ShutdownAwarePlugin}.
* *
* @return {@link ShutdownAwarePlugin} * @return {@link ShutdownAwarePlugin}
* @apiNote Be sure to avoid directly modifying returned collection instances, * @apiNote Be sure to avoid directly modifying returned collection instances,
* otherwise, unexpected results may be obtained through the manager * otherwise, unexpected results may be obtained through the manager
* @see #enable
* @see #disable
*/ */
@Override @Override
public Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() { public Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() {
return mainLock.applyWithReadLock(() -> shutdownAwarePluginList); return mainLock.applyWithReadLock(shutdownAwarePluginList::getPlugins);
} }
/** /**
* Get shutdown plugin list. * Get all enabled {@link TaskAwarePlugin}.
* *
* @return {@link ShutdownAwarePlugin} * @return {@link TaskAwarePlugin}
* @apiNote Be sure to avoid directly modifying returned collection instances, * @apiNote Be sure to avoid directly modifying returned collection instances,
* otherwise, unexpected results may be obtained through the manager * otherwise, unexpected results may be obtained through the manager
* @see #enable
* @see #disable
*/ */
@Override @Override
public Collection<TaskAwarePlugin> getTaskAwarePluginList() { public Collection<TaskAwarePlugin> getTaskAwarePluginList() {
return mainLock.applyWithReadLock(() -> taskAwarePluginList); return mainLock.applyWithReadLock(taskAwarePluginList::getPlugins);
}
/**
* Whether sorting plugins is allowed.
*
* @return true if sorting plugins is allowed, false otherwise
*/
public boolean isEnableSort() {
return Objects.nonNull(pluginComparator);
} }
/** /**
@ -303,27 +386,94 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
* If {@link #isEnableSort} returns false and {@code enableSort} is true, * If {@link #isEnableSort} returns false and {@code enableSort} is true,
* All currently registered plug-ins will be reordered immediately. * All currently registered plug-ins will be reordered immediately.
* *
* @param enableSort enable sort * @param comparator comparator of plugins
* @return {@link DefaultThreadPoolPluginManager} * @return {@link DefaultThreadPoolPluginManager}
* @see AnnotationAwareOrderComparator#sort(List)
*/ */
public DefaultThreadPoolPluginManager setEnableSort(boolean enableSort) { public DefaultThreadPoolPluginManager setPluginComparator(@NonNull Comparator<Object> comparator) {
// if it was unordered before, it needs to be reordered now mainLock.runWithWriteLock(() -> {
if (!isEnableSort() && enableSort) { // the specified comparator has been set
mainLock.runWithWriteLock(() -> { if (Objects.equals(this.pluginComparator, comparator)) {
// if it has been successfully updated, there is no need to operate again return;
if (this.enableSort != enableSort) { }
AnnotationAwareOrderComparator.sort(taskAwarePluginList); this.pluginComparator = comparator;
AnnotationAwareOrderComparator.sort(executeAwarePluginList); forQuickIndexes(QuickIndex::sort);
AnnotationAwareOrderComparator.sort(rejectedAwarePluginList); });
AnnotationAwareOrderComparator.sort(shutdownAwarePluginList);
}
});
}
this.enableSort = enableSort;
return this; return this;
} }
/**
* operate for each indexes
*/
private void forQuickIndexes(Consumer<QuickIndex<? extends ThreadPoolPlugin>> consumer) {
consumer.accept(taskAwarePluginList);
consumer.accept(executeAwarePluginList);
consumer.accept(rejectedAwarePluginList);
consumer.accept(shutdownAwarePluginList);
}
/**
* Quick index of registered {@link ThreadPoolPlugin}
*
* @param <T> plugin type
*/
@RequiredArgsConstructor
private class QuickIndex<T extends ThreadPoolPlugin> {
/**
* Plugin type
*/
private final Class<T> pluginType;
/**
* Plugins
*/
@Getter
private final List<T> plugins = new CopyOnWriteArrayList<>();
/**
* Add plugin if possible.
*
* @param plugin plugin
*/
public void addIfPossible(ThreadPoolPlugin plugin) {
if (!pluginType.isInstance(plugin)) {
return;
}
plugins.add(pluginType.cast(plugin));
sort();
}
/**
* Remove plugin if possible.
*
* @param plugin plugin
*/
public void removeIfPossible(ThreadPoolPlugin plugin) {
if (!pluginType.isInstance(plugin)) {
return;
}
plugins.remove(pluginType.cast(plugin));
sort();
}
/**
* Sort by {@link #pluginComparator}.
*/
public void sort() {
if (isEnableSort()) {
plugins.sort(pluginComparator);
}
}
/**
* Clear all.
*/
public void clear() {
plugins.clear();
}
}
/** /**
* Read write lock support. * Read write lock support.
*/ */
@ -381,4 +531,5 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
} }
} }
} }

@ -17,13 +17,18 @@
package cn.hippo4j.core.plugin.manager; package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.*; import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
import cn.hippo4j.core.plugin.TaskAwarePlugin;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
/** /**
* Empty thread pool plugin manager. * Empty thread pool plugin manager.
@ -96,6 +101,49 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
public void unregister(String pluginId) { public void unregister(String pluginId) {
} }
/**
* Get id of disabled plugins.
*
* @return id of disabled plugins
*/
@Override
public Set<String> getAllDisabledPluginIds() {
return Collections.emptySet();
}
/**
* Whether the plugin has been disabled.
*
* @param pluginId plugin id
* @return true if plugin has been disabled, false otherwise
*/
@Override
public boolean isDisabled(String pluginId) {
return true;
}
/**
* Enable plugin, make plugins will allow access through quick indexes.
*
* @param pluginId plugin id
* @return true if plugin already registered or enabled, false otherwise
*/
@Override
public boolean enable(String pluginId) {
return false;
}
/**
* Disable plugin, make plugins will not allow access through quick indexes.
*
* @param pluginId plugin id
* @return true if plugin already registered or disabled, false otherwise
*/
@Override
public boolean disable(String pluginId) {
return false;
}
/** /**
* Get {@link ThreadPoolPlugin}. * Get {@link ThreadPoolPlugin}.
* *
@ -109,9 +157,11 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
} }
/** /**
* Get execute aware plugin list. * Get all enabled {@link ExecuteAwarePlugin}.
* *
* @return {@link ExecuteAwarePlugin} * @return {@link ExecuteAwarePlugin}
* @see #enable
* @see #disable
*/ */
@Override @Override
public Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() { public Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() {
@ -119,9 +169,11 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
} }
/** /**
* Get rejected aware plugin list. * Get all enabled {@link RejectedAwarePlugin}.
* *
* @return {@link RejectedAwarePlugin} * @return {@link RejectedAwarePlugin}
* @see #enable
* @see #disable
*/ */
@Override @Override
public Collection<RejectedAwarePlugin> getRejectedAwarePluginList() { public Collection<RejectedAwarePlugin> getRejectedAwarePluginList() {
@ -129,9 +181,11 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
} }
/** /**
* Get shutdown aware plugin list. * Get all enabled {@link ShutdownAwarePlugin}.
* *
* @return {@link ShutdownAwarePlugin} * @return {@link ShutdownAwarePlugin}
* @see #enable
* @see #disable
*/ */
@Override @Override
public Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() { public Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() {
@ -139,9 +193,11 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
} }
/** /**
* Get shutdown aware plugin list. * Get all enabled {@link TaskAwarePlugin}.
* *
* @return {@link ShutdownAwarePlugin} * @return {@link TaskAwarePlugin}
* @see #enable
* @see #disable
*/ */
@Override @Override
public Collection<TaskAwarePlugin> getTaskAwarePluginList() { public Collection<TaskAwarePlugin> getTaskAwarePluginList() {

@ -17,14 +17,20 @@
package cn.hippo4j.core.plugin.manager; package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.*; import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
import cn.hippo4j.core.plugin.TaskAwarePlugin;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import java.util.Collection; import java.util.Collection;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* Manager of {@link ThreadPoolPlugin}. * <p>Manager of {@link ThreadPoolPlugin}. <br />
* Bind with the specified thread-pool instance to register and manage plugins. * Bind with the specified thread-pool instance to register and manage plugins.
* when the thread pool is destroyed, please ensure that the manager will also be destroyed. * when the thread pool is destroyed, please ensure that the manager will also be destroyed.
* *
@ -87,7 +93,38 @@ public interface ThreadPoolPluginManager {
void unregister(String pluginId); void unregister(String pluginId);
/** /**
* Get {@link ThreadPoolPlugin}. * Get id of disabled plugins.
*
* @return id of disabled plugins
*/
Set<String> getAllDisabledPluginIds();
/**
* Whether the plugin has been disabled.
*
* @param pluginId plugin id
* @return true if plugin has been disabled, false otherwise
*/
boolean isDisabled(String pluginId);
/**
* Enable plugin, make plugins will allow access through quick indexes.
*
* @param pluginId plugin id
* @return true if plugin already registered or enabled, false otherwise
*/
boolean enable(String pluginId);
/**
* Disable plugin, make plugins will not allow access through quick indexes.
*
* @param pluginId plugin id
* @return true if plugin already registered or disabled, false otherwise
*/
boolean disable(String pluginId);
/**
* Get registered {@link ThreadPoolPlugin}.
* *
* @param pluginId plugin id * @param pluginId plugin id
* @param <A> target aware type * @param <A> target aware type
@ -97,30 +134,38 @@ public interface ThreadPoolPluginManager {
<A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId); <A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId);
/** /**
* Get execute aware plugin list. * Get all enabled {@link ExecuteAwarePlugin}.
* *
* @return {@link ExecuteAwarePlugin} * @return {@link ExecuteAwarePlugin}
* @see #enable
* @see #disable
*/ */
Collection<ExecuteAwarePlugin> getExecuteAwarePluginList(); Collection<ExecuteAwarePlugin> getExecuteAwarePluginList();
/** /**
* Get rejected aware plugin list. * Get all enabled {@link RejectedAwarePlugin}.
* *
* @return {@link RejectedAwarePlugin} * @return {@link RejectedAwarePlugin}
* @see #enable
* @see #disable
*/ */
Collection<RejectedAwarePlugin> getRejectedAwarePluginList(); Collection<RejectedAwarePlugin> getRejectedAwarePluginList();
/** /**
* Get shutdown aware plugin list. * Get all enabled {@link ShutdownAwarePlugin}.
* *
* @return {@link ShutdownAwarePlugin} * @return {@link ShutdownAwarePlugin}
* @see #enable
* @see #disable
*/ */
Collection<ShutdownAwarePlugin> getShutdownAwarePluginList(); Collection<ShutdownAwarePlugin> getShutdownAwarePluginList();
/** /**
* Get shutdown aware plugin list. * Get all enabled {@link TaskAwarePlugin}.
* *
* @return {@link ShutdownAwarePlugin} * @return {@link TaskAwarePlugin}
* @see #enable
* @see #disable
*/ */
Collection<TaskAwarePlugin> getTaskAwarePluginList(); Collection<TaskAwarePlugin> getTaskAwarePluginList();

@ -17,11 +17,16 @@
package cn.hippo4j.core.plugin.manager; package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.*; import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
import cn.hippo4j.core.plugin.TaskAwarePlugin;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.NonNull;
import java.util.Collection; import java.util.Collection;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
/** /**
@ -126,9 +131,54 @@ public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
} }
/** /**
* Get execute aware list. * Get id of disabled plugins.
*
* @return id of disabled plugins
*/
@Override
default Set<String> getAllDisabledPluginIds() {
return getThreadPoolPluginManager().getAllDisabledPluginIds();
}
/**
* Whether the plugin has been disabled.
*
* @param pluginId plugin id
* @return true if plugin has been disabled, false otherwise
*/
@Override
default boolean isDisabled(String pluginId) {
return getThreadPoolPluginManager().isDisabled(pluginId);
}
/**
* Enable plugin, make plugins will allow access through quick indexes.
*
* @param pluginId plugin id
* @return true if plugin already registered or enabled, false otherwise
*/
@Override
default boolean enable(String pluginId) {
return getThreadPoolPluginManager().enable(pluginId);
}
/**
* Disable plugin, make plugins will not allow access through quick indexes.
*
* @param pluginId plugin id
* @return true if plugin already registered or disabled, false otherwise
*/
@Override
default boolean disable(String pluginId) {
return getThreadPoolPluginManager().disable(pluginId);
}
/**
* Get all enabled {@link ExecuteAwarePlugin}.
* *
* @return {@link ExecuteAwarePlugin} * @return {@link ExecuteAwarePlugin}
* @see #enable
* @see #disable
*/ */
@Override @Override
default Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() { default Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() {
@ -136,9 +186,11 @@ public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
} }
/** /**
* Get rejected aware list. * Get all enabled {@link RejectedAwarePlugin}.
* *
* @return {@link RejectedAwarePlugin} * @return {@link RejectedAwarePlugin}
* @see #enable
* @see #disable
*/ */
@Override @Override
default Collection<RejectedAwarePlugin> getRejectedAwarePluginList() { default Collection<RejectedAwarePlugin> getRejectedAwarePluginList() {
@ -146,9 +198,11 @@ public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
} }
/** /**
* Get shutdown aware list. * Get all enabled {@link ShutdownAwarePlugin}.
* *
* @return {@link ShutdownAwarePlugin} * @return {@link ShutdownAwarePlugin}
* @see #enable
* @see #disable
*/ */
@Override @Override
default Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() { default Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() {
@ -156,9 +210,11 @@ public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
} }
/** /**
* Get shutdown aware list. * Get all enabled {@link TaskAwarePlugin}.
* *
* @return {@link ShutdownAwarePlugin} * @return {@link TaskAwarePlugin}
* @see #enable
* @see #disable
*/ */
@Override @Override
default Collection<TaskAwarePlugin> getTaskAwarePluginList() { default Collection<TaskAwarePlugin> getTaskAwarePluginList() {

@ -26,9 +26,11 @@ import lombok.Getter;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* test for {@link DefaultThreadPoolPluginManager} * test for {@link DefaultThreadPoolPluginManager}
@ -39,7 +41,7 @@ public class DefaultThreadPoolPluginManagerTest {
@Before @Before
public void initRegistry() { public void initRegistry() {
manager = new DefaultThreadPoolPluginManager(); manager = new DefaultThreadPoolPluginManager(new ReentrantReadWriteLock(), null);
} }
@Test @Test
@ -156,12 +158,58 @@ public class DefaultThreadPoolPluginManagerTest {
} }
@Test @Test
public void testSetEnableSort() { public void testEnable() {
ThreadPoolPlugin plugin = new TestExecuteAwarePlugin();
Assert.assertFalse(manager.enable(plugin.getId()));
manager.register(plugin);
Assert.assertFalse(manager.enable(plugin.getId()));
manager.disable(plugin.getId());
Assert.assertTrue(manager.enable(plugin.getId()));
}
@Test
public void testDisable() {
ThreadPoolPlugin plugin = new TestExecuteAwarePlugin();
Assert.assertFalse(manager.disable(plugin.getId()));
manager.register(plugin);
Assert.assertTrue(manager.disable(plugin.getId()));
Assert.assertFalse(manager.disable(plugin.getId()));
Assert.assertTrue(manager.getExecuteAwarePluginList().isEmpty());
Assert.assertEquals(1, manager.getAllPlugins().size());
}
@Test
public void testIsDisable() {
ThreadPoolPlugin plugin = new TestExecuteAwarePlugin();
Assert.assertFalse(manager.isDisabled(plugin.getId()));
manager.register(plugin);
Assert.assertTrue(manager.disable(plugin.getId()));
Assert.assertTrue(manager.isDisabled(plugin.getId()));
}
@Test
public void testGetDisabledPluginIds() {
ThreadPoolPlugin plugin = new TestExecuteAwarePlugin();
Assert.assertTrue(manager.getAllDisabledPluginIds().isEmpty());
manager.register(plugin);
Assert.assertTrue(manager.disable(plugin.getId()));
Assert.assertEquals(1, manager.getAllDisabledPluginIds().size());
}
@Test
public void testSetPluginComparator() {
Assert.assertFalse(manager.isEnableSort());
manager.register(new TestExecuteAwarePlugin()); manager.register(new TestExecuteAwarePlugin());
manager.register(new TestTaskAwarePlugin()); manager.register(new TestTaskAwarePlugin());
manager.setEnableSort(true); manager.setPluginComparator(AnnotationAwareOrderComparator.INSTANCE);
manager.register(new TestRejectedAwarePlugin()); manager.register(new TestRejectedAwarePlugin());
manager.register(new TestShutdownAwarePlugin()); manager.register(new TestShutdownAwarePlugin());
Assert.assertTrue(manager.isEnableSort());
Iterator<ThreadPoolPlugin> iterator = manager.getAllPlugins().iterator(); Iterator<ThreadPoolPlugin> iterator = manager.getAllPlugins().iterator();
Assert.assertEquals(TestTaskAwarePlugin.class, iterator.next().getClass()); Assert.assertEquals(TestTaskAwarePlugin.class, iterator.next().getClass());

@ -97,6 +97,49 @@ public class EmptyThreadPoolPluginManagerTest {
Assert.assertEquals(Collections.emptyList(), manager.getExecuteAwarePluginList()); Assert.assertEquals(Collections.emptyList(), manager.getExecuteAwarePluginList());
} }
@Test
public void testEnable() {
ThreadPoolPlugin plugin = new TestPlugin();
Assert.assertFalse(manager.enable(plugin.getId()));
manager.register(plugin);
Assert.assertFalse(manager.enable(plugin.getId()));
manager.disable(plugin.getId());
Assert.assertFalse(manager.enable(plugin.getId()));
}
@Test
public void testDisable() {
ThreadPoolPlugin plugin = new TestPlugin();
Assert.assertFalse(manager.disable(plugin.getId()));
manager.register(plugin);
Assert.assertFalse(manager.disable(plugin.getId()));
Assert.assertFalse(manager.disable(plugin.getId()));
Assert.assertTrue(manager.getExecuteAwarePluginList().isEmpty());
Assert.assertTrue(manager.getAllPlugins().isEmpty());
}
@Test
public void testIsDisable() {
ThreadPoolPlugin plugin = new TestPlugin();
Assert.assertTrue(manager.isDisabled(plugin.getId()));
manager.register(plugin);
Assert.assertFalse(manager.disable(plugin.getId()));
Assert.assertTrue(manager.isDisabled(plugin.getId()));
}
@Test
public void testGetDisabledPluginIds() {
ThreadPoolPlugin plugin = new TestPlugin();
Assert.assertTrue(manager.getAllDisabledPluginIds().isEmpty());
manager.register(plugin);
Assert.assertFalse(manager.disable(plugin.getId()));
Assert.assertTrue(manager.getAllDisabledPluginIds().isEmpty());
}
private static boolean isEmpty(ThreadPoolPluginManager manager) { private static boolean isEmpty(ThreadPoolPluginManager manager) {
return manager.getAllPlugins().isEmpty(); return manager.getAllPlugins().isEmpty();
} }

@ -18,7 +18,11 @@
package cn.hippo4j.core.plugin.manager; package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.*; import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
import cn.hippo4j.core.plugin.TaskAwarePlugin;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.junit.Assert; import org.junit.Assert;
@ -162,6 +166,49 @@ public class ThreadPoolPluginSupportTest {
Assert.assertFalse(support.getPluginOfType(TestExecuteAwarePlugin.class.getSimpleName(), RejectedAwarePlugin.class).isPresent()); Assert.assertFalse(support.getPluginOfType(TestExecuteAwarePlugin.class.getSimpleName(), RejectedAwarePlugin.class).isPresent());
} }
@Test
public void testEnable() {
ThreadPoolPlugin plugin = new TestExecuteAwarePlugin();
Assert.assertFalse(support.enable(plugin.getId()));
support.register(plugin);
Assert.assertFalse(support.enable(plugin.getId()));
support.disable(plugin.getId());
Assert.assertTrue(support.enable(plugin.getId()));
}
@Test
public void testDisable() {
ThreadPoolPlugin plugin = new TestExecuteAwarePlugin();
Assert.assertFalse(support.disable(plugin.getId()));
support.register(plugin);
Assert.assertTrue(support.disable(plugin.getId()));
Assert.assertFalse(support.disable(plugin.getId()));
Assert.assertTrue(support.getExecuteAwarePluginList().isEmpty());
Assert.assertEquals(1, support.getAllPlugins().size());
}
@Test
public void testIsDisable() {
ThreadPoolPlugin plugin = new TestExecuteAwarePlugin();
Assert.assertFalse(support.isDisabled(plugin.getId()));
support.register(plugin);
Assert.assertTrue(support.disable(plugin.getId()));
Assert.assertTrue(support.isDisabled(plugin.getId()));
}
@Test
public void testGetDisabledPluginIds() {
ThreadPoolPlugin plugin = new TestExecuteAwarePlugin();
Assert.assertTrue(support.getAllDisabledPluginIds().isEmpty());
support.register(plugin);
Assert.assertTrue(support.disable(plugin.getId()));
Assert.assertEquals(1, support.getAllDisabledPluginIds().size());
}
@Getter @Getter
private final static class TestTaskAwarePlugin implements TaskAwarePlugin { private final static class TestTaskAwarePlugin implements TaskAwarePlugin {

Loading…
Cancel
Save