fix: The thread pool that is not managed by spring will also be destroyed (#990)

pull/1003/head
huangchengxing 3 years ago
parent 585c8f6178
commit 90aca00ba4

@ -45,6 +45,12 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
/**
* Is active, it will become false only when destroy() is called.
*/
@Getter
private boolean isActive;
/**
* Wait for tasks to complete on shutdown
*/
@ -97,6 +103,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
// Init default plugins.
new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis)
.doRegister(this);
this.isActive = true;
}
/**
@ -104,12 +111,21 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
*/
@Override
public void destroy() {
// instance has been destroyed, not need to call this method again
if (!isActive) {
log.warn("Failed to destroy ExecutorService '{}' because it has already been destroyed", getThreadPoolId());
return;
}
if (isWaitForTasksToCompleteOnShutdown()) {
super.shutdown();
} else {
super.shutdownNow();
}
getThreadPoolPluginManager().clear();
log.info("ExecutorService '{}' has been destroyed", getThreadPoolId());
// modify the flag to false avoid the method being called repeatedly
isActive = false;
}
/**

@ -95,7 +95,7 @@ public class DynamicThreadPoolExecutorTest {
});
executor.destroy();
// waitting for terminated
// waiting for terminated
while (!executor.isTerminated()) {
} ;
Assert.assertEquals(2, count.get());
@ -119,9 +119,9 @@ public class DynamicThreadPoolExecutorTest {
});
executor.destroy();
// waitting for terminated
// waiting for terminated
while (!executor.isTerminated()) {
} ;
}
Assert.assertEquals(1, count.get());
}
@ -157,4 +157,21 @@ public class DynamicThreadPoolExecutorTest {
Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown());
}
@Test
public void testIsActive() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
Assert.assertTrue(executor.isActive());
// waiting for terminated
executor.destroy();
while (!executor.isTerminated()) {
}
Assert.assertFalse(executor.isActive());
executor.destroy();
Assert.assertFalse(executor.isActive());
}
}

@ -57,6 +57,7 @@ import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck;
import cn.hippo4j.springboot.starter.remote.ServerHealthCheck;
import cn.hippo4j.springboot.starter.remote.ServerHttpAgent;
import cn.hippo4j.springboot.starter.support.AdaptedThreadPoolDestroyPostProcessor;
import cn.hippo4j.springboot.starter.support.DynamicThreadPoolConfigService;
import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor;
import cn.hippo4j.springboot.starter.support.ThreadPoolPluginRegisterPostProcessor;
@ -120,6 +121,11 @@ public class DynamicThreadPoolAutoConfiguration {
return new DynamicThreadPoolConfigService(httpAgent, properties, notifyConfigBuilder, hippo4jBaseSendMessageService, dynamicThreadPoolSubscribeConfig);
}
@Bean
public AdaptedThreadPoolDestroyPostProcessor adaptedThreadPoolDestroyPostProcessor() {
return new AdaptedThreadPoolDestroyPostProcessor();
}
@Bean
@SuppressWarnings("all")
public DynamicThreadPoolPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent,

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.support;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapter;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor;
import java.util.Optional;
/**
* <p>Adapted thread pool destroy post processor. <br />
* The processor is used to destroy the internal {@link DynamicThreadPoolExecutor} instance
* in the instance adapted by {@link DynamicThreadPoolAdapter} in the spring context.
*
* @see DynamicThreadPoolAdapter
*/
@Slf4j
public class AdaptedThreadPoolDestroyPostProcessor implements DestructionAwareBeanPostProcessor {
/**
* If {@link DynamicThreadPoolAdapterChoose#match} method returns true,
* try to destroy its internal {@link DynamicThreadPoolExecutor} instance.
*
* @param bean the bean instance to check
* @return {@code true} if {@link DynamicThreadPoolAdapterChoose#match} method returns true, false otherwise
* @see DynamicThreadPoolAdapterChoose#match
*/
@Override
public boolean requiresDestruction(Object bean) {
return DynamicThreadPoolAdapterChoose.match(bean);
}
/**
* If the internal {@link DynamicThreadPoolExecutor} instance in the adapted bean is not managed by spring,
* call its {@link DynamicThreadPoolExecutor#destroy()} directly.
*
* @param bean the bean instance to be destroyed
* @param beanName the name of the bean
* @throws BeansException in case of errors
* @see DynamicThreadPoolExecutor#destroy()
*/
@Override
public void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException {
Optional.ofNullable(DynamicThreadPoolAdapterChoose.unwrap(bean))
.map(DynamicThreadPoolExecutor::getThreadPoolId)
// the internal thread pool is also managed by spring, no manual destruction required
.filter(id -> !ApplicationContextHolder.getInstance().containsBeanDefinition(id))
.map(GlobalThreadPoolManage::getExecutorService)
.ifPresent(executor -> destroyAdaptedThreadPoolExecutor(beanName, executor));
}
private static void destroyAdaptedThreadPoolExecutor(String beanName, DynamicThreadPoolWrapper executor) {
try {
if (log.isDebugEnabled()) {
log.info("Destroy adapted dynamic thread pool '{}'", executor.getThreadPoolId());
}
executor.destroy();
} catch (Exception e) {
log.warn("Failed to destroy dynamic thread pool '{}'", beanName);
}
}
}
Loading…
Cancel
Save