feat: Support to centralized management of all plugins

pull/914/head
huangchengxing 3 years ago
parent f19ec20c57
commit f669d07cc1

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import lombok.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* Default implementation of {@link GlobalThreadPoolPluginManager}.
*/
public class DefaultGlobalThreadPoolPluginManager implements GlobalThreadPoolPluginManager {
/**
* enable thread pool plugins
*/
private final Map<String, ThreadPoolPlugin> enableThreadPoolPlugins = new ConcurrentHashMap<>(8);
/**
* enable thread pool plugin registrars
*/
private final Map<String, ThreadPoolPluginRegistrar> enableThreadPoolPluginRegistrars = new ConcurrentHashMap<>(8);
/**
* registered supports
*/
private final Map<String, ThreadPoolPluginSupport> managedThreadPoolPluginSupports = new ConcurrentHashMap<>(32);
/**
* Synchronize all enabled plugins and registrars in the current manager to the {@link ThreadPoolPluginSupport}
* <b>whether the support has been managed by the current manager</b>.
* After that, the support will <b>not</b> be synchronized with the plug-in and registrar states in the manager.
*
* @param support thread pool plugin manager delegate
* @see #registerThreadPoolPluginSupport
*/
@Override
public void doRegister(@NonNull ThreadPoolPluginSupport support) {
enableThreadPoolPluginRegistrars.values().forEach(registrar -> registrar.doRegister(support));
enableThreadPoolPlugins.values().forEach(support::tryRegister);
}
/**
* Synchronize all enabled plugins and registrars
* in the current manager to the {@link ThreadPoolPluginSupport} <b>if the support has not been managed before</b>,
* After that, this support will be synchronized with the plug-in and registrar status in the manager.
*
* @param support thread pool plugin manager support
* @return true if the support has not been managed before, false otherwise
* @see #doRegister
*/
@Override
public boolean registerThreadPoolPluginSupport(@NonNull ThreadPoolPluginSupport support) {
if (!managedThreadPoolPluginSupports.containsKey(support.getThreadPoolId())) {
enableThreadPoolPluginRegistrars.values().forEach(registrar -> registrar.doRegister(support));
enableThreadPoolPlugins.values().forEach(support::tryRegister);
managedThreadPoolPluginSupports.put(support.getThreadPoolId(), support);
return true;
}
return false;
}
/**
* Cancel the management of the specified {@link ThreadPoolPluginSupport}.
*
* @param threadPoolId thread pool id
* @return {@link ThreadPoolPluginSupport}
*/
@Override
public ThreadPoolPluginSupport cancelManagement(String threadPoolId) {
return managedThreadPoolPluginSupports.remove(threadPoolId);
}
/**
* Get registered {@link ThreadPoolPluginSupport}.
*
* @param threadPoolId thread-pool id
* @return cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager
*/
@Nullable
@Override
public ThreadPoolPluginSupport getManagedThreadPoolPluginSupport(String threadPoolId) {
return managedThreadPoolPluginSupports.get(threadPoolId);
}
/**
* Get all registered {@link ThreadPoolPluginSupport}
*
* @return all registered {@link ThreadPoolPluginSupport}
*/
@Override
public Collection<ThreadPoolPluginSupport> getAllManagedThreadPoolPluginSupports() {
return managedThreadPoolPluginSupports.values();
}
/**
* Enable plugin for all {@link ThreadPoolPluginSupport},
* after action, newly registered support will also get this plugin.
*
* @param plugin plugin
* @return true if the plugin has not been enabled before, false otherwise
*/
@Override
public boolean enableThreadPoolPlugin(@NonNull ThreadPoolPlugin plugin) {
if (Objects.isNull(enableThreadPoolPlugins.put(plugin.getId(), plugin))) {
managedThreadPoolPluginSupports.values().forEach(support -> support.register(plugin));
return true;
}
return false;
}
/**
* Get all enable {@link ThreadPoolPlugin}.
*
* @return all published {@link ThreadPoolPlugin}
*/
@Override
public Collection<ThreadPoolPlugin> getAllEnableThreadPoolPlugins() {
return enableThreadPoolPlugins.values();
}
/**
* Disable {@link ThreadPoolPlugin} for all {@link ThreadPoolPluginSupport},
* after action, newly registered support will not get this registrar.
*
* @param pluginId plugin id
* @return {@link ThreadPoolPlugin} if enable, null otherwise
*/
@Override
public ThreadPoolPlugin disableThreadPoolPlugin(String pluginId) {
ThreadPoolPlugin removed = enableThreadPoolPlugins.remove(pluginId);
if (Objects.nonNull(removed)) {
managedThreadPoolPluginSupports.values().forEach(support -> support.unregister(pluginId));
}
return removed;
}
/**
* Enable registrar, then apply to all registered {@link ThreadPoolPluginSupport},
* after action, newly registered support will also get this registrar.
*
* @param registrar registrar
* @return true if the registrar has not been enabled before, false otherwise
*/
@Override
public boolean enableThreadPoolPluginRegistrar(@NonNull ThreadPoolPluginRegistrar registrar) {
if (Objects.isNull(enableThreadPoolPluginRegistrars.put(registrar.getId(), registrar))) {
managedThreadPoolPluginSupports.values().forEach(registrar::doRegister);
return true;
}
return false;
}
/**
* Get all enable {@link ThreadPoolPluginRegistrar}.
*
* @return all {@link ThreadPoolPluginRegistrar}.
*/
@Override
public Collection<ThreadPoolPluginRegistrar> getAllEnableThreadPoolPluginRegistrar() {
return enableThreadPoolPluginRegistrars.values();
}
/**
* Unable {@link ThreadPoolPluginRegistrar}, newly registered support will not get this registrar.
*
* @param registrarId registrar id
* @return {@link ThreadPoolPluginRegistrar} if enable, null otherwise
*/
@Override
public ThreadPoolPluginRegistrar disableThreadPoolPluginRegistrar(String registrarId) {
return enableThreadPoolPluginRegistrars.remove(registrarId);
}
}

@ -18,7 +18,11 @@
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import cn.hippo4j.core.plugin.impl.*;
import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin;
import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin;
import cn.hippo4j.core.plugin.impl.TaskRejectNotifyAlarmPlugin;
import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin;
import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
@ -35,8 +39,6 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistrar {
public static final String REGISTRAR_NAME = "DefaultThreadPoolPluginRegistrar";
/**
* Execute time out
*/
@ -47,16 +49,6 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr
*/
private long awaitTerminationMillis;
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return REGISTRAR_NAME;
}
/**
* Create and register plugin for the specified thread-pool instance.
*

@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* A globally {@link ThreadPoolPluginManager}.
* It is used to manage {@link ThreadPoolPluginSupport} in the global,
* so that all managed {@link ThreadPoolPluginSupport} can be registered incrementally.
*/
public interface GlobalThreadPoolPluginManager extends ThreadPoolPluginRegistrar {
/**
* Synchronize all enabled plugins and registrars
* in the current manager to the {@link ThreadPoolPluginSupport}.
* After that, the support will <b>not</b> be synchronized with the plug-in and registrar states in the manager.
*
* @param support thread pool plugin manager delegate
* @see #registerThreadPoolPluginSupport
*/
@Override
void doRegister(ThreadPoolPluginSupport support);
/**
* Synchronize all enabled plugins and registrars
* in the current manager to the {@link ThreadPoolPluginSupport},
* After that, this support will be synchronized with the plug-in and registrar status in the manager.
*
* @param support thread pool plugin manager support
* @return true if the support has not been managed before, false otherwise
* @see #doRegister
*/
boolean registerThreadPoolPluginSupport(ThreadPoolPluginSupport support);
/**
* Cancel the management of the specified {@link ThreadPoolPluginSupport}.
*
* @param threadPoolId thread pool id
* @return {@link ThreadPoolPluginSupport} if managed, null otherwise
*/
ThreadPoolPluginSupport cancelManagement(String threadPoolId);
/**
* Get registered {@link ThreadPoolPluginSupport}.
*
* @param threadPoolId thread-pool id
* @return {@link ThreadPoolPluginSupport} if managed, null otherwise
*/
ThreadPoolPluginSupport getManagedThreadPoolPluginSupport(String threadPoolId);
/**
* Get all registered {@link ThreadPoolPluginSupport}
*
* @return all registered {@link ThreadPoolPluginSupport}
*/
Collection<ThreadPoolPluginSupport> getAllManagedThreadPoolPluginSupports();
// ===================== plugin =====================
/**
* Enable plugin for all {@link ThreadPoolPluginSupport},
* after action, newly registered support will also get this plugin.
*
* @param plugin plugin
* @return true if the plugin has not been enabled before, false otherwise
*/
boolean enableThreadPoolPlugin(ThreadPoolPlugin plugin);
/**
* Get all enable {@link ThreadPoolPlugin}.
*
* @return all published {@link ThreadPoolPlugin}
*/
Collection<ThreadPoolPlugin> getAllEnableThreadPoolPlugins();
/**
* Disable {@link ThreadPoolPlugin} for all {@link ThreadPoolPluginSupport},
* after action, newly registered support will not get this registrar.
*
* @param pluginId plugin id
* @return {@link ThreadPoolPlugin} if enable, null otherwise
*/
ThreadPoolPlugin disableThreadPoolPlugin(String pluginId);
/**
* Get all plugins from registered {@link ThreadPoolPluginSupport}.
*
* @return plugins
*/
default Collection<ThreadPoolPlugin> getAllPluginsFromManagers() {
return getAllManagedThreadPoolPluginSupports().stream()
.map(ThreadPoolPluginSupport::getAllPlugins)
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
/**
* Get plugins of type from registered {@link ThreadPoolPluginSupport}.
*
* @param pluginType plugin type
* @return plugins
*/
default <A extends ThreadPoolPlugin> Collection<A> getPluginsOfTypeFromManagers(Class<A> pluginType) {
return getAllPluginsFromManagers().stream()
.filter(pluginType::isInstance)
.map(pluginType::cast)
.collect(Collectors.toList());
}
/**
* Get plugins by id from registered {@link ThreadPoolPluginSupport}.
*
* @param pluginId plugin id
* @return plugins
*/
default Collection<ThreadPoolPlugin> getPluginsFromManagers(String pluginId) {
return getAllManagedThreadPoolPluginSupports().stream()
.map(manager -> manager.getPlugin(pluginId))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}
/**
* Unregister for all registered Managers
*
* @param pluginId plugin id
*/
default void unregisterForAllManagers(String pluginId) {
getAllManagedThreadPoolPluginSupports().forEach(s -> s.unregister(pluginId));
}
// ===================== registrar =====================
/**
* Enable registrar, then apply to all registered {@link ThreadPoolPluginSupport},
* after action, newly registered support will also get this registrar.
*
* @param registrar registrar
* @return true if the registrar has not been enabled before, false otherwise
*/
boolean enableThreadPoolPluginRegistrar(ThreadPoolPluginRegistrar registrar);
/**
* Get all enable {@link ThreadPoolPluginRegistrar}.
*
* @return all {@link ThreadPoolPluginRegistrar}.
*/
Collection<ThreadPoolPluginRegistrar> getAllEnableThreadPoolPluginRegistrar();
/**
* Unable {@link ThreadPoolPluginRegistrar}, newly registered support will not get this registrar.
*
* @param registrarId registrar id
* @return {@link ThreadPoolPluginRegistrar} if enable, null otherwise
*/
ThreadPoolPluginRegistrar disableThreadPoolPluginRegistrar(String registrarId);
}

@ -25,12 +25,13 @@ import cn.hippo4j.core.plugin.ThreadPoolPlugin;
public interface ThreadPoolPluginRegistrar {
/**
* Get id.
* In spring container, the obtained id will be used as the alias of the bean name.
* Get id
*
* @return id
*/
String getId();
default String getId() {
return this.getClass().getSimpleName();
}
/**
* Create and register plugin for the specified thread-pool instance.

@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* test for {@link DefaultGlobalThreadPoolPluginManager}
*/
public class DefaultGlobalThreadPoolPluginManagerTest {
@Test
public void testDoRegister() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPlugin(new TestPlugin("1"));
manager.enableThreadPoolPlugin(new TestPlugin("2"));
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
TestSupport support = new TestSupport("1");
manager.doRegister(support);
Assert.assertEquals(3, support.getAllPlugins().size());
}
@Test
public void testRegisterThreadPoolPluginSupport() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
Assert.assertTrue(manager.enableThreadPoolPlugin(new TestPlugin("1")));
TestSupport support = new TestSupport("1");
Assert.assertTrue(manager.registerThreadPoolPluginSupport(support));
Assert.assertFalse(manager.registerThreadPoolPluginSupport(support));
Assert.assertEquals(1, support.getAllPlugins().size());
// incremental update
manager.enableThreadPoolPlugin(new TestPlugin("2"));
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
Assert.assertEquals(3, support.getAllPlugins().size());
}
@Test
public void testCancelManagement() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPlugin(new TestPlugin("1"));
TestSupport support = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support);
Assert.assertEquals(1, support.getAllPlugins().size());
manager.cancelManagement(support.getThreadPoolId());
manager.enableThreadPoolPlugin(new TestPlugin("2"));
Assert.assertEquals(1, support.getAllPlugins().size());
}
@Test
public void testGetManagedThreadPoolPluginSupport() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
TestSupport support = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support);
Assert.assertSame(support, manager.getManagedThreadPoolPluginSupport(support.getThreadPoolId()));
support = new TestSupport("2");
manager.registerThreadPoolPluginSupport(support);
Assert.assertSame(support, manager.getManagedThreadPoolPluginSupport(support.getThreadPoolId()));
}
@Test
public void testGetAllManagedThreadPoolPluginSupports() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.registerThreadPoolPluginSupport(new TestSupport("1"));
manager.registerThreadPoolPluginSupport(new TestSupport("2"));
Assert.assertEquals(2, manager.getAllManagedThreadPoolPluginSupports().size());
}
@Test
public void testEnableThreadPoolPlugin() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
TestSupport support1 = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support1);
TestSupport support2 = new TestSupport("2");
manager.registerThreadPoolPluginSupport(support2);
Assert.assertTrue(manager.enableThreadPoolPlugin(new TestPlugin("1")));
Assert.assertFalse(manager.enableThreadPoolPlugin(new TestPlugin("1")));
Assert.assertEquals(1, support1.getAllPlugins().size());
}
@Test
public void testGetAllEnableThreadPoolPlugins() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPlugin(new TestPlugin("1"));
manager.enableThreadPoolPlugin(new TestPlugin("2"));
Assert.assertEquals(2, manager.getAllEnableThreadPoolPlugins().size());
}
@Test
public void testDisableThreadPoolPlugin() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPlugin(new TestPlugin("1"));
manager.enableThreadPoolPlugin(new TestPlugin("2"));
manager.disableThreadPoolPlugin("2");
Assert.assertEquals(1, manager.getAllEnableThreadPoolPlugins().size());
}
@Test
public void testEnableThreadPoolPluginRegistrar() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
Assert.assertTrue(manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1")));
Assert.assertFalse(manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1")));
Assert.assertEquals(1, manager.getAllEnableThreadPoolPluginRegistrar().size());
}
@Test
public void testGetAllEnableThreadPoolPluginRegistrar() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("2"));
Assert.assertEquals(2, manager.getAllEnableThreadPoolPluginRegistrar().size());
}
@Test
public void testDisableThreadPoolPluginRegistrar() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("2"));
manager.disableThreadPoolPluginRegistrar("2");
Assert.assertEquals(1, manager.getAllEnableThreadPoolPluginRegistrar().size());
}
@Test
public void testGetAllPluginsFromManagers() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
manager.enableThreadPoolPlugin(new TestPlugin("1"));
TestSupport support1 = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support1);
TestSupport support2 = new TestSupport("2");
manager.registerThreadPoolPluginSupport(support2);
Assert.assertEquals(4, manager.getAllPluginsFromManagers().size());
}
@Test
public void testGetPluginsOfTypeFromManagers() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
manager.enableThreadPoolPlugin(new TestPlugin("1"));
TestSupport support1 = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support1);
TestSupport support2 = new TestSupport("2");
manager.registerThreadPoolPluginSupport(support2);
Assert.assertEquals(4, manager.getPluginsOfTypeFromManagers(TestPlugin.class).size());
}
@Test
public void testGetPluginsFromManagers() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
manager.enableThreadPoolPlugin(new TestPlugin("1"));
TestSupport support1 = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support1);
TestSupport support2 = new TestSupport("2");
manager.registerThreadPoolPluginSupport(support2);
Assert.assertEquals(2, manager.getPluginsFromManagers("1").size());
}
@Test
public void testUnregisterForAllManagers() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
manager.enableThreadPoolPlugin(new TestPlugin("1"));
TestSupport support1 = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support1);
TestSupport support2 = new TestSupport("2");
manager.registerThreadPoolPluginSupport(support2);
manager.unregisterForAllManagers("1");
Assert.assertEquals(2, manager.getAllPluginsFromManagers().size());
}
@RequiredArgsConstructor
@Getter
private static class TestSupport implements ThreadPoolPluginSupport {
private final String threadPoolId;
private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
private final ThreadPoolPluginManager threadPoolPluginManager = new DefaultThreadPoolPluginManager();
}
@Getter
@RequiredArgsConstructor
private static class TestRegistrar implements ThreadPoolPluginRegistrar {
private final String id;
@Override
public void doRegister(ThreadPoolPluginSupport support) {
support.register(new TestPlugin("TestRegistrar"));
}
}
@Getter
@RequiredArgsConstructor
private static class TestPlugin implements ThreadPoolPlugin {
private final String id;
}
}
Loading…
Cancel
Save