@ -18,18 +18,33 @@
package cn.hippo4j.core.plugin.manager ;
import cn.hippo4j.common.toolkit.Assert ;
import cn.hippo4j.core.plugin.* ;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin ;
import cn.hippo4j.core.plugin.RejectedAwarePlugin ;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin ;
import cn.hippo4j.core.plugin.TaskAwarePlugin ;
import cn.hippo4j.core.plugin.ThreadPoolPlugin ;
import lombok.Getter ;
import lombok.NonNull ;
import lombok.RequiredArgsConstructor ;
import org.checkerframework.checker.nullness.qual.Nullable ;
import java.util.ArrayList ;
import java.util.Collection ;
import java.util.Collections ;
import java.util.Comparator ;
import java.util.List ;
import java.util.Map ;
import java.util.Objects ;
import java.util.Optional ;
import java.util.Set ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.CopyOnWriteArrayList ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReadWriteLock ;
import java.util.concurrent.locks.ReentrantReadWriteLock ;
import java.util.function.Consumer ;
import java.util.function.Supplier ;
import java.util.stream.Collectors ;
/ * *
* < p > The default implementation of { @link ThreadPoolPluginManager } .
@ -40,8 +55,28 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* or bound to an { @link java . util . concurrent . ThreadPoolExecutor } instance through { @link ThreadPoolPluginSupport }
* to support its plugin based extension functions .
*
* < p > < b > NOTE : < / b >
* When the list of plugins is obtained through the { @code getXXX } method of manager , the list is not immutable .
* < h3 > Order of plugin < / h3 >
* < p > By default , plugins are installed in the order in which they are registered .
* When { @link # isEnableSort ( ) } is true , plugins can be obtained in batches
* in the order specified by { @link # pluginComparator } ( if not null ) . < br / >
* When the sorting function is enabled through { @link # setPluginComparator } for the first time ,
* all registered plugins will be sorted ,
* Later , whenever a new plug - in is registered , all plug - ins will be reordered again .
*
* < h3 > Status of the plugin < / h3 >
* < p > The plugins registered in the container can be divided into two states : < em > enabled < / em > and < em > disabled < / em > ,
* Plugins that are < em > disabled < / em > cannot be obtained through the following methods :
* < ul >
* < li > { @link # getTaskAwarePluginList } < / li >
* < li > { @link # getExecuteAwarePluginList } < / li >
* < li > { @link # getRejectedAwarePluginList } < / li >
* < li > { @link # getShutdownAwarePluginList } < / li >
* < / ul >
* Generally , plugins in disabled status will not be used by { @link ThreadPoolPluginSupport } .
* users can switch the status of plugins through { @link # enable } and { @link # disable } methods .
*
* < h3 > Thread - safe operation support < / h3 >
* < p > When the list of plugins is obtained through the { @code getXXX } method of manager , the list is not immutable .
* This means that until actually start iterating over the list ,
* registering or unregistering plugins through the manager will affect the results of the iteration .
* Therefore , we should try to ensure that < b > get the latest plugin list from the manager before each use < / b > .
@ -54,51 +89,75 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
/ * *
* Lock of this instance
* /
private final ReadWriteLock instanceLock = new ReentrantReadWriteLock ( ) ;
private final ReadWriteLock Support mainLock ;
/ * *
* R egistered { @link ThreadPoolPlugin }
* All r egistered { @link ThreadPoolPlugin }
* /
private final Map < String , ThreadPoolPlugin > registeredPlugins = new ConcurrentHashMap < > ( 16 ) ;
/ * *
* Registered { @link TaskAwarePlugin }
* Disabled plugins .
* /
private final Set < String > disabledPlugins = Collections . newSetFromMap ( new ConcurrentHashMap < > ( 16 ) ) ;
/ * *
* Index of enabled { @link TaskAwarePlugin }
* /
private final QuickIndex < TaskAwarePlugin > taskAwarePluginList = new QuickIndex < > ( TaskAwarePlugin . class ) ;
/ * *
* Index of enabled { @link ExecuteAwarePlugin }
* /
private final List < TaskAwarePlugin > taskAwarePluginList = new CopyOnWriteArrayList < > ( ) ;
private final QuickIndex< ExecuteAwarePlugin > executeAwarePluginList = new QuickIndex < > ( ExecuteAwarePlugin . class ) ;
/ * *
* Registered { @link ExecuteAwarePlugin }
* Index of enabled { @link Rejected AwarePlugin}
* /
private final List < ExecuteAwarePlugin > executeAwarePluginList = new CopyOnWriteArrayList < > ( ) ;
private final QuickIndex< RejectedAwarePlugin > rejectedAwarePluginList = new QuickIndex < > ( RejectedAwarePlugin . class ) ;
/ * *
* Registered { @link RejectedAwarePlugin }
* Index of enabled { @link Shutdown AwarePlugin}
* /
private final List < RejectedAwarePlugin > rejectedAwarePluginList = new CopyOnWriteArrayList < > ( ) ;
private final QuickIndex < ShutdownAwarePlugin > shutdownAwarePluginList = new QuickIndex < > ( ShutdownAwarePlugin . class ) ;
/ * *
* Comparator of { @link ThreadPoolPlugin } .
* /
private Comparator < Object > pluginComparator ;
/ * *
* Create a { @link DefaultThreadPoolPluginManager } ,
* By default , plugins are not sorted ,
* and thread safety is implemented based on { @link ReentrantReadWriteLock } .
* /
public DefaultThreadPoolPluginManager ( ) {
this ( new ReentrantReadWriteLock ( ) , null ) ;
}
/ * *
* Registered { @link ShutdownAwarePlugin }
* Create a { @link DefaultThreadPoolPluginManager } .
*
* @param mainLock main lock
* @param pluginComparator comparator of plugin
* /
private final List < ShutdownAwarePlugin > shutdownAwarePluginList = new CopyOnWriteArrayList < > ( ) ;
public DefaultThreadPoolPluginManager (
@NonNull ReadWriteLock mainLock , @Nullable Comparator < Object > pluginComparator ) {
this . pluginComparator = pluginComparator ;
this . mainLock = new ReadWriteLockSupport ( mainLock ) ;
}
/ * *
* Clear all .
* /
@Override
public synchronized void clear ( ) {
Lock writeLock = instanceLock . writeLock ( ) ;
writeLock . lock ( ) ;
try {
Collection < ThreadPoolPlugin > plugins = registeredPlugins . values ( ) ;
public void clear ( ) {
mainLock . runWithWriteLock ( ( ) - > {
Collection < ThreadPoolPlugin > plugins = new ArrayList < > ( registeredPlugins . values ( ) ) ;
registeredPlugins . clear ( ) ;
taskAwarePluginList . clear ( ) ;
executeAwarePluginList . clear ( ) ;
rejectedAwarePluginList . clear ( ) ;
shutdownAwarePluginList . clear ( ) ;
forQuickIndexes ( QuickIndex : : clear ) ;
plugins . forEach ( ThreadPoolPlugin : : stop ) ;
} finally {
writeLock . unlock ( ) ;
}
} ) ;
}
/ * *
@ -107,31 +166,17 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
* @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same { @link ThreadPoolPlugin # getId ( ) } already exists in the registry
* @see ThreadPoolPlugin # getId ( )
* @see # isEnableSort
* /
@Override
public void register ( @NonNull ThreadPoolPlugin plugin ) {
Lock writeLock = instanceLock . writeLock ( ) ;
writeLock . lock ( ) ;
try {
mainLock . runWithWriteLock ( ( ) - > {
String id = plugin . getId ( ) ;
Assert . isTrue ( ! isRegistered ( id ) , "The plugin with id [" + id + "] has been registered" ) ;
registeredPlugins . put ( id , plugin ) ;
if ( plugin instanceof TaskAwarePlugin ) {
taskAwarePluginList . add ( ( TaskAwarePlugin ) plugin ) ;
}
if ( plugin instanceof ExecuteAwarePlugin ) {
executeAwarePluginList . add ( ( ExecuteAwarePlugin ) plugin ) ;
}
if ( plugin instanceof RejectedAwarePlugin ) {
rejectedAwarePluginList . add ( ( RejectedAwarePlugin ) plugin ) ;
}
if ( plugin instanceof ShutdownAwarePlugin ) {
shutdownAwarePluginList . add ( ( ShutdownAwarePlugin ) plugin ) ;
}
forQuickIndexes ( quickIndex - > quickIndex . addIfPossible ( plugin ) ) ;
plugin . start ( ) ;
} finally {
writeLock . unlock ( ) ;
}
} ) ;
}
/ * *
@ -142,17 +187,13 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
* /
@Override
public boolean tryRegister ( ThreadPoolPlugin plugin ) {
Lock writeLock = instanceLock . writeLock ( ) ;
writeLock . lock ( ) ;
try {
return mainLock . applyWithWriteLock ( ( ) - > {
if ( registeredPlugins . containsKey ( plugin . getId ( ) ) ) {
return false ;
}
register ( plugin ) ;
return true ;
} finally {
writeLock . unlock ( ) ;
}
} ) ;
}
/ * *
@ -162,29 +203,71 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
* /
@Override
public void unregister ( String pluginId ) {
Lock writeLock = instanceLock . writeLock ( ) ;
writeLock . lock ( ) ;
try {
Optional . ofNullable ( pluginId )
. map ( registeredPlugins : : remove )
. ifPresent ( plugin - > {
if ( plugin instanceof TaskAwarePlugin ) {
taskAwarePluginList . remove ( plugin ) ;
}
if ( plugin instanceof ExecuteAwarePlugin ) {
executeAwarePluginList . remove ( plugin ) ;
}
if ( plugin instanceof RejectedAwarePlugin ) {
rejectedAwarePluginList . remove ( plugin ) ;
}
if ( plugin instanceof ShutdownAwarePlugin ) {
shutdownAwarePluginList . remove ( plugin ) ;
}
plugin . stop ( ) ;
} ) ;
} finally {
writeLock . unlock ( ) ;
}
mainLock . runWithWriteLock (
( ) - > Optional . ofNullable ( pluginId )
. map ( registeredPlugins : : remove )
. ifPresent ( plugin - > {
disabledPlugins . remove ( pluginId ) ;
forQuickIndexes ( quickIndex - > quickIndex . removeIfPossible ( plugin ) ) ;
plugin . stop ( ) ;
} ) ) ;
}
/ * *
* Get id of disabled plugins .
*
* @return id of disabled plugins
* /
@Override
public Set < String > getAllDisabledPluginIds ( ) {
return disabledPlugins ;
}
/ * *
* Whether the plugin has been disabled .
*
* @param pluginId plugin id
* @return true if plugin has been disabled , false otherwise
* /
@Override
public boolean isDisabled ( String pluginId ) {
return disabledPlugins . contains ( pluginId ) ;
}
/ * *
* Enable plugin , make plugins will allow access through quick indexes .
*
* @param pluginId plugin id
* @return true if plugin already registered or enabled , false otherwise
* /
@Override
public boolean enable ( String pluginId ) {
return mainLock . applyWithReadLock ( ( ) - > {
ThreadPoolPlugin plugin = registeredPlugins . get ( pluginId ) ;
if ( Objects . isNull ( plugin ) | | ! disabledPlugins . remove ( pluginId ) ) {
return false ;
}
forQuickIndexes ( quickIndex - > quickIndex . addIfPossible ( plugin ) ) ;
return true ;
} ) ;
}
/ * *
* Disable plugin , make plugins will not allow access through quick indexes .
*
* @param pluginId plugin id
* @return true if plugin already registered or disabled , false otherwise
* /
@Override
public boolean disable ( String pluginId ) {
return mainLock . applyWithReadLock ( ( ) - > {
ThreadPoolPlugin plugin = registeredPlugins . get ( pluginId ) ;
if ( Objects . isNull ( plugin ) | | ! disabledPlugins . add ( pluginId ) ) {
return false ;
}
forQuickIndexes ( quickIndex - > quickIndex . removeIfPossible ( plugin ) ) ;
return true ;
} ) ;
}
/ * *
@ -196,13 +279,15 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
* /
@Override
public Collection < ThreadPoolPlugin > getAllPlugins ( ) {
Lock readLock = instanceLock . readLock ( ) ;
readLock . lock ( ) ;
try {
return mainLock . applyWithReadLock ( ( ) - > {
// sort if necessary
if ( isEnableSort ( ) ) {
return registeredPlugins . values ( ) . stream ( )
. sorted ( pluginComparator )
. collect ( Collectors . toList ( ) ) ;
}
return registeredPlugins . values ( ) ;
} finally {
readLock . unlock ( ) ;
}
} ) ;
}
/ * *
@ -213,13 +298,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
* /
@Override
public boolean isRegistered ( String pluginId ) {
Lock readLock = instanceLock . readLock ( ) ;
readLock . lock ( ) ;
try {
return registeredPlugins . containsKey ( pluginId ) ;
} finally {
readLock . unlock ( ) ;
}
return mainLock . applyWithReadLock ( ( ) - > registeredPlugins . containsKey ( pluginId ) ) ;
}
/ * *
@ -232,82 +311,225 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
@Override
@SuppressWarnings ( "unchecked" )
public < A extends ThreadPoolPlugin > Optional < A > getPlugin ( String pluginId ) {
Lock readLock = instanceLock . readLock ( ) ;
readLock . lock ( ) ;
try {
return ( Optional < A > ) Optional . ofNullable ( registeredPlugins . get ( pluginId ) ) ;
} finally {
readLock . unlock ( ) ;
}
return mainLock . applyWithReadLock (
( ) - > ( Optional < A > ) Optional . ofNullable ( registeredPlugins . get ( pluginId ) ) ) ;
}
/ * *
* Get execute plugin list .
* Get all enabled { @link ExecuteAwarePlugin } .
*
* @return { @link ExecuteAwarePlugin }
* @apiNote Be sure to avoid directly modifying returned collection instances ,
* otherwise , unexpected results may be obtained through the manager
* @see # enable
* @see # disable
* /
@Override
public Collection < ExecuteAwarePlugin > getExecuteAwarePluginList ( ) {
Lock readLock = instanceLock . readLock ( ) ;
readLock . lock ( ) ;
try {
return executeAwarePluginList ;
} finally {
readLock . unlock ( ) ;
}
return mainLock . applyWithReadLock ( executeAwarePluginList : : getPlugins ) ;
}
/ * *
* Get rejected plugin list .
* Get all enabled { @link RejectedAwarePlugin } .
*
* @return { @link RejectedAwarePlugin }
* @apiNote Be sure to avoid directly modifying returned collection instances ,
* otherwise , unexpected results may be obtained through the manager
* @see # enable
* @see # disable
* /
@Override
public Collection < RejectedAwarePlugin > getRejectedAwarePluginList ( ) {
Lock readLock = instanceLock . readLock ( ) ;
readLock . lock ( ) ;
try {
return rejectedAwarePluginList ;
} finally {
readLock . unlock ( ) ;
}
return mainLock . applyWithReadLock ( rejectedAwarePluginList : : getPlugins ) ;
}
/ * *
* Get shutdown plugin list .
* Get all enabled { @link ShutdownAwarePlugin } .
*
* @return { @link ShutdownAwarePlugin }
* @apiNote Be sure to avoid directly modifying returned collection instances ,
* otherwise , unexpected results may be obtained through the manager
* @see # enable
* @see # disable
* /
@Override
public Collection < ShutdownAwarePlugin > getShutdownAwarePluginList ( ) {
Lock readLock = instanceLock . readLock ( ) ;
readLock . lock ( ) ;
try {
return shutdownAwarePluginList ;
} finally {
readLock . unlock ( ) ;
}
return mainLock . applyWithReadLock ( shutdownAwarePluginList : : getPlugins ) ;
}
/ * *
* Get shutdown plugin list .
* Get all enabled { @link TaskAwarePlugin } .
*
* @return { @link Shutdown AwarePlugin}
* @return { @link TaskAwarePlugin }
* @apiNote Be sure to avoid directly modifying returned collection instances ,
* otherwise , unexpected results may be obtained through the manager
* @see # enable
* @see # disable
* /
@Override
public Collection < TaskAwarePlugin > getTaskAwarePluginList ( ) {
Lock readLock = instanceLock . readLock ( ) ;
readLock . lock ( ) ;
try {
return taskAwarePluginList ;
} finally {
readLock . unlock ( ) ;
return mainLock . applyWithReadLock ( taskAwarePluginList : : getPlugins ) ;
}
/ * *
* Whether sorting plugins is allowed .
*
* @return true if sorting plugins is allowed , false otherwise
* /
public boolean isEnableSort ( ) {
return Objects . nonNull ( pluginComparator ) ;
}
/ * *
* < p > Set whether sorting is allowed . < br / >
* < b > NOTE < / b > :
* If { @link # isEnableSort } returns false and { @code enableSort } is true ,
* All currently registered plug - ins will be reordered immediately .
*
* @param comparator comparator of plugins
* @return { @link DefaultThreadPoolPluginManager }
* /
public DefaultThreadPoolPluginManager setPluginComparator ( @NonNull Comparator < Object > comparator ) {
mainLock . runWithWriteLock ( ( ) - > {
// the specified comparator has been set
if ( Objects . equals ( this . pluginComparator , comparator ) ) {
return ;
}
this . pluginComparator = comparator ;
forQuickIndexes ( QuickIndex : : sort ) ;
} ) ;
return this ;
}
/ * *
* operate for each indexes
* /
private void forQuickIndexes ( Consumer < QuickIndex < ? extends ThreadPoolPlugin > > consumer ) {
consumer . accept ( taskAwarePluginList ) ;
consumer . accept ( executeAwarePluginList ) ;
consumer . accept ( rejectedAwarePluginList ) ;
consumer . accept ( shutdownAwarePluginList ) ;
}
/ * *
* Quick index of registered { @link ThreadPoolPlugin }
*
* @param < T > plugin type
* /
@RequiredArgsConstructor
private class QuickIndex < T extends ThreadPoolPlugin > {
/ * *
* Plugin type
* /
private final Class < T > pluginType ;
/ * *
* Plugins
* /
@Getter
private final List < T > plugins = new CopyOnWriteArrayList < > ( ) ;
/ * *
* Add plugin if possible .
*
* @param plugin plugin
* /
public void addIfPossible ( ThreadPoolPlugin plugin ) {
if ( ! pluginType . isInstance ( plugin ) ) {
return ;
}
plugins . add ( pluginType . cast ( plugin ) ) ;
sort ( ) ;
}
/ * *
* Remove plugin if possible .
*
* @param plugin plugin
* /
public void removeIfPossible ( ThreadPoolPlugin plugin ) {
if ( ! pluginType . isInstance ( plugin ) ) {
return ;
}
plugins . remove ( pluginType . cast ( plugin ) ) ;
sort ( ) ;
}
/ * *
* Sort by { @link # pluginComparator } .
* /
public void sort ( ) {
if ( isEnableSort ( ) ) {
plugins . sort ( pluginComparator ) ;
}
}
/ * *
* Clear all .
* /
public void clear ( ) {
plugins . clear ( ) ;
}
}
/ * *
* Read write lock support .
* /
@RequiredArgsConstructor
private static class ReadWriteLockSupport {
/ * *
* lock
* /
private final ReadWriteLock lock ;
/ * *
* Get the read - lock and do something .
*
* @param supplier supplier
* /
public < T > T applyWithReadLock ( Supplier < T > supplier ) {
Lock readLock = lock . readLock ( ) ;
readLock . lock ( ) ;
try {
return supplier . get ( ) ;
} finally {
readLock . unlock ( ) ;
}
}
/ * *
* Get the write - lock and do something .
*
* @param runnable runnable
* /
public void runWithWriteLock ( Runnable runnable ) {
Lock writeLock = lock . writeLock ( ) ;
writeLock . lock ( ) ;
try {
runnable . run ( ) ;
} finally {
writeLock . unlock ( ) ;
}
}
/ * *
* Get the write - lock and do something .
*
* @param supplier supplier
* /
public < T > T applyWithWriteLock ( Supplier < T > supplier ) {
Lock writeLock = lock . writeLock ( ) ;
writeLock . lock ( ) ;
try {
return supplier . get ( ) ;
} finally {
writeLock . unlock ( ) ;
}
}
}
}