Support automatic registration of plugins and plugin registrars (#914)

* feat: Support to centralized management of all plugins

* feat: Support automatic registration of plugins and plugin registrars
pull/916/head
黄成兴 2 years ago committed by GitHub
parent 2604336725
commit 328bb4a47f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import lombok.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* Default implementation of {@link GlobalThreadPoolPluginManager}.
*/
public class DefaultGlobalThreadPoolPluginManager implements GlobalThreadPoolPluginManager {
/**
* enable thread pool plugins
*/
private final Map<String, ThreadPoolPlugin> enableThreadPoolPlugins = new ConcurrentHashMap<>(8);
/**
* enable thread pool plugin registrars
*/
private final Map<String, ThreadPoolPluginRegistrar> enableThreadPoolPluginRegistrars = new ConcurrentHashMap<>(8);
/**
* registered supports
*/
private final Map<String, ThreadPoolPluginSupport> managedThreadPoolPluginSupports = new ConcurrentHashMap<>(32);
/**
* Synchronize all enabled plugins and registrars in the current manager to the {@link ThreadPoolPluginSupport}
* <b>whether the support has been managed by the current manager</b>.
* After that, the support will <b>not</b> be synchronized with the plug-in and registrar states in the manager.
*
* @param support thread pool plugin manager delegate
* @see #registerThreadPoolPluginSupport
*/
@Override
public void doRegister(@NonNull ThreadPoolPluginSupport support) {
enableThreadPoolPluginRegistrars.values().forEach(registrar -> registrar.doRegister(support));
enableThreadPoolPlugins.values().forEach(support::tryRegister);
}
/**
* Synchronize all enabled plugins and registrars
* in the current manager to the {@link ThreadPoolPluginSupport} <b>if the support has not been managed before</b>,
* After that, this support will be synchronized with the plug-in and registrar status in the manager.
*
* @param support thread pool plugin manager support
* @return true if the support has not been managed before, false otherwise
* @see #doRegister
*/
@Override
public boolean registerThreadPoolPluginSupport(@NonNull ThreadPoolPluginSupport support) {
if (!managedThreadPoolPluginSupports.containsKey(support.getThreadPoolId())) {
enableThreadPoolPluginRegistrars.values().forEach(registrar -> registrar.doRegister(support));
enableThreadPoolPlugins.values().forEach(support::tryRegister);
managedThreadPoolPluginSupports.put(support.getThreadPoolId(), support);
return true;
}
return false;
}
/**
* Cancel the management of the specified {@link ThreadPoolPluginSupport}.
*
* @param threadPoolId thread pool id
* @return {@link ThreadPoolPluginSupport}
*/
@Override
public ThreadPoolPluginSupport cancelManagement(String threadPoolId) {
return managedThreadPoolPluginSupports.remove(threadPoolId);
}
/**
* Get registered {@link ThreadPoolPluginSupport}.
*
* @param threadPoolId thread-pool id
* @return cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager
*/
@Nullable
@Override
public ThreadPoolPluginSupport getManagedThreadPoolPluginSupport(String threadPoolId) {
return managedThreadPoolPluginSupports.get(threadPoolId);
}
/**
* Get all registered {@link ThreadPoolPluginSupport}
*
* @return all registered {@link ThreadPoolPluginSupport}
*/
@Override
public Collection<ThreadPoolPluginSupport> getAllManagedThreadPoolPluginSupports() {
return managedThreadPoolPluginSupports.values();
}
/**
* Enable plugin for all {@link ThreadPoolPluginSupport},
* after action, newly registered support will also get this plugin.
*
* @param plugin plugin
* @return true if the plugin has not been enabled before, false otherwise
*/
@Override
public boolean enableThreadPoolPlugin(@NonNull ThreadPoolPlugin plugin) {
if (Objects.isNull(enableThreadPoolPlugins.put(plugin.getId(), plugin))) {
managedThreadPoolPluginSupports.values().forEach(support -> support.register(plugin));
return true;
}
return false;
}
/**
* Get all enable {@link ThreadPoolPlugin}.
*
* @return all published {@link ThreadPoolPlugin}
*/
@Override
public Collection<ThreadPoolPlugin> getAllEnableThreadPoolPlugins() {
return enableThreadPoolPlugins.values();
}
/**
* Disable {@link ThreadPoolPlugin} for all {@link ThreadPoolPluginSupport},
* after action, newly registered support will not get this registrar.
*
* @param pluginId plugin id
* @return {@link ThreadPoolPlugin} if enable, null otherwise
*/
@Override
public ThreadPoolPlugin disableThreadPoolPlugin(String pluginId) {
ThreadPoolPlugin removed = enableThreadPoolPlugins.remove(pluginId);
if (Objects.nonNull(removed)) {
managedThreadPoolPluginSupports.values().forEach(support -> support.unregister(pluginId));
}
return removed;
}
/**
* Enable registrar, then apply to all registered {@link ThreadPoolPluginSupport},
* after action, newly registered support will also get this registrar.
*
* @param registrar registrar
* @return true if the registrar has not been enabled before, false otherwise
*/
@Override
public boolean enableThreadPoolPluginRegistrar(@NonNull ThreadPoolPluginRegistrar registrar) {
if (Objects.isNull(enableThreadPoolPluginRegistrars.put(registrar.getId(), registrar))) {
managedThreadPoolPluginSupports.values().forEach(registrar::doRegister);
return true;
}
return false;
}
/**
* Get all enable {@link ThreadPoolPluginRegistrar}.
*
* @return all {@link ThreadPoolPluginRegistrar}.
*/
@Override
public Collection<ThreadPoolPluginRegistrar> getAllEnableThreadPoolPluginRegistrar() {
return enableThreadPoolPluginRegistrars.values();
}
/**
* Unable {@link ThreadPoolPluginRegistrar}, newly registered support will not get this registrar.
*
* @param registrarId registrar id
* @return {@link ThreadPoolPluginRegistrar} if enable, null otherwise
*/
@Override
public ThreadPoolPluginRegistrar disableThreadPoolPluginRegistrar(String registrarId) {
return enableThreadPoolPluginRegistrars.remove(registrarId);
}
}

@ -18,7 +18,11 @@
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import cn.hippo4j.core.plugin.impl.*;
import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin;
import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin;
import cn.hippo4j.core.plugin.impl.TaskRejectNotifyAlarmPlugin;
import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin;
import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
@ -35,8 +39,6 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistrar {
public static final String REGISTRAR_NAME = "DefaultThreadPoolPluginRegistrar";
/**
* Execute time out
*/
@ -47,16 +49,6 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr
*/
private long awaitTerminationMillis;
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return REGISTRAR_NAME;
}
/**
* Create and register plugin for the specified thread-pool instance.
*

@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* A globally {@link ThreadPoolPluginManager}.
* It is used to manage {@link ThreadPoolPluginSupport} in the global,
* so that all managed {@link ThreadPoolPluginSupport} can be registered incrementally.
*/
public interface GlobalThreadPoolPluginManager extends ThreadPoolPluginRegistrar {
/**
* Synchronize all enabled plugins and registrars
* in the current manager to the {@link ThreadPoolPluginSupport}.
* After that, the support will <b>not</b> be synchronized with the plug-in and registrar states in the manager.
*
* @param support thread pool plugin manager delegate
* @see #registerThreadPoolPluginSupport
*/
@Override
void doRegister(ThreadPoolPluginSupport support);
/**
* Synchronize all enabled plugins and registrars
* in the current manager to the {@link ThreadPoolPluginSupport},
* After that, this support will be synchronized with the plug-in and registrar status in the manager.
*
* @param support thread pool plugin manager support
* @return true if the support has not been managed before, false otherwise
* @see #doRegister
*/
boolean registerThreadPoolPluginSupport(ThreadPoolPluginSupport support);
/**
* Cancel the management of the specified {@link ThreadPoolPluginSupport}.
*
* @param threadPoolId thread pool id
* @return {@link ThreadPoolPluginSupport} if managed, null otherwise
*/
ThreadPoolPluginSupport cancelManagement(String threadPoolId);
/**
* Get registered {@link ThreadPoolPluginSupport}.
*
* @param threadPoolId thread-pool id
* @return {@link ThreadPoolPluginSupport} if managed, null otherwise
*/
ThreadPoolPluginSupport getManagedThreadPoolPluginSupport(String threadPoolId);
/**
* Get all registered {@link ThreadPoolPluginSupport}
*
* @return all registered {@link ThreadPoolPluginSupport}
*/
Collection<ThreadPoolPluginSupport> getAllManagedThreadPoolPluginSupports();
// ===================== plugin =====================
/**
* Enable plugin for all {@link ThreadPoolPluginSupport},
* after action, newly registered support will also get this plugin.
*
* @param plugin plugin
* @return true if the plugin has not been enabled before, false otherwise
*/
boolean enableThreadPoolPlugin(ThreadPoolPlugin plugin);
/**
* Get all enable {@link ThreadPoolPlugin}.
*
* @return all published {@link ThreadPoolPlugin}
*/
Collection<ThreadPoolPlugin> getAllEnableThreadPoolPlugins();
/**
* Disable {@link ThreadPoolPlugin} for all {@link ThreadPoolPluginSupport},
* after action, newly registered support will not get this registrar.
*
* @param pluginId plugin id
* @return {@link ThreadPoolPlugin} if enable, null otherwise
*/
ThreadPoolPlugin disableThreadPoolPlugin(String pluginId);
/**
* Get all plugins from registered {@link ThreadPoolPluginSupport}.
*
* @return plugins
*/
default Collection<ThreadPoolPlugin> getAllPluginsFromManagers() {
return getAllManagedThreadPoolPluginSupports().stream()
.map(ThreadPoolPluginSupport::getAllPlugins)
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
/**
* Get plugins of type from registered {@link ThreadPoolPluginSupport}.
*
* @param pluginType plugin type
* @return plugins
*/
default <A extends ThreadPoolPlugin> Collection<A> getPluginsOfTypeFromManagers(Class<A> pluginType) {
return getAllPluginsFromManagers().stream()
.filter(pluginType::isInstance)
.map(pluginType::cast)
.collect(Collectors.toList());
}
/**
* Get plugins by id from registered {@link ThreadPoolPluginSupport}.
*
* @param pluginId plugin id
* @return plugins
*/
default Collection<ThreadPoolPlugin> getPluginsFromManagers(String pluginId) {
return getAllManagedThreadPoolPluginSupports().stream()
.map(manager -> manager.getPlugin(pluginId))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}
/**
* Unregister for all registered Managers
*
* @param pluginId plugin id
*/
default void unregisterForAllManagers(String pluginId) {
getAllManagedThreadPoolPluginSupports().forEach(s -> s.unregister(pluginId));
}
// ===================== registrar =====================
/**
* Enable registrar, then apply to all registered {@link ThreadPoolPluginSupport},
* after action, newly registered support will also get this registrar.
*
* @param registrar registrar
* @return true if the registrar has not been enabled before, false otherwise
*/
boolean enableThreadPoolPluginRegistrar(ThreadPoolPluginRegistrar registrar);
/**
* Get all enable {@link ThreadPoolPluginRegistrar}.
*
* @return all {@link ThreadPoolPluginRegistrar}.
*/
Collection<ThreadPoolPluginRegistrar> getAllEnableThreadPoolPluginRegistrar();
/**
* Unable {@link ThreadPoolPluginRegistrar}, newly registered support will not get this registrar.
*
* @param registrarId registrar id
* @return {@link ThreadPoolPluginRegistrar} if enable, null otherwise
*/
ThreadPoolPluginRegistrar disableThreadPoolPluginRegistrar(String registrarId);
}

@ -25,12 +25,13 @@ import cn.hippo4j.core.plugin.ThreadPoolPlugin;
public interface ThreadPoolPluginRegistrar {
/**
* Get id.
* In spring container, the obtained id will be used as the alias of the bean name.
* Get id
*
* @return id
*/
String getId();
default String getId() {
return this.getClass().getSimpleName();
}
/**
* Create and register plugin for the specified thread-pool instance.

@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* test for {@link DefaultGlobalThreadPoolPluginManager}
*/
public class DefaultGlobalThreadPoolPluginManagerTest {
@Test
public void testDoRegister() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPlugin(new TestPlugin("1"));
manager.enableThreadPoolPlugin(new TestPlugin("2"));
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
TestSupport support = new TestSupport("1");
manager.doRegister(support);
Assert.assertEquals(3, support.getAllPlugins().size());
}
@Test
public void testRegisterThreadPoolPluginSupport() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
Assert.assertTrue(manager.enableThreadPoolPlugin(new TestPlugin("1")));
TestSupport support = new TestSupport("1");
Assert.assertTrue(manager.registerThreadPoolPluginSupport(support));
Assert.assertFalse(manager.registerThreadPoolPluginSupport(support));
Assert.assertEquals(1, support.getAllPlugins().size());
// incremental update
manager.enableThreadPoolPlugin(new TestPlugin("2"));
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
Assert.assertEquals(3, support.getAllPlugins().size());
}
@Test
public void testCancelManagement() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPlugin(new TestPlugin("1"));
TestSupport support = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support);
Assert.assertEquals(1, support.getAllPlugins().size());
manager.cancelManagement(support.getThreadPoolId());
manager.enableThreadPoolPlugin(new TestPlugin("2"));
Assert.assertEquals(1, support.getAllPlugins().size());
}
@Test
public void testGetManagedThreadPoolPluginSupport() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
TestSupport support = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support);
Assert.assertSame(support, manager.getManagedThreadPoolPluginSupport(support.getThreadPoolId()));
support = new TestSupport("2");
manager.registerThreadPoolPluginSupport(support);
Assert.assertSame(support, manager.getManagedThreadPoolPluginSupport(support.getThreadPoolId()));
}
@Test
public void testGetAllManagedThreadPoolPluginSupports() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.registerThreadPoolPluginSupport(new TestSupport("1"));
manager.registerThreadPoolPluginSupport(new TestSupport("2"));
Assert.assertEquals(2, manager.getAllManagedThreadPoolPluginSupports().size());
}
@Test
public void testEnableThreadPoolPlugin() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
TestSupport support1 = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support1);
TestSupport support2 = new TestSupport("2");
manager.registerThreadPoolPluginSupport(support2);
Assert.assertTrue(manager.enableThreadPoolPlugin(new TestPlugin("1")));
Assert.assertFalse(manager.enableThreadPoolPlugin(new TestPlugin("1")));
Assert.assertEquals(1, support1.getAllPlugins().size());
}
@Test
public void testGetAllEnableThreadPoolPlugins() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPlugin(new TestPlugin("1"));
manager.enableThreadPoolPlugin(new TestPlugin("2"));
Assert.assertEquals(2, manager.getAllEnableThreadPoolPlugins().size());
}
@Test
public void testDisableThreadPoolPlugin() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPlugin(new TestPlugin("1"));
manager.enableThreadPoolPlugin(new TestPlugin("2"));
manager.disableThreadPoolPlugin("2");
Assert.assertEquals(1, manager.getAllEnableThreadPoolPlugins().size());
}
@Test
public void testEnableThreadPoolPluginRegistrar() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
Assert.assertTrue(manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1")));
Assert.assertFalse(manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1")));
Assert.assertEquals(1, manager.getAllEnableThreadPoolPluginRegistrar().size());
}
@Test
public void testGetAllEnableThreadPoolPluginRegistrar() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("2"));
Assert.assertEquals(2, manager.getAllEnableThreadPoolPluginRegistrar().size());
}
@Test
public void testDisableThreadPoolPluginRegistrar() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("2"));
manager.disableThreadPoolPluginRegistrar("2");
Assert.assertEquals(1, manager.getAllEnableThreadPoolPluginRegistrar().size());
}
@Test
public void testGetAllPluginsFromManagers() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
manager.enableThreadPoolPlugin(new TestPlugin("1"));
TestSupport support1 = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support1);
TestSupport support2 = new TestSupport("2");
manager.registerThreadPoolPluginSupport(support2);
Assert.assertEquals(4, manager.getAllPluginsFromManagers().size());
}
@Test
public void testGetPluginsOfTypeFromManagers() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
manager.enableThreadPoolPlugin(new TestPlugin("1"));
TestSupport support1 = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support1);
TestSupport support2 = new TestSupport("2");
manager.registerThreadPoolPluginSupport(support2);
Assert.assertEquals(4, manager.getPluginsOfTypeFromManagers(TestPlugin.class).size());
}
@Test
public void testGetPluginsFromManagers() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
manager.enableThreadPoolPlugin(new TestPlugin("1"));
TestSupport support1 = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support1);
TestSupport support2 = new TestSupport("2");
manager.registerThreadPoolPluginSupport(support2);
Assert.assertEquals(2, manager.getPluginsFromManagers("1").size());
}
@Test
public void testUnregisterForAllManagers() {
GlobalThreadPoolPluginManager manager = new DefaultGlobalThreadPoolPluginManager();
manager.enableThreadPoolPluginRegistrar(new TestRegistrar("1"));
manager.enableThreadPoolPlugin(new TestPlugin("1"));
TestSupport support1 = new TestSupport("1");
manager.registerThreadPoolPluginSupport(support1);
TestSupport support2 = new TestSupport("2");
manager.registerThreadPoolPluginSupport(support2);
manager.unregisterForAllManagers("1");
Assert.assertEquals(2, manager.getAllPluginsFromManagers().size());
}
@RequiredArgsConstructor
@Getter
private static class TestSupport implements ThreadPoolPluginSupport {
private final String threadPoolId;
private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
private final ThreadPoolPluginManager threadPoolPluginManager = new DefaultThreadPoolPluginManager();
}
@Getter
@RequiredArgsConstructor
private static class TestRegistrar implements ThreadPoolPluginRegistrar {
private final String id;
@Override
public void doRegister(ThreadPoolPluginSupport support) {
support.register(new TestPlugin("TestRegistrar"));
}
}
@Getter
@RequiredArgsConstructor
private static class TestPlugin implements ThreadPoolPlugin {
private final String id;
}
}

@ -59,6 +59,7 @@ import cn.hippo4j.springboot.starter.remote.ServerHealthCheck;
import cn.hippo4j.springboot.starter.remote.ServerHttpAgent;
import cn.hippo4j.springboot.starter.support.DynamicThreadPoolConfigService;
import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor;
import cn.hippo4j.springboot.starter.support.ThreadPoolPluginRegisterPostProcessor;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
@ -224,4 +225,10 @@ public class DynamicThreadPoolAutoConfiguration {
public HttpAgent httpAgent(BootstrapProperties properties) {
return new ServerHttpAgent(properties);
}
@Bean
public ThreadPoolPluginRegisterPostProcessor threadPoolPluginRegisterPostProcessor() {
return new ThreadPoolPluginRegisterPostProcessor();
}
}

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.support;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import cn.hippo4j.core.plugin.manager.DefaultGlobalThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.GlobalThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginRegistrar;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.autoproxy.AutoProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationContextException;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.MessageSourceAware;
import org.springframework.context.ResourceLoaderAware;
import java.util.Objects;
/**
* <p>The extension implementation of {@link GlobalThreadPoolPluginManager} and {@link BeanPostProcessor},
* used to register {@link ThreadPoolPlugin} for the bean initialization stage of the {@link ThreadPoolPluginSupport}.
*
* <p><b>NOTE:</b>
* If the {@link ThreadPoolPlugin}, {@link ThreadPoolPluginRegistrar}, and {@link ThreadPoolPluginSupport} is set to lazy load,
* The processor will not perceive the bean unless the user actively triggers the initialization of the bean.
*
* @see ThreadPoolPluginSupport
* @see ThreadPoolPluginRegistrar
* @see ThreadPoolPlugin
* @see GlobalThreadPoolPluginManager
* @see DefaultGlobalThreadPoolPluginManager
*/
@Slf4j
public class ThreadPoolPluginRegisterPostProcessor extends DefaultGlobalThreadPoolPluginManager implements BeanPostProcessor, ApplicationContextAware {
/**
* application context
*/
private ConfigurableListableBeanFactory beanFactory;
/**
* <p>Post process bean, if bean is instance of {@link ThreadPoolPlugin},
* {@link ThreadPoolPluginRegistrar} or {@link ThreadPoolPluginSupport},
* then take beans as an available component and register to {@link GlobalThreadPoolPluginManager}.
*
* @param bean the new bean instance
* @param beanName the name of the bean
* @return the bean instance to use, either the original or a wrapped one;
* if {@code null}, no subsequent BeanPostProcessors will be invoked
* @throws BeansException in case of errors
* @see GlobalThreadPoolPluginManager#enableThreadPoolPlugin
* @see GlobalThreadPoolPluginManager#enableThreadPoolPluginRegistrar
* @see GlobalThreadPoolPluginManager#registerThreadPoolPluginSupport
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> beanType = null;
try {
beanType = AutoProxyUtils.determineTargetClass(beanFactory, beanName);
} catch (NoSuchBeanDefinitionException ex) {
if (log.isDebugEnabled()) {
log.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
}
}
if (Objects.isNull(beanType)) {
log.warn("cannot resolve type for bean [{}]", beanName);
return bean;
}
// register bean if necessary
registerThreadPoolPluginRegistrarIfNecessary(bean, beanType);
registerThreadPoolPluginIfNecessary(bean, beanType);
registerThreadPoolPluginSupportIfNecessary(bean, beanType);
return bean;
}
private void registerThreadPoolPluginSupportIfNecessary(Object bean, Class<?> beanType) {
if (ThreadPoolPluginSupport.class.isAssignableFrom(beanType)) {
ThreadPoolPluginSupport support = (ThreadPoolPluginSupport)bean;
if (registerThreadPoolPluginSupport(support) && log.isDebugEnabled()) {
log.info("register ThreadPoolPluginSupport [{}]", support.getThreadPoolId());
}
}
}
private void registerThreadPoolPluginIfNecessary(Object bean, Class<?> beanType) {
if (ThreadPoolPlugin.class.isAssignableFrom(beanType)) {
ThreadPoolPlugin plugin = (ThreadPoolPlugin)bean;
if (enableThreadPoolPlugin(plugin) && log.isDebugEnabled()) {
log.info("register ThreadPoolPlugin [{}]", plugin.getId());
}
}
}
private void registerThreadPoolPluginRegistrarIfNecessary(Object bean, Class<?> beanType) {
if (ThreadPoolPluginRegistrar.class.isAssignableFrom(beanType)) {
ThreadPoolPluginRegistrar registrar = (ThreadPoolPluginRegistrar)bean;
if (enableThreadPoolPluginRegistrar(registrar) && log.isDebugEnabled()) {
log.info("register ThreadPoolPluginRegistrar [{}]", registrar.getId());
}
}
}
/**
* Set the ApplicationContext that this object runs in.
* Normally this call will be used to initialize the object.
* <p>Invoked after population of normal bean properties but before an init callback such
* as {@link InitializingBean#afterPropertiesSet()}
* or a custom init-method. Invoked after {@link ResourceLoaderAware#setResourceLoader},
* {@link ApplicationEventPublisherAware#setApplicationEventPublisher} and
* {@link MessageSourceAware}, if applicable.
*
* @param applicationContext the ApplicationContext object to be used by this object
* @throws ApplicationContextException in case of context initialization errors
* @throws BeansException if thrown by application context methods
* @see BeanInitializationException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
AutowireCapableBeanFactory factory = applicationContext.getAutowireCapableBeanFactory();
Assert.isTrue(
factory instanceof ConfigurableListableBeanFactory,
"factory cannot cast to ConfigurableListableBeanFactory");
this.beanFactory = (ConfigurableListableBeanFactory) factory;
}
}
Loading…
Cancel
Save