fix: The thread pool that is not managed by spring will also be destroyed (#990)

pull/1000/head
huangchengxing 3 years ago
parent bc6c06e435
commit c6b8746dd9

@ -45,6 +45,12 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j @Slf4j
public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean { public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
/**
* Is active, it will become false only when destroy() is called.
*/
@Getter
private boolean isActive;
/** /**
* Wait for tasks to complete on shutdown * Wait for tasks to complete on shutdown
*/ */
@ -97,6 +103,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
// Init default plugins. // Init default plugins.
new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis) new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis)
.doRegister(this); .doRegister(this);
this.isActive = true;
} }
/** /**
@ -104,12 +111,21 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
*/ */
@Override @Override
public void destroy() { public void destroy() {
// instance has been destroyed, not need to call this method again
if (!isActive) {
log.warn("Failed to destroy ExecutorService '{}' because it has already been destroyed", getThreadPoolId());
return;
}
if (isWaitForTasksToCompleteOnShutdown()) { if (isWaitForTasksToCompleteOnShutdown()) {
super.shutdown(); super.shutdown();
} else { } else {
super.shutdownNow(); super.shutdownNow();
} }
getThreadPoolPluginManager().clear(); getThreadPoolPluginManager().clear();
log.info("ExecutorService '{}' has been destroyed", getThreadPoolId());
// modify the flag to false avoid the method being called repeatedly
isActive = false;
} }
/** /**

@ -95,7 +95,7 @@ public class DynamicThreadPoolExecutorTest {
}); });
executor.destroy(); executor.destroy();
// waitting for terminated // waiting for terminated
while (!executor.isTerminated()) { while (!executor.isTerminated()) {
} ; } ;
Assert.assertEquals(2, count.get()); Assert.assertEquals(2, count.get());
@ -119,9 +119,9 @@ public class DynamicThreadPoolExecutorTest {
}); });
executor.destroy(); executor.destroy();
// waitting for terminated // waiting for terminated
while (!executor.isTerminated()) { while (!executor.isTerminated()) {
} ; }
Assert.assertEquals(1, count.get()); Assert.assertEquals(1, count.get());
} }
@ -157,4 +157,21 @@ public class DynamicThreadPoolExecutorTest {
Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown()); Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown());
} }
@Test
public void testIsActive() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
Assert.assertTrue(executor.isActive());
// waiting for terminated
executor.destroy();
while (!executor.isTerminated()) {
}
Assert.assertFalse(executor.isActive());
executor.destroy();
Assert.assertFalse(executor.isActive());
}
} }

@ -19,6 +19,7 @@ package cn.hippo4j.springboot.starter.support;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.constant.Constants;
import static cn.hippo4j.common.constant.Constants.*;
import cn.hippo4j.common.enums.EnableEnum; import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
@ -33,6 +34,7 @@ import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapter;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose; import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil; import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.GlobalNotifyAlarmManage; import cn.hippo4j.message.service.GlobalNotifyAlarmManage;
@ -44,6 +46,7 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -53,28 +56,70 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.ITEM_ID;
import static cn.hippo4j.common.constant.Constants.NAMESPACE;
import static cn.hippo4j.common.constant.Constants.TP_ID;
/** /**
* Dynamic thread-pool post processor. * Dynamic thread-pool post processor.
*/ */
@Slf4j @Slf4j
@AllArgsConstructor @AllArgsConstructor
public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor, DestructionAwareBeanPostProcessor {
/**
* Properties.
*/
private final BootstrapProperties properties; private final BootstrapProperties properties;
/**
* Http agent.
*/
private final HttpAgent httpAgent; private final HttpAgent httpAgent;
/**
* Dynamic thread pool subscribe config.
*/
private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig; private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig;
/**
* Adapted beans.
*/
private final Map<String, String> adaptedBeans = new HashMap<>();
/**
* Post process before initialization
*
* @param bean the bean instance to be process
* @param beanName the name of the bean
* @return bean
*/
@Override @Override
public Object postProcessBeforeInitialization(Object bean, String beanName) { public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean; return bean;
} }
/**
* <p>Post process the following beans:
* <ul>
* <li>
* instance of {@link DynamicThreadPoolExecutor},
* and the class or factory method is annotated by {@link DynamicThreadPool};
* </li>
* <li>
* matched with any {@link DynamicThreadPoolAdapter},
* and the class or factory method is annotated by {@link DynamicThreadPool};
* </li>
* <li>instance of {@link DynamicThreadPoolWrapper};</li>
* </ul>
* After finishing post-processing, the above beans will refresh their own parameters according to the configuration,
* complete {@link ThreadPoolNotifyAlarm} configuration, and be registered to {@link GlobalThreadPoolManage}.
*
* @param bean the bean instance to be process
* @param beanName the name of the bean
* @return bean
* @see DynamicThreadPoolAdapterChoose#match
* @see DynamicThreadPoolAdapterChoose#replace
* @see GlobalThreadPoolManage#dynamicRegister
* @see GlobalNotifyAlarmManage#put
* @see DynamicThreadPoolSubscribeConfig#subscribeConfig(String)
*/
@Override @Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) { if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) {
@ -92,10 +137,21 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
log.error("Failed to create dynamic thread pool in annotation mode.", ex); log.error("Failed to create dynamic thread pool in annotation mode.", ex);
return bean; return bean;
} }
DynamicThreadPoolExecutor dynamicThreadPoolExecutor;
if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) { // If there is an internal thread pool that is not managed by spring,`
// the bean name of its external bean will be recorded.
// When the external bean is destroyed,
// the destroy method of the internal thread pool will be called actively
DynamicThreadPoolExecutor dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean);
if (Objects.nonNull(dynamicThreadPoolExecutor)) {
if (log.isDebugEnabled()) {
log.debug("Adapt thread pool executor bean [{}]", beanName);
}
adaptedBeans.put(beanName, dynamicThreadPoolExecutor.getThreadPoolId());
} else {
dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
} }
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor);
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper); ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
@ -109,6 +165,29 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
return bean; return bean;
} }
/**
* After the executor is destroyed, the callback of its internal executor will be triggered.
*
* @param bean the bean instance to be destroyed
* @param beanName the name of the bean
* @see DynamicThreadPoolExecutor#destroy()
*/
@Override
public void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException {
Optional.ofNullable(adaptedBeans.get(beanName))
// the internal thread pool is also managed by spring, no manual destruction required
.filter(id -> !ApplicationContextHolder.getInstance().containsBeanDefinition(id))
.map(GlobalThreadPoolManage::getExecutorService)
.ifPresent(executor -> {
try {
executor.destroy();
} catch (Exception e) {
log.warn("Failed to destroy dynamic thread pool '{}'", beanName);
}
});
}
/** /**
* Register and subscribe. * Register and subscribe.
* *
@ -176,7 +255,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
* @param threadPoolParameterInfo thread-pool parameter info * @param threadPoolParameterInfo thread-pool parameter info
*/ */
private void threadPoolParamReplace(ThreadPoolExecutor executor, ThreadPoolParameterInfo threadPoolParameterInfo) { private void threadPoolParamReplace(ThreadPoolExecutor executor, ThreadPoolParameterInfo threadPoolParameterInfo) {
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity()); BlockingQueue<?> workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity());
ReflectUtil.setFieldValue(executor, "workQueue", workQueue); ReflectUtil.setFieldValue(executor, "workQueue", workQueue);
executor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt()); executor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
executor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt()); executor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
@ -185,7 +264,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType())); executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType()));
if (executor instanceof DynamicThreadPoolExecutor) { if (executor instanceof DynamicThreadPoolExecutor) {
Optional.ofNullable(threadPoolParameterInfo.getExecuteTimeOut()) Optional.ofNullable(threadPoolParameterInfo.getExecuteTimeOut())
.ifPresent(executeTimeOut -> ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(executeTimeOut)); .ifPresent(((DynamicThreadPoolExecutor) executor)::setExecuteTimeOut);
} }
} }
@ -213,4 +292,5 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
dynamicThreadPoolSubscribeConfig.subscribeConfig(dynamicThreadPoolWrapper.getThreadPoolId()); dynamicThreadPoolSubscribeConfig.subscribeConfig(dynamicThreadPoolWrapper.getThreadPoolId());
} }
} }
} }

Loading…
Cancel
Save