fix 1072: provides a way to safely set the thread pool size (#1075)

pull/1086/head
yangsanity 3 years ago committed by GitHub
parent 63ec2dc23b
commit 2546f04a98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -20,6 +20,7 @@ package cn.hippo4j.adapter.alibaba.dubbo;
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.store.DataStore;
import lombok.extern.slf4j.Slf4j;
@ -79,8 +80,7 @@ public class AlibabaDubboThreadPoolAdapter implements ThreadPoolAdapter, Applica
}
int originalCoreSize = executor.getCorePoolSize();
int originalMaximumPoolSize = executor.getMaximumPoolSize();
executor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
executor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
ThreadPoolExecutorUtil.safeSetPoolSize(executor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] Alibaba Dubbo consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, executor.getCorePoolSize()),

@ -21,6 +21,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.extension.ExtensionLoader;
@ -83,8 +84,7 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis
}
int originalCoreSize = executor.getCorePoolSize();
int originalMaximumPoolSize = executor.getMaximumPoolSize();
executor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
executor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
ThreadPoolExecutorUtil.safeSetPoolSize(executor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] Dubbo consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, executor.getCorePoolSize()),

@ -24,6 +24,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapterRegisterAction;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import com.netflix.hystrix.HystrixThreadPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationStartedEvent;
@ -96,8 +97,7 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL
}
int originalCoreSize = threadPoolExecutor.getCorePoolSize();
int originalMaximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
threadPoolExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
threadPoolExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
ThreadPoolExecutorUtil.safeSetPoolSize(threadPoolExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] Hystrix thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolExecutor.getCorePoolSize()),

@ -21,6 +21,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
@ -84,8 +85,7 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
if (Objects.nonNull(threadPoolTaskExecutor)) {
int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize();
int originalMaximumPoolSize = threadPoolTaskExecutor.getMaximumPoolSize();
threadPoolTaskExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
ThreadPoolExecutorUtil.safeSetPoolSize(threadPoolTaskExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] Rabbitmq consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()),

@ -22,6 +22,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
@ -79,8 +80,7 @@ public class RocketMQThreadPoolAdapter implements ThreadPoolAdapter, Application
if (rocketMQConsumeExecutor != null) {
int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize();
int originalMaximumPoolSize = rocketMQConsumeExecutor.getMaximumPoolSize();
rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
ThreadPoolExecutorUtil.safeSetPoolSize(rocketMQConsumeExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()),

@ -23,6 +23,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import lombok.extern.slf4j.Slf4j;
@ -87,8 +88,7 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda
if (rocketMQConsumeExecutor != null) {
int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize();
int originalMaximumPoolSize = rocketMQConsumeExecutor.getMaximumPoolSize();
rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
ThreadPoolExecutorUtil.safeSetPoolSize(rocketMQConsumeExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()),

@ -154,8 +154,14 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService {
int originalCoreSize = tomcatThreadPoolExecutor.getCorePoolSize();
int originalMaximumPoolSize = tomcatThreadPoolExecutor.getMaximumPoolSize();
long originalKeepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
// see cn.hippo4j.common.toolkit.ThreadPoolUtil#setCoreSizeAndMaximumSize
if (threadPoolParameterInfo.corePoolSizeAdapt() > originalMaximumPoolSize) {
tomcatThreadPoolExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
tomcatThreadPoolExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
} else {
tomcatThreadPoolExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
tomcatThreadPoolExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
}
tomcatThreadPoolExecutor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS);
log.info("[Tomcat] Changed web thread pool. corePoolSize: {}, maximumPoolSize: {}, keepAliveTime: {}",
String.format(ChangeThreadPoolConstants.CHANGE_DELIMITER, originalCoreSize, threadPoolParameterInfo.corePoolSizeAdapt()),

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.toolkit;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Thread pool util
*
* @author yangjie
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ThreadPoolExecutorUtil {
/**
* Set the thread pool size in a safe way.
* <p>
* see https://github.com/opengoofy/hippo4j/issues/1072
*/
public static void safeSetPoolSize(ThreadPoolExecutor executor, int newCorePoolSize, int newMaximumPoolSize) {
Assert.isTrue(newCorePoolSize <= newMaximumPoolSize, "newCorePoolSize must be smaller than newMaximumPoolSize");
int originalMaximumPoolSize = executor.getMaximumPoolSize();
if (newCorePoolSize > originalMaximumPoolSize) {
executor.setMaximumPoolSize(newMaximumPoolSize);
executor.setCorePoolSize(newCorePoolSize);
} else {
executor.setCorePoolSize(newCorePoolSize);
executor.setMaximumPoolSize(newMaximumPoolSize);
}
}
}

@ -22,6 +22,7 @@ import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.config.springboot.starter.notify.ConfigModeNotifyConfigBuilder;
@ -240,13 +241,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) {
if (properties.getMaximumPoolSize() < executor.getMaximumPoolSize()) {
executor.setCorePoolSize(properties.getCorePoolSize());
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
} else {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
executor.setCorePoolSize(properties.getCorePoolSize());
}
ThreadPoolExecutorUtil.safeSetPoolSize(executor, properties.getCorePoolSize(), properties.getMaximumPoolSize());
} else {
if (properties.getMaximumPoolSize() != null) {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());

@ -31,6 +31,7 @@ import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.message.service.GlobalNotifyAlarmManage;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import lombok.AllArgsConstructor;
@ -172,13 +173,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity());
ReflectUtil.setFieldValue(executor, "workQueue", workQueue);
// fix https://github.com/opengoofy/hippo4j/issues/1063
if (executorProperties.getCorePoolSize() > executor.getMaximumPoolSize()) {
executor.setMaximumPoolSize(executorProperties.getMaximumPoolSize());
executor.setCorePoolSize(executorProperties.getCorePoolSize());
} else {
executor.setCorePoolSize(executorProperties.getCorePoolSize());
executor.setMaximumPoolSize(executorProperties.getMaximumPoolSize());
}
ThreadPoolExecutorUtil.safeSetPoolSize(executor, executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize());
executor.setKeepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut());
executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(executorProperties.getRejectedHandler()));

@ -26,6 +26,7 @@ import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
@ -109,13 +110,7 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
private void changePoolInfo(ThreadPoolExecutor executor, ThreadPoolParameter parameter) {
if (parameter.getCoreSize() != null && parameter.getMaxSize() != null) {
if (parameter.getMaxSize() < executor.getMaximumPoolSize()) {
executor.setCorePoolSize(parameter.getCoreSize());
executor.setMaximumPoolSize(parameter.getMaxSize());
} else {
executor.setMaximumPoolSize(parameter.getMaxSize());
executor.setCorePoolSize(parameter.getCoreSize());
}
ThreadPoolExecutorUtil.safeSetPoolSize(executor, parameter.getCoreSize(), parameter.getMaxSize());
} else {
if (parameter.getMaxSize() != null) {
executor.setMaximumPoolSize(parameter.getMaxSize());

@ -35,6 +35,7 @@ import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.message.service.GlobalNotifyAlarmManage;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
@ -179,13 +180,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity());
ReflectUtil.setFieldValue(executor, "workQueue", workQueue);
// fix https://github.com/opengoofy/hippo4j/issues/1063
if (threadPoolParameterInfo.getCorePoolSize() > executor.getMaximumPoolSize()) {
executor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
executor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
} else {
executor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
executor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
}
ThreadPoolExecutorUtil.safeSetPoolSize(executor, threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt());
executor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut()));
executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType()));

Loading…
Cancel
Save