diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java index 011e4148..d2a69252 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java @@ -45,6 +45,12 @@ import java.util.concurrent.atomic.AtomicLong; @Slf4j public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean { + /** + * Is active, it will become false only when destroy() is called. + */ + @Getter + private boolean isActive; + /** * Wait for tasks to complete on shutdown */ @@ -97,6 +103,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl // Init default plugins. new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis) .doRegister(this); + this.isActive = true; } /** @@ -104,12 +111,21 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl */ @Override public void destroy() { + // instance has been destroyed, not need to call this method again + if (!isActive) { + log.warn("Failed to destroy ExecutorService '{}' because it has already been destroyed", getThreadPoolId()); + return; + } if (isWaitForTasksToCompleteOnShutdown()) { super.shutdown(); } else { super.shutdownNow(); } getThreadPoolPluginManager().clear(); + log.info("ExecutorService '{}' has been destroyed", getThreadPoolId()); + + // modify the flag to false avoid the method being called repeatedly + isActive = false; } /** diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java index 63d362af..275cd915 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java @@ -95,7 +95,7 @@ public class DynamicThreadPoolExecutorTest { }); executor.destroy(); - // waitting for terminated + // waiting for terminated while (!executor.isTerminated()) { } ; Assert.assertEquals(2, count.get()); @@ -119,9 +119,9 @@ public class DynamicThreadPoolExecutorTest { }); executor.destroy(); - // waitting for terminated + // waiting for terminated while (!executor.isTerminated()) { - } ; + } Assert.assertEquals(1, count.get()); } @@ -157,4 +157,21 @@ public class DynamicThreadPoolExecutorTest { Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown()); } + @Test + public void testIsActive() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + Assert.assertTrue(executor.isActive()); + + // waiting for terminated + executor.destroy(); + while (!executor.isTerminated()) { + } + Assert.assertFalse(executor.isActive()); + executor.destroy(); + Assert.assertFalse(executor.isActive()); + } + } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 3362deb0..cab38cb2 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -19,6 +19,7 @@ package cn.hippo4j.springboot.starter.support; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.constant.Constants; +import static cn.hippo4j.common.constant.Constants.*; import cn.hippo4j.common.enums.EnableEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum; @@ -33,6 +34,7 @@ import cn.hippo4j.core.executor.DynamicThreadPool; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; +import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapter; import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose; import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil; import cn.hippo4j.message.service.GlobalNotifyAlarmManage; @@ -44,6 +46,7 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor; import java.util.HashMap; import java.util.Map; @@ -53,28 +56,70 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static cn.hippo4j.common.constant.Constants.ITEM_ID; -import static cn.hippo4j.common.constant.Constants.NAMESPACE; -import static cn.hippo4j.common.constant.Constants.TP_ID; - /** * Dynamic thread-pool post processor. */ @Slf4j @AllArgsConstructor -public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { +public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor, DestructionAwareBeanPostProcessor { + /** + * Properties. + */ private final BootstrapProperties properties; + /** + * Http agent. + */ private final HttpAgent httpAgent; + /** + * Dynamic thread pool subscribe config. + */ private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig; + /** + * Adapted beans. + */ + private final Map adaptedBeans = new HashMap<>(); + + /** + * Post process before initialization + * + * @param bean the bean instance to be process + * @param beanName the name of the bean + * @return bean + */ @Override public Object postProcessBeforeInitialization(Object bean, String beanName) { return bean; } + /** + *

Post process the following beans: + *

+ * After finishing post-processing, the above beans will refresh their own parameters according to the configuration, + * complete {@link ThreadPoolNotifyAlarm} configuration, and be registered to {@link GlobalThreadPoolManage}. + * + * @param bean the bean instance to be process + * @param beanName the name of the bean + * @return bean + * @see DynamicThreadPoolAdapterChoose#match + * @see DynamicThreadPoolAdapterChoose#replace + * @see GlobalThreadPoolManage#dynamicRegister + * @see GlobalNotifyAlarmManage#put + * @see DynamicThreadPoolSubscribeConfig#subscribeConfig(String) + */ @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) { @@ -92,10 +137,21 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { log.error("Failed to create dynamic thread pool in annotation mode.", ex); return bean; } - DynamicThreadPoolExecutor dynamicThreadPoolExecutor; - if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) { + + // If there is an internal thread pool that is not managed by spring,` + // the bean name of its external bean will be recorded. + // When the external bean is destroyed, + // the destroy method of the internal thread pool will be called actively + DynamicThreadPoolExecutor dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean); + if (Objects.nonNull(dynamicThreadPoolExecutor)) { + if (log.isDebugEnabled()) { + log.debug("Adapt thread pool executor bean [{}]", beanName); + } + adaptedBeans.put(beanName, dynamicThreadPoolExecutor.getThreadPoolId()); + } else { dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; } + DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper); DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); @@ -109,6 +165,29 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { return bean; } + /** + * After the executor is destroyed, the callback of its internal executor will be triggered. + * + * @param bean the bean instance to be destroyed + * @param beanName the name of the bean + * @see DynamicThreadPoolExecutor#destroy() + */ + @Override + public void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException { + Optional.ofNullable(adaptedBeans.get(beanName)) + // the internal thread pool is also managed by spring, no manual destruction required + .filter(id -> !ApplicationContextHolder.getInstance().containsBeanDefinition(id)) + .map(GlobalThreadPoolManage::getExecutorService) + .ifPresent(executor -> { + try { + executor.destroy(); + } catch (Exception e) { + log.warn("Failed to destroy dynamic thread pool '{}'", beanName); + } + }); + + } + /** * Register and subscribe. * @@ -176,7 +255,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { * @param threadPoolParameterInfo thread-pool parameter info */ private void threadPoolParamReplace(ThreadPoolExecutor executor, ThreadPoolParameterInfo threadPoolParameterInfo) { - BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity()); + BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity()); ReflectUtil.setFieldValue(executor, "workQueue", workQueue); executor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt()); executor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt()); @@ -185,7 +264,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType())); if (executor instanceof DynamicThreadPoolExecutor) { Optional.ofNullable(threadPoolParameterInfo.getExecuteTimeOut()) - .ifPresent(executeTimeOut -> ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(executeTimeOut)); + .ifPresent(((DynamicThreadPoolExecutor) executor)::setExecuteTimeOut); } } @@ -213,4 +292,5 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { dynamicThreadPoolSubscribeConfig.subscribeConfig(dynamicThreadPoolWrapper.getThreadPoolId()); } } + }