From 8175f92f9cbc248a0ff13590fbc2e65615fb22c2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=BB=84=E6=88=90=E5=85=B4?=
<49221670+Createsequence@users.noreply.github.com>
Date: Wed, 9 Nov 2022 18:45:00 +0800
Subject: [PATCH] feat: Support getting plugins from the manager in a specific
order (#962)
---
.../executor/DynamicThreadPoolExecutor.java | 2 +-
.../DefaultThreadPoolPluginManager.java | 251 +++++++++++-------
.../DefaultThreadPoolPluginManagerTest.java | 40 ++-
3 files changed, 201 insertions(+), 92 deletions(-)
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java
index 6790a374..a107301d 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java
@@ -88,7 +88,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) {
super(
- threadPoolId, new DefaultThreadPoolPluginManager(),
+ threadPoolId, new DefaultThreadPoolPluginManager().setEnableSort(true),
corePoolSize, maximumPoolSize, keepAliveTime, unit,
blockingQueue, threadFactory, rejectedExecutionHandler);
log.info("Initializing ExecutorService {}", threadPoolId);
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManager.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManager.java
index 039dc593..f2f6eebb 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManager.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManager.java
@@ -18,8 +18,15 @@
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.common.toolkit.Assert;
-import cn.hippo4j.core.plugin.*;
+import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
+import cn.hippo4j.core.plugin.RejectedAwarePlugin;
+import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
+import cn.hippo4j.core.plugin.TaskAwarePlugin;
+import cn.hippo4j.core.plugin.ThreadPoolPlugin;
+import lombok.Getter;
import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import java.util.Collection;
import java.util.List;
@@ -30,6 +37,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
/**
*
The default implementation of {@link ThreadPoolPluginManager}.
@@ -40,6 +49,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* or bound to an {@link java.util.concurrent.ThreadPoolExecutor} instance through {@link ThreadPoolPluginSupport}
* to support its plugin based extension functions.
*
+ *
When {@link #isEnableSort()} is true, plugins can be obtained in batches
+ * in the order specified by {@link AnnotationAwareOrderComparator}.
+ * When the sorting function is enabled through {@link #setEnableSort} for the first time,
+ * all registered plugins will be sorted,
+ * Later, whenever a new plug-in is registered, all plug-ins will be reordered again.
+ *
*
NOTE:
* When the list of plugins is obtained through the {@code getXXX} method of manager, the list is not immutable.
* This means that until actually start iterating over the list,
@@ -48,13 +63,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*
* @see cn.hippo4j.core.executor.DynamicThreadPoolExecutor
* @see cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor
+ * @see AnnotationAwareOrderComparator
*/
public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
/**
* Lock of this instance
*/
- private final ReadWriteLock instanceLock = new ReentrantReadWriteLock();
+ private final ReadWriteLockSupport mainLock = new ReadWriteLockSupport(new ReentrantReadWriteLock());
/**
* Registered {@link ThreadPoolPlugin}
@@ -81,14 +97,18 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
private final List shutdownAwarePluginList = new CopyOnWriteArrayList<>();
+ /**
+ * Enable sort.
+ */
+ @Getter
+ private boolean enableSort = false;
+
/**
* Clear all.
*/
@Override
public synchronized void clear() {
- Lock writeLock = instanceLock.writeLock();
- writeLock.lock();
- try {
+ mainLock.runWithWriteLock(() -> {
Collection plugins = registeredPlugins.values();
registeredPlugins.clear();
taskAwarePluginList.clear();
@@ -96,9 +116,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
rejectedAwarePluginList.clear();
shutdownAwarePluginList.clear();
plugins.forEach(ThreadPoolPlugin::stop);
- } finally {
- writeLock.unlock();
- }
+ });
}
/**
@@ -107,31 +125,41 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
* @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} already exists in the registry
* @see ThreadPoolPlugin#getId()
+ * @see #isEnableSort
+ * @see AnnotationAwareOrderComparator#sort(List)
*/
@Override
public void register(@NonNull ThreadPoolPlugin plugin) {
- Lock writeLock = instanceLock.writeLock();
- writeLock.lock();
- try {
+ mainLock.runWithWriteLock(() -> {
String id = plugin.getId();
Assert.isTrue(!isRegistered(id), "The plugin with id [" + id + "] has been registered");
registeredPlugins.put(id, plugin);
if (plugin instanceof TaskAwarePlugin) {
taskAwarePluginList.add((TaskAwarePlugin) plugin);
+ if (enableSort) {
+ AnnotationAwareOrderComparator.sort(taskAwarePluginList);
+ }
}
if (plugin instanceof ExecuteAwarePlugin) {
executeAwarePluginList.add((ExecuteAwarePlugin) plugin);
+ if (enableSort) {
+ AnnotationAwareOrderComparator.sort(executeAwarePluginList);
+ }
}
if (plugin instanceof RejectedAwarePlugin) {
rejectedAwarePluginList.add((RejectedAwarePlugin) plugin);
+ if (enableSort) {
+ AnnotationAwareOrderComparator.sort(rejectedAwarePluginList);
+ }
}
if (plugin instanceof ShutdownAwarePlugin) {
shutdownAwarePluginList.add((ShutdownAwarePlugin) plugin);
+ if (enableSort) {
+ AnnotationAwareOrderComparator.sort(shutdownAwarePluginList);
+ }
}
plugin.start();
- } finally {
- writeLock.unlock();
- }
+ });
}
/**
@@ -142,17 +170,13 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
@Override
public boolean tryRegister(ThreadPoolPlugin plugin) {
- Lock writeLock = instanceLock.writeLock();
- writeLock.lock();
- try {
+ return mainLock.applyWithWriteLock(() -> {
if (registeredPlugins.containsKey(plugin.getId())) {
return false;
}
register(plugin);
return true;
- } finally {
- writeLock.unlock();
- }
+ });
}
/**
@@ -162,29 +186,24 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
@Override
public void unregister(String pluginId) {
- Lock writeLock = instanceLock.writeLock();
- writeLock.lock();
- try {
- Optional.ofNullable(pluginId)
- .map(registeredPlugins::remove)
- .ifPresent(plugin -> {
- if (plugin instanceof TaskAwarePlugin) {
- taskAwarePluginList.remove(plugin);
- }
- if (plugin instanceof ExecuteAwarePlugin) {
- executeAwarePluginList.remove(plugin);
- }
- if (plugin instanceof RejectedAwarePlugin) {
- rejectedAwarePluginList.remove(plugin);
- }
- if (plugin instanceof ShutdownAwarePlugin) {
- shutdownAwarePluginList.remove(plugin);
- }
- plugin.stop();
- });
- } finally {
- writeLock.unlock();
- }
+ mainLock.runWithWriteLock(
+ () -> Optional.ofNullable(pluginId)
+ .map(registeredPlugins::remove)
+ .ifPresent(plugin -> {
+ if (plugin instanceof TaskAwarePlugin) {
+ taskAwarePluginList.remove(plugin);
+ }
+ if (plugin instanceof ExecuteAwarePlugin) {
+ executeAwarePluginList.remove(plugin);
+ }
+ if (plugin instanceof RejectedAwarePlugin) {
+ rejectedAwarePluginList.remove(plugin);
+ }
+ if (plugin instanceof ShutdownAwarePlugin) {
+ shutdownAwarePluginList.remove(plugin);
+ }
+ plugin.stop();
+ }));
}
/**
@@ -196,13 +215,15 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
@Override
public Collection getAllPlugins() {
- Lock readLock = instanceLock.readLock();
- readLock.lock();
- try {
+ return mainLock.applyWithReadLock(() -> {
+ // sort if necessary
+ if (enableSort) {
+ return registeredPlugins.values().stream()
+ .sorted(AnnotationAwareOrderComparator.INSTANCE)
+ .collect(Collectors.toList());
+ }
return registeredPlugins.values();
- } finally {
- readLock.unlock();
- }
+ });
}
/**
@@ -213,13 +234,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
@Override
public boolean isRegistered(String pluginId) {
- Lock readLock = instanceLock.readLock();
- readLock.lock();
- try {
- return registeredPlugins.containsKey(pluginId);
- } finally {
- readLock.unlock();
- }
+ return mainLock.applyWithReadLock(() -> registeredPlugins.containsKey(pluginId));
}
/**
@@ -232,13 +247,8 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
@Override
@SuppressWarnings("unchecked")
public Optional getPlugin(String pluginId) {
- Lock readLock = instanceLock.readLock();
- readLock.lock();
- try {
- return (Optional) Optional.ofNullable(registeredPlugins.get(pluginId));
- } finally {
- readLock.unlock();
- }
+ return mainLock.applyWithReadLock(
+ () -> (Optional) Optional.ofNullable(registeredPlugins.get(pluginId)));
}
/**
@@ -248,13 +258,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
@Override
public Collection getExecuteAwarePluginList() {
- Lock readLock = instanceLock.readLock();
- readLock.lock();
- try {
- return executeAwarePluginList;
- } finally {
- readLock.unlock();
- }
+ return mainLock.applyWithReadLock(() -> executeAwarePluginList);
}
/**
@@ -266,13 +270,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
@Override
public Collection getRejectedAwarePluginList() {
- Lock readLock = instanceLock.readLock();
- readLock.lock();
- try {
- return rejectedAwarePluginList;
- } finally {
- readLock.unlock();
- }
+ return mainLock.applyWithReadLock(() -> rejectedAwarePluginList);
}
/**
@@ -284,13 +282,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
@Override
public Collection getShutdownAwarePluginList() {
- Lock readLock = instanceLock.readLock();
- readLock.lock();
- try {
- return shutdownAwarePluginList;
- } finally {
- readLock.unlock();
- }
+ return mainLock.applyWithReadLock(() -> shutdownAwarePluginList);
}
/**
@@ -302,12 +294,91 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
@Override
public Collection getTaskAwarePluginList() {
- Lock readLock = instanceLock.readLock();
- readLock.lock();
- try {
- return taskAwarePluginList;
- } finally {
- readLock.unlock();
+ return mainLock.applyWithReadLock(() -> taskAwarePluginList);
+ }
+
+ /**
+ * Set whether sorting is allowed.
+ * NOTE:
+ * If {@link #isEnableSort} returns false and {@code enableSort} is true,
+ * All currently registered plug-ins will be reordered immediately.
+ *
+ * @param enableSort enable sort
+ * @return {@link DefaultThreadPoolPluginManager}
+ * @see AnnotationAwareOrderComparator#sort(List)
+ */
+ public DefaultThreadPoolPluginManager setEnableSort(boolean enableSort) {
+ // if it was unordered before, it needs to be reordered now
+ if (!isEnableSort() && enableSort) {
+ mainLock.runWithWriteLock(() -> {
+ // if it has been successfully updated, there is no need to operate again
+ if (this.enableSort != enableSort) {
+ AnnotationAwareOrderComparator.sort(taskAwarePluginList);
+ AnnotationAwareOrderComparator.sort(executeAwarePluginList);
+ AnnotationAwareOrderComparator.sort(rejectedAwarePluginList);
+ AnnotationAwareOrderComparator.sort(shutdownAwarePluginList);
+ }
+ });
}
+ this.enableSort = enableSort;
+ return this;
+ }
+
+ /**
+ * Read write lock support.
+ */
+ @RequiredArgsConstructor
+ private static class ReadWriteLockSupport {
+
+ /**
+ * lock
+ */
+ private final ReadWriteLock lock;
+
+ /**
+ * Get the read-lock and do something.
+ *
+ * @param supplier supplier
+ */
+ public T applyWithReadLock(Supplier supplier) {
+ Lock readLock = lock.readLock();
+ readLock.lock();
+ try {
+ return supplier.get();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Get the write-lock and do something.
+ *
+ * @param runnable runnable
+ */
+ public void runWithWriteLock(Runnable runnable) {
+ Lock writeLock = lock.writeLock();
+ writeLock.lock();
+ try {
+ runnable.run();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Get the write-lock and do something.
+ *
+ * @param supplier supplier
+ */
+ public T applyWithWriteLock(Supplier supplier) {
+ Lock writeLock = lock.writeLock();
+ writeLock.lock();
+ try {
+ return supplier.get();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
}
}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManagerTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManagerTest.java
index 627e65aa..64b78b79 100644
--- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManagerTest.java
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManagerTest.java
@@ -17,11 +17,18 @@
package cn.hippo4j.core.plugin.manager;
-import cn.hippo4j.core.plugin.*;
+import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
+import cn.hippo4j.core.plugin.RejectedAwarePlugin;
+import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
+import cn.hippo4j.core.plugin.TaskAwarePlugin;
+import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import lombok.Getter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.springframework.core.annotation.Order;
+
+import java.util.Iterator;
/**
* test for {@link DefaultThreadPoolPluginManager}
@@ -70,6 +77,18 @@ public class DefaultThreadPoolPluginManagerTest {
@Test
public void testUnregister() {
+ manager.register(new TestTaskAwarePlugin());
+ manager.unregister(TestTaskAwarePlugin.class.getSimpleName());
+ Assert.assertFalse(manager.isRegistered(TestTaskAwarePlugin.class.getSimpleName()));
+
+ manager.register(new TestRejectedAwarePlugin());
+ manager.unregister(TestRejectedAwarePlugin.class.getSimpleName());
+ Assert.assertFalse(manager.isRegistered(TestRejectedAwarePlugin.class.getSimpleName()));
+
+ manager.register(new TestShutdownAwarePlugin());
+ manager.unregister(TestShutdownAwarePlugin.class.getSimpleName());
+ Assert.assertFalse(manager.isRegistered(TestShutdownAwarePlugin.class.getSimpleName()));
+
manager.register(new TestExecuteAwarePlugin());
manager.unregister(TestExecuteAwarePlugin.class.getSimpleName());
Assert.assertFalse(manager.isRegistered(TestExecuteAwarePlugin.class.getSimpleName()));
@@ -136,24 +155,43 @@ public class DefaultThreadPoolPluginManagerTest {
Assert.assertFalse(manager.getPluginOfType(TestExecuteAwarePlugin.class.getSimpleName(), RejectedAwarePlugin.class).isPresent());
}
+ @Test
+ public void testSetEnableSort() {
+ manager.register(new TestExecuteAwarePlugin());
+ manager.register(new TestTaskAwarePlugin());
+ manager.setEnableSort(true);
+ manager.register(new TestRejectedAwarePlugin());
+ manager.register(new TestShutdownAwarePlugin());
+
+ Iterator iterator = manager.getAllPlugins().iterator();
+ Assert.assertEquals(TestTaskAwarePlugin.class, iterator.next().getClass());
+ Assert.assertEquals(TestRejectedAwarePlugin.class, iterator.next().getClass());
+ Assert.assertEquals(TestExecuteAwarePlugin.class, iterator.next().getClass());
+ Assert.assertEquals(TestShutdownAwarePlugin.class, iterator.next().getClass());
+ }
+
+ @Order(0)
@Getter
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
+ @Order(2)
@Getter
private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
+ @Order(1)
@Getter
private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
+ @Order(3)
@Getter
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {