diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java index 011e4148..d2a69252 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java @@ -45,6 +45,12 @@ import java.util.concurrent.atomic.AtomicLong; @Slf4j public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean { + /** + * Is active, it will become false only when destroy() is called. + */ + @Getter + private boolean isActive; + /** * Wait for tasks to complete on shutdown */ @@ -97,6 +103,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl // Init default plugins. new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis) .doRegister(this); + this.isActive = true; } /** @@ -104,12 +111,21 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl */ @Override public void destroy() { + // instance has been destroyed, not need to call this method again + if (!isActive) { + log.warn("Failed to destroy ExecutorService '{}' because it has already been destroyed", getThreadPoolId()); + return; + } if (isWaitForTasksToCompleteOnShutdown()) { super.shutdown(); } else { super.shutdownNow(); } getThreadPoolPluginManager().clear(); + log.info("ExecutorService '{}' has been destroyed", getThreadPoolId()); + + // modify the flag to false avoid the method being called repeatedly + isActive = false; } /** diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java index 63d362af..275cd915 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java @@ -95,7 +95,7 @@ public class DynamicThreadPoolExecutorTest { }); executor.destroy(); - // waitting for terminated + // waiting for terminated while (!executor.isTerminated()) { } ; Assert.assertEquals(2, count.get()); @@ -119,9 +119,9 @@ public class DynamicThreadPoolExecutorTest { }); executor.destroy(); - // waitting for terminated + // waiting for terminated while (!executor.isTerminated()) { - } ; + } Assert.assertEquals(1, count.get()); } @@ -157,4 +157,21 @@ public class DynamicThreadPoolExecutorTest { Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown()); } + @Test + public void testIsActive() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + Assert.assertTrue(executor.isActive()); + + // waiting for terminated + executor.destroy(); + while (!executor.isTerminated()) { + } + Assert.assertFalse(executor.isActive()); + executor.destroy(); + Assert.assertFalse(executor.isActive()); + } + } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 8614c4fc..163b9bd5 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -57,6 +57,7 @@ import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck; import cn.hippo4j.springboot.starter.remote.ServerHealthCheck; import cn.hippo4j.springboot.starter.remote.ServerHttpAgent; +import cn.hippo4j.springboot.starter.support.AdaptedThreadPoolDestroyPostProcessor; import cn.hippo4j.springboot.starter.support.DynamicThreadPoolConfigService; import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor; import cn.hippo4j.springboot.starter.support.ThreadPoolPluginRegisterPostProcessor; @@ -120,6 +121,11 @@ public class DynamicThreadPoolAutoConfiguration { return new DynamicThreadPoolConfigService(httpAgent, properties, notifyConfigBuilder, hippo4jBaseSendMessageService, dynamicThreadPoolSubscribeConfig); } + @Bean + public AdaptedThreadPoolDestroyPostProcessor adaptedThreadPoolDestroyPostProcessor() { + return new AdaptedThreadPoolDestroyPostProcessor(); + } + @Bean @SuppressWarnings("all") public DynamicThreadPoolPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent, diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/AdaptedThreadPoolDestroyPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/AdaptedThreadPoolDestroyPostProcessor.java new file mode 100644 index 00000000..9aac3e36 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/AdaptedThreadPoolDestroyPostProcessor.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.support; + +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; +import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; +import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; +import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapter; +import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor; + +import java.util.Optional; + +/** + *
Adapted thread pool destroy post processor.
+ * The processor is used to destroy the internal {@link DynamicThreadPoolExecutor} instance
+ * in the instance adapted by {@link DynamicThreadPoolAdapter} in the spring context.
+ *
+ * @see DynamicThreadPoolAdapter
+ */
+@Slf4j
+public class AdaptedThreadPoolDestroyPostProcessor implements DestructionAwareBeanPostProcessor {
+
+ /**
+ * If {@link DynamicThreadPoolAdapterChoose#match} method returns true,
+ * try to destroy its internal {@link DynamicThreadPoolExecutor} instance.
+ *
+ * @param bean the bean instance to check
+ * @return {@code true} if {@link DynamicThreadPoolAdapterChoose#match} method returns true, false otherwise
+ * @see DynamicThreadPoolAdapterChoose#match
+ */
+ @Override
+ public boolean requiresDestruction(Object bean) {
+ return DynamicThreadPoolAdapterChoose.match(bean);
+ }
+
+ /**
+ * If the internal {@link DynamicThreadPoolExecutor} instance in the adapted bean is not managed by spring,
+ * call its {@link DynamicThreadPoolExecutor#destroy()} directly.
+ *
+ * @param bean the bean instance to be destroyed
+ * @param beanName the name of the bean
+ * @throws BeansException in case of errors
+ * @see DynamicThreadPoolExecutor#destroy()
+ */
+ @Override
+ public void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException {
+ Optional.ofNullable(DynamicThreadPoolAdapterChoose.unwrap(bean))
+ .map(DynamicThreadPoolExecutor::getThreadPoolId)
+ // the internal thread pool is also managed by spring, no manual destruction required
+ .filter(id -> !ApplicationContextHolder.getInstance().containsBeanDefinition(id))
+ .map(GlobalThreadPoolManage::getExecutorService)
+ .ifPresent(executor -> destroyAdaptedThreadPoolExecutor(beanName, executor));
+ }
+
+ private static void destroyAdaptedThreadPoolExecutor(String beanName, DynamicThreadPoolWrapper executor) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.info("Destroy adapted dynamic thread pool '{}'", executor.getThreadPoolId());
+ }
+ executor.destroy();
+ } catch (Exception e) {
+ log.warn("Failed to destroy dynamic thread pool '{}'", beanName);
+ }
+ }
+}