feat: Support getting plugins from the manager in a specific order (#962)

pull/968/head
黄成兴 3 years ago committed by GitHub
parent 90220fa454
commit 8175f92f9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -88,7 +88,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
@NonNull ThreadFactory threadFactory, @NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) { @NonNull RejectedExecutionHandler rejectedExecutionHandler) {
super( super(
threadPoolId, new DefaultThreadPoolPluginManager(), threadPoolId, new DefaultThreadPoolPluginManager().setEnableSort(true),
corePoolSize, maximumPoolSize, keepAliveTime, unit, corePoolSize, maximumPoolSize, keepAliveTime, unit,
blockingQueue, threadFactory, rejectedExecutionHandler); blockingQueue, threadFactory, rejectedExecutionHandler);
log.info("Initializing ExecutorService {}", threadPoolId); log.info("Initializing ExecutorService {}", threadPoolId);

@ -18,8 +18,15 @@
package cn.hippo4j.core.plugin.manager; package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.core.plugin.*; import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
import cn.hippo4j.core.plugin.TaskAwarePlugin;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import lombok.Getter;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -30,6 +37,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/** /**
* <p>The default implementation of {@link ThreadPoolPluginManager}. * <p>The default implementation of {@link ThreadPoolPluginManager}.
@ -40,6 +49,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* or bound to an {@link java.util.concurrent.ThreadPoolExecutor} instance through {@link ThreadPoolPluginSupport} * or bound to an {@link java.util.concurrent.ThreadPoolExecutor} instance through {@link ThreadPoolPluginSupport}
* to support its plugin based extension functions. * to support its plugin based extension functions.
* *
* <p>When {@link #isEnableSort()} is true, plugins can be obtained in batches
* in the order specified by {@link AnnotationAwareOrderComparator}.<br />
* When the sorting function is enabled through {@link #setEnableSort} for the first time,
* all registered plugins will be sorted,
* Later, whenever a new plug-in is registered, all plug-ins will be reordered again.
*
* <p><b>NOTE:</b> * <p><b>NOTE:</b>
* When the list of plugins is obtained through the {@code getXXX} method of manager, the list is not immutable. * When the list of plugins is obtained through the {@code getXXX} method of manager, the list is not immutable.
* This means that until actually start iterating over the list, * This means that until actually start iterating over the list,
@ -48,13 +63,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* *
* @see cn.hippo4j.core.executor.DynamicThreadPoolExecutor * @see cn.hippo4j.core.executor.DynamicThreadPoolExecutor
* @see cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor * @see cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor
* @see AnnotationAwareOrderComparator
*/ */
public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager { public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
/** /**
* Lock of this instance * Lock of this instance
*/ */
private final ReadWriteLock instanceLock = new ReentrantReadWriteLock(); private final ReadWriteLockSupport mainLock = new ReadWriteLockSupport(new ReentrantReadWriteLock());
/** /**
* Registered {@link ThreadPoolPlugin} * Registered {@link ThreadPoolPlugin}
@ -81,14 +97,18 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/ */
private final List<ShutdownAwarePlugin> shutdownAwarePluginList = new CopyOnWriteArrayList<>(); private final List<ShutdownAwarePlugin> shutdownAwarePluginList = new CopyOnWriteArrayList<>();
/**
* Enable sort.
*/
@Getter
private boolean enableSort = false;
/** /**
* Clear all. * Clear all.
*/ */
@Override @Override
public synchronized void clear() { public synchronized void clear() {
Lock writeLock = instanceLock.writeLock(); mainLock.runWithWriteLock(() -> {
writeLock.lock();
try {
Collection<ThreadPoolPlugin> plugins = registeredPlugins.values(); Collection<ThreadPoolPlugin> plugins = registeredPlugins.values();
registeredPlugins.clear(); registeredPlugins.clear();
taskAwarePluginList.clear(); taskAwarePluginList.clear();
@ -96,9 +116,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
rejectedAwarePluginList.clear(); rejectedAwarePluginList.clear();
shutdownAwarePluginList.clear(); shutdownAwarePluginList.clear();
plugins.forEach(ThreadPoolPlugin::stop); plugins.forEach(ThreadPoolPlugin::stop);
} finally { });
writeLock.unlock();
}
} }
/** /**
@ -107,31 +125,41 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
* @param plugin plugin * @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} already exists in the registry * @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} already exists in the registry
* @see ThreadPoolPlugin#getId() * @see ThreadPoolPlugin#getId()
* @see #isEnableSort
* @see AnnotationAwareOrderComparator#sort(List)
*/ */
@Override @Override
public void register(@NonNull ThreadPoolPlugin plugin) { public void register(@NonNull ThreadPoolPlugin plugin) {
Lock writeLock = instanceLock.writeLock(); mainLock.runWithWriteLock(() -> {
writeLock.lock();
try {
String id = plugin.getId(); String id = plugin.getId();
Assert.isTrue(!isRegistered(id), "The plugin with id [" + id + "] has been registered"); Assert.isTrue(!isRegistered(id), "The plugin with id [" + id + "] has been registered");
registeredPlugins.put(id, plugin); registeredPlugins.put(id, plugin);
if (plugin instanceof TaskAwarePlugin) { if (plugin instanceof TaskAwarePlugin) {
taskAwarePluginList.add((TaskAwarePlugin) plugin); taskAwarePluginList.add((TaskAwarePlugin) plugin);
if (enableSort) {
AnnotationAwareOrderComparator.sort(taskAwarePluginList);
}
} }
if (plugin instanceof ExecuteAwarePlugin) { if (plugin instanceof ExecuteAwarePlugin) {
executeAwarePluginList.add((ExecuteAwarePlugin) plugin); executeAwarePluginList.add((ExecuteAwarePlugin) plugin);
if (enableSort) {
AnnotationAwareOrderComparator.sort(executeAwarePluginList);
}
} }
if (plugin instanceof RejectedAwarePlugin) { if (plugin instanceof RejectedAwarePlugin) {
rejectedAwarePluginList.add((RejectedAwarePlugin) plugin); rejectedAwarePluginList.add((RejectedAwarePlugin) plugin);
if (enableSort) {
AnnotationAwareOrderComparator.sort(rejectedAwarePluginList);
}
} }
if (plugin instanceof ShutdownAwarePlugin) { if (plugin instanceof ShutdownAwarePlugin) {
shutdownAwarePluginList.add((ShutdownAwarePlugin) plugin); shutdownAwarePluginList.add((ShutdownAwarePlugin) plugin);
if (enableSort) {
AnnotationAwareOrderComparator.sort(shutdownAwarePluginList);
} }
plugin.start();
} finally {
writeLock.unlock();
} }
plugin.start();
});
} }
/** /**
@ -142,17 +170,13 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/ */
@Override @Override
public boolean tryRegister(ThreadPoolPlugin plugin) { public boolean tryRegister(ThreadPoolPlugin plugin) {
Lock writeLock = instanceLock.writeLock(); return mainLock.applyWithWriteLock(() -> {
writeLock.lock();
try {
if (registeredPlugins.containsKey(plugin.getId())) { if (registeredPlugins.containsKey(plugin.getId())) {
return false; return false;
} }
register(plugin); register(plugin);
return true; return true;
} finally { });
writeLock.unlock();
}
} }
/** /**
@ -162,10 +186,8 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/ */
@Override @Override
public void unregister(String pluginId) { public void unregister(String pluginId) {
Lock writeLock = instanceLock.writeLock(); mainLock.runWithWriteLock(
writeLock.lock(); () -> Optional.ofNullable(pluginId)
try {
Optional.ofNullable(pluginId)
.map(registeredPlugins::remove) .map(registeredPlugins::remove)
.ifPresent(plugin -> { .ifPresent(plugin -> {
if (plugin instanceof TaskAwarePlugin) { if (plugin instanceof TaskAwarePlugin) {
@ -181,10 +203,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
shutdownAwarePluginList.remove(plugin); shutdownAwarePluginList.remove(plugin);
} }
plugin.stop(); plugin.stop();
}); }));
} finally {
writeLock.unlock();
}
} }
/** /**
@ -196,13 +215,15 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/ */
@Override @Override
public Collection<ThreadPoolPlugin> getAllPlugins() { public Collection<ThreadPoolPlugin> getAllPlugins() {
Lock readLock = instanceLock.readLock(); return mainLock.applyWithReadLock(() -> {
readLock.lock(); // sort if necessary
try { if (enableSort) {
return registeredPlugins.values(); return registeredPlugins.values().stream()
} finally { .sorted(AnnotationAwareOrderComparator.INSTANCE)
readLock.unlock(); .collect(Collectors.toList());
} }
return registeredPlugins.values();
});
} }
/** /**
@ -213,13 +234,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/ */
@Override @Override
public boolean isRegistered(String pluginId) { public boolean isRegistered(String pluginId) {
Lock readLock = instanceLock.readLock(); return mainLock.applyWithReadLock(() -> registeredPlugins.containsKey(pluginId));
readLock.lock();
try {
return registeredPlugins.containsKey(pluginId);
} finally {
readLock.unlock();
}
} }
/** /**
@ -232,13 +247,8 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId) { public <A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId) {
Lock readLock = instanceLock.readLock(); return mainLock.applyWithReadLock(
readLock.lock(); () -> (Optional<A>) Optional.ofNullable(registeredPlugins.get(pluginId)));
try {
return (Optional<A>) Optional.ofNullable(registeredPlugins.get(pluginId));
} finally {
readLock.unlock();
}
} }
/** /**
@ -248,13 +258,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/ */
@Override @Override
public Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() { public Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() {
Lock readLock = instanceLock.readLock(); return mainLock.applyWithReadLock(() -> executeAwarePluginList);
readLock.lock();
try {
return executeAwarePluginList;
} finally {
readLock.unlock();
}
} }
/** /**
@ -266,13 +270,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/ */
@Override @Override
public Collection<RejectedAwarePlugin> getRejectedAwarePluginList() { public Collection<RejectedAwarePlugin> getRejectedAwarePluginList() {
Lock readLock = instanceLock.readLock(); return mainLock.applyWithReadLock(() -> rejectedAwarePluginList);
readLock.lock();
try {
return rejectedAwarePluginList;
} finally {
readLock.unlock();
}
} }
/** /**
@ -284,13 +282,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/ */
@Override @Override
public Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() { public Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() {
Lock readLock = instanceLock.readLock(); return mainLock.applyWithReadLock(() -> shutdownAwarePluginList);
readLock.lock();
try {
return shutdownAwarePluginList;
} finally {
readLock.unlock();
}
} }
/** /**
@ -302,12 +294,91 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/ */
@Override @Override
public Collection<TaskAwarePlugin> getTaskAwarePluginList() { public Collection<TaskAwarePlugin> getTaskAwarePluginList() {
Lock readLock = instanceLock.readLock(); return mainLock.applyWithReadLock(() -> taskAwarePluginList);
}
/**
* <p>Set whether sorting is allowed. <br />
* <b>NOTE</b>:
* If {@link #isEnableSort} returns false and {@code enableSort} is true,
* All currently registered plug-ins will be reordered immediately.
*
* @param enableSort enable sort
* @return {@link DefaultThreadPoolPluginManager}
* @see AnnotationAwareOrderComparator#sort(List)
*/
public DefaultThreadPoolPluginManager setEnableSort(boolean enableSort) {
// if it was unordered before, it needs to be reordered now
if (!isEnableSort() && enableSort) {
mainLock.runWithWriteLock(() -> {
// if it has been successfully updated, there is no need to operate again
if (this.enableSort != enableSort) {
AnnotationAwareOrderComparator.sort(taskAwarePluginList);
AnnotationAwareOrderComparator.sort(executeAwarePluginList);
AnnotationAwareOrderComparator.sort(rejectedAwarePluginList);
AnnotationAwareOrderComparator.sort(shutdownAwarePluginList);
}
});
}
this.enableSort = enableSort;
return this;
}
/**
* Read write lock support.
*/
@RequiredArgsConstructor
private static class ReadWriteLockSupport {
/**
* lock
*/
private final ReadWriteLock lock;
/**
* Get the read-lock and do something.
*
* @param supplier supplier
*/
public <T> T applyWithReadLock(Supplier<T> supplier) {
Lock readLock = lock.readLock();
readLock.lock(); readLock.lock();
try { try {
return taskAwarePluginList; return supplier.get();
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
} }
/**
* Get the write-lock and do something.
*
* @param runnable runnable
*/
public void runWithWriteLock(Runnable runnable) {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
runnable.run();
} finally {
writeLock.unlock();
}
}
/**
* Get the write-lock and do something.
*
* @param supplier supplier
*/
public <T> T applyWithWriteLock(Supplier<T> supplier) {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
return supplier.get();
} finally {
writeLock.unlock();
}
}
}
} }

@ -17,11 +17,18 @@
package cn.hippo4j.core.plugin.manager; package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.*; import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
import cn.hippo4j.core.plugin.TaskAwarePlugin;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import lombok.Getter; import lombok.Getter;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.core.annotation.Order;
import java.util.Iterator;
/** /**
* test for {@link DefaultThreadPoolPluginManager} * test for {@link DefaultThreadPoolPluginManager}
@ -70,6 +77,18 @@ public class DefaultThreadPoolPluginManagerTest {
@Test @Test
public void testUnregister() { public void testUnregister() {
manager.register(new TestTaskAwarePlugin());
manager.unregister(TestTaskAwarePlugin.class.getSimpleName());
Assert.assertFalse(manager.isRegistered(TestTaskAwarePlugin.class.getSimpleName()));
manager.register(new TestRejectedAwarePlugin());
manager.unregister(TestRejectedAwarePlugin.class.getSimpleName());
Assert.assertFalse(manager.isRegistered(TestRejectedAwarePlugin.class.getSimpleName()));
manager.register(new TestShutdownAwarePlugin());
manager.unregister(TestShutdownAwarePlugin.class.getSimpleName());
Assert.assertFalse(manager.isRegistered(TestShutdownAwarePlugin.class.getSimpleName()));
manager.register(new TestExecuteAwarePlugin()); manager.register(new TestExecuteAwarePlugin());
manager.unregister(TestExecuteAwarePlugin.class.getSimpleName()); manager.unregister(TestExecuteAwarePlugin.class.getSimpleName());
Assert.assertFalse(manager.isRegistered(TestExecuteAwarePlugin.class.getSimpleName())); Assert.assertFalse(manager.isRegistered(TestExecuteAwarePlugin.class.getSimpleName()));
@ -136,24 +155,43 @@ public class DefaultThreadPoolPluginManagerTest {
Assert.assertFalse(manager.getPluginOfType(TestExecuteAwarePlugin.class.getSimpleName(), RejectedAwarePlugin.class).isPresent()); Assert.assertFalse(manager.getPluginOfType(TestExecuteAwarePlugin.class.getSimpleName(), RejectedAwarePlugin.class).isPresent());
} }
@Test
public void testSetEnableSort() {
manager.register(new TestExecuteAwarePlugin());
manager.register(new TestTaskAwarePlugin());
manager.setEnableSort(true);
manager.register(new TestRejectedAwarePlugin());
manager.register(new TestShutdownAwarePlugin());
Iterator<ThreadPoolPlugin> iterator = manager.getAllPlugins().iterator();
Assert.assertEquals(TestTaskAwarePlugin.class, iterator.next().getClass());
Assert.assertEquals(TestRejectedAwarePlugin.class, iterator.next().getClass());
Assert.assertEquals(TestExecuteAwarePlugin.class, iterator.next().getClass());
Assert.assertEquals(TestShutdownAwarePlugin.class, iterator.next().getClass());
}
@Order(0)
@Getter @Getter
private final static class TestTaskAwarePlugin implements TaskAwarePlugin { private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
private final String id = this.getClass().getSimpleName(); private final String id = this.getClass().getSimpleName();
} }
@Order(2)
@Getter @Getter
private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin { private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
private final String id = this.getClass().getSimpleName(); private final String id = this.getClass().getSimpleName();
} }
@Order(1)
@Getter @Getter
private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin { private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
private final String id = this.getClass().getSimpleName(); private final String id = this.getClass().getSimpleName();
} }
@Order(3)
@Getter @Getter
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin { private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {

Loading…
Cancel
Save