From 90aca00ba4e7b8aa85f6ce9ce199217d73991726 Mon Sep 17 00:00:00 2001 From: huangchengxing <841396397@qq.com> Date: Mon, 21 Nov 2022 18:08:51 +0800 Subject: [PATCH] fix: The thread pool that is not managed by spring will also be destroyed (#990) --- .../executor/DynamicThreadPoolExecutor.java | 16 ++++ .../DynamicThreadPoolExecutorTest.java | 23 ++++- .../DynamicThreadPoolAutoConfiguration.java | 6 ++ ...AdaptedThreadPoolDestroyPostProcessor.java | 84 +++++++++++++++++++ 4 files changed, 126 insertions(+), 3 deletions(-) create mode 100644 hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/AdaptedThreadPoolDestroyPostProcessor.java diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java index 011e4148..d2a69252 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java @@ -45,6 +45,12 @@ import java.util.concurrent.atomic.AtomicLong; @Slf4j public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean { + /** + * Is active, it will become false only when destroy() is called. + */ + @Getter + private boolean isActive; + /** * Wait for tasks to complete on shutdown */ @@ -97,6 +103,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl // Init default plugins. new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis) .doRegister(this); + this.isActive = true; } /** @@ -104,12 +111,21 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl */ @Override public void destroy() { + // instance has been destroyed, not need to call this method again + if (!isActive) { + log.warn("Failed to destroy ExecutorService '{}' because it has already been destroyed", getThreadPoolId()); + return; + } if (isWaitForTasksToCompleteOnShutdown()) { super.shutdown(); } else { super.shutdownNow(); } getThreadPoolPluginManager().clear(); + log.info("ExecutorService '{}' has been destroyed", getThreadPoolId()); + + // modify the flag to false avoid the method being called repeatedly + isActive = false; } /** diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java index 63d362af..275cd915 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java @@ -95,7 +95,7 @@ public class DynamicThreadPoolExecutorTest { }); executor.destroy(); - // waitting for terminated + // waiting for terminated while (!executor.isTerminated()) { } ; Assert.assertEquals(2, count.get()); @@ -119,9 +119,9 @@ public class DynamicThreadPoolExecutorTest { }); executor.destroy(); - // waitting for terminated + // waiting for terminated while (!executor.isTerminated()) { - } ; + } Assert.assertEquals(1, count.get()); } @@ -157,4 +157,21 @@ public class DynamicThreadPoolExecutorTest { Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown()); } + @Test + public void testIsActive() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + Assert.assertTrue(executor.isActive()); + + // waiting for terminated + executor.destroy(); + while (!executor.isTerminated()) { + } + Assert.assertFalse(executor.isActive()); + executor.destroy(); + Assert.assertFalse(executor.isActive()); + } + } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 8614c4fc..163b9bd5 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -57,6 +57,7 @@ import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck; import cn.hippo4j.springboot.starter.remote.ServerHealthCheck; import cn.hippo4j.springboot.starter.remote.ServerHttpAgent; +import cn.hippo4j.springboot.starter.support.AdaptedThreadPoolDestroyPostProcessor; import cn.hippo4j.springboot.starter.support.DynamicThreadPoolConfigService; import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor; import cn.hippo4j.springboot.starter.support.ThreadPoolPluginRegisterPostProcessor; @@ -120,6 +121,11 @@ public class DynamicThreadPoolAutoConfiguration { return new DynamicThreadPoolConfigService(httpAgent, properties, notifyConfigBuilder, hippo4jBaseSendMessageService, dynamicThreadPoolSubscribeConfig); } + @Bean + public AdaptedThreadPoolDestroyPostProcessor adaptedThreadPoolDestroyPostProcessor() { + return new AdaptedThreadPoolDestroyPostProcessor(); + } + @Bean @SuppressWarnings("all") public DynamicThreadPoolPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent, diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/AdaptedThreadPoolDestroyPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/AdaptedThreadPoolDestroyPostProcessor.java new file mode 100644 index 00000000..9aac3e36 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/AdaptedThreadPoolDestroyPostProcessor.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.support; + +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; +import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; +import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; +import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapter; +import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor; + +import java.util.Optional; + +/** + *

Adapted thread pool destroy post processor.
+ * The processor is used to destroy the internal {@link DynamicThreadPoolExecutor} instance + * in the instance adapted by {@link DynamicThreadPoolAdapter} in the spring context. + * + * @see DynamicThreadPoolAdapter + */ +@Slf4j +public class AdaptedThreadPoolDestroyPostProcessor implements DestructionAwareBeanPostProcessor { + + /** + * If {@link DynamicThreadPoolAdapterChoose#match} method returns true, + * try to destroy its internal {@link DynamicThreadPoolExecutor} instance. + * + * @param bean the bean instance to check + * @return {@code true} if {@link DynamicThreadPoolAdapterChoose#match} method returns true, false otherwise + * @see DynamicThreadPoolAdapterChoose#match + */ + @Override + public boolean requiresDestruction(Object bean) { + return DynamicThreadPoolAdapterChoose.match(bean); + } + + /** + * If the internal {@link DynamicThreadPoolExecutor} instance in the adapted bean is not managed by spring, + * call its {@link DynamicThreadPoolExecutor#destroy()} directly. + * + * @param bean the bean instance to be destroyed + * @param beanName the name of the bean + * @throws BeansException in case of errors + * @see DynamicThreadPoolExecutor#destroy() + */ + @Override + public void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException { + Optional.ofNullable(DynamicThreadPoolAdapterChoose.unwrap(bean)) + .map(DynamicThreadPoolExecutor::getThreadPoolId) + // the internal thread pool is also managed by spring, no manual destruction required + .filter(id -> !ApplicationContextHolder.getInstance().containsBeanDefinition(id)) + .map(GlobalThreadPoolManage::getExecutorService) + .ifPresent(executor -> destroyAdaptedThreadPoolExecutor(beanName, executor)); + } + + private static void destroyAdaptedThreadPoolExecutor(String beanName, DynamicThreadPoolWrapper executor) { + try { + if (log.isDebugEnabled()) { + log.info("Destroy adapted dynamic thread pool '{}'", executor.getThreadPoolId()); + } + executor.destroy(); + } catch (Exception e) { + log.warn("Failed to destroy dynamic thread pool '{}'", beanName); + } + } +}