diff --git a/hippo4j-adapter/hippo4j-adapter-alibaba-dubbo/src/main/java/cn/hippo4j/adapter/alibaba/dubbo/AlibabaDubboThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-alibaba-dubbo/src/main/java/cn/hippo4j/adapter/alibaba/dubbo/AlibabaDubboThreadPoolAdapter.java index 914f6def..2eb12e3d 100644 --- a/hippo4j-adapter/hippo4j-adapter-alibaba-dubbo/src/main/java/cn/hippo4j/adapter/alibaba/dubbo/AlibabaDubboThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-alibaba-dubbo/src/main/java/cn/hippo4j/adapter/alibaba/dubbo/AlibabaDubboThreadPoolAdapter.java @@ -20,6 +20,7 @@ package cn.hippo4j.adapter.alibaba.dubbo; import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil; import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.common.store.DataStore; import lombok.extern.slf4j.Slf4j; @@ -79,8 +80,7 @@ public class AlibabaDubboThreadPoolAdapter implements ThreadPoolAdapter, Applica } int originalCoreSize = executor.getCorePoolSize(); int originalMaximumPoolSize = executor.getMaximumPoolSize(); - executor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); - executor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); + ThreadPoolExecutorUtil.safeSetPoolSize(executor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize()); log.info("[{}] Alibaba Dubbo consumption thread pool parameter change. coreSize: {}, maximumSize: {}", threadPoolKey, String.format(CHANGE_DELIMITER, originalCoreSize, executor.getCorePoolSize()), diff --git a/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java index b1d12ed6..a3257f19 100644 --- a/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java @@ -21,6 +21,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.toolkit.ReflectUtil; +import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil; import lombok.extern.slf4j.Slf4j; import org.apache.dubbo.common.Version; import org.apache.dubbo.common.extension.ExtensionLoader; @@ -83,8 +84,7 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis } int originalCoreSize = executor.getCorePoolSize(); int originalMaximumPoolSize = executor.getMaximumPoolSize(); - executor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); - executor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); + ThreadPoolExecutorUtil.safeSetPoolSize(executor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize()); log.info("[{}] Dubbo consumption thread pool parameter change. coreSize: {}, maximumSize: {}", threadPoolKey, String.format(CHANGE_DELIMITER, originalCoreSize, executor.getCorePoolSize()), diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java index 78a50765..45d9bc56 100644 --- a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java @@ -24,6 +24,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapterRegisterAction; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil; import com.netflix.hystrix.HystrixThreadPool; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.event.ApplicationStartedEvent; @@ -96,8 +97,7 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL } int originalCoreSize = threadPoolExecutor.getCorePoolSize(); int originalMaximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); - threadPoolExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); - threadPoolExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); + ThreadPoolExecutorUtil.safeSetPoolSize(threadPoolExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize()); log.info("[{}] Hystrix thread pool parameter change. coreSize: {}, maximumSize: {}", threadPoolKey, String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolExecutor.getCorePoolSize()), diff --git a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java index 86b34f7e..b8b5ae1e 100644 --- a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java @@ -21,6 +21,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.toolkit.ReflectUtil; +import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory; @@ -84,8 +85,7 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application if (Objects.nonNull(threadPoolTaskExecutor)) { int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize(); int originalMaximumPoolSize = threadPoolTaskExecutor.getMaximumPoolSize(); - threadPoolTaskExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); - threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); + ThreadPoolExecutorUtil.safeSetPoolSize(threadPoolTaskExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize()); log.info("[{}] Rabbitmq consumption thread pool parameter change. coreSize: {}, maximumSize: {}", threadPoolKey, String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()), diff --git a/hippo4j-adapter/hippo4j-adapter-rocketmq/src/main/java/cn/hippo4j/adapter/rocketmq/RocketMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-rocketmq/src/main/java/cn/hippo4j/adapter/rocketmq/RocketMQThreadPoolAdapter.java index e5541703..3fa91856 100644 --- a/hippo4j-adapter/hippo4j-adapter-rocketmq/src/main/java/cn/hippo4j/adapter/rocketmq/RocketMQThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-rocketmq/src/main/java/cn/hippo4j/adapter/rocketmq/RocketMQThreadPoolAdapter.java @@ -22,6 +22,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.toolkit.ReflectUtil; +import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService; @@ -79,8 +80,7 @@ public class RocketMQThreadPoolAdapter implements ThreadPoolAdapter, Application if (rocketMQConsumeExecutor != null) { int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize(); int originalMaximumPoolSize = rocketMQConsumeExecutor.getMaximumPoolSize(); - rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); - rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); + ThreadPoolExecutorUtil.safeSetPoolSize(rocketMQConsumeExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize()); log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}", threadPoolKey, String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()), diff --git a/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java index 60d35416..12769cf3 100644 --- a/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java @@ -23,6 +23,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.ReflectUtil; +import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil; import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; import lombok.extern.slf4j.Slf4j; @@ -87,8 +88,7 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda if (rocketMQConsumeExecutor != null) { int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize(); int originalMaximumPoolSize = rocketMQConsumeExecutor.getMaximumPoolSize(); - rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); - rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); + ThreadPoolExecutorUtil.safeSetPoolSize(rocketMQConsumeExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize()); log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}", threadPoolKey, String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()), diff --git a/hippo4j-adapter/hippo4j-adapter-web/src/main/java/cn/hippo4j/adapter/web/TomcatWebThreadPoolHandler.java b/hippo4j-adapter/hippo4j-adapter-web/src/main/java/cn/hippo4j/adapter/web/TomcatWebThreadPoolHandler.java index 7546cb78..f6869458 100644 --- a/hippo4j-adapter/hippo4j-adapter-web/src/main/java/cn/hippo4j/adapter/web/TomcatWebThreadPoolHandler.java +++ b/hippo4j-adapter/hippo4j-adapter-web/src/main/java/cn/hippo4j/adapter/web/TomcatWebThreadPoolHandler.java @@ -154,8 +154,14 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService { int originalCoreSize = tomcatThreadPoolExecutor.getCorePoolSize(); int originalMaximumPoolSize = tomcatThreadPoolExecutor.getMaximumPoolSize(); long originalKeepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS); - tomcatThreadPoolExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt()); - tomcatThreadPoolExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt()); + // see cn.hippo4j.common.toolkit.ThreadPoolUtil#setCoreSizeAndMaximumSize + if (threadPoolParameterInfo.corePoolSizeAdapt() > originalMaximumPoolSize) { + tomcatThreadPoolExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt()); + tomcatThreadPoolExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt()); + } else { + tomcatThreadPoolExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt()); + tomcatThreadPoolExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt()); + } tomcatThreadPoolExecutor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS); log.info("[Tomcat] Changed web thread pool. corePoolSize: {}, maximumPoolSize: {}, keepAliveTime: {}", String.format(ChangeThreadPoolConstants.CHANGE_DELIMITER, originalCoreSize, threadPoolParameterInfo.corePoolSizeAdapt()), diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ThreadPoolExecutorUtil.java b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ThreadPoolExecutorUtil.java new file mode 100644 index 00000000..ae600924 --- /dev/null +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ThreadPoolExecutorUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.common.toolkit; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Thread pool util + * + * @author yangjie + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class ThreadPoolExecutorUtil { + + /** + * Set the thread pool size in a safe way. + *
+ * see https://github.com/opengoofy/hippo4j/issues/1072
+ */
+ public static void safeSetPoolSize(ThreadPoolExecutor executor, int newCorePoolSize, int newMaximumPoolSize) {
+ Assert.isTrue(newCorePoolSize <= newMaximumPoolSize, "newCorePoolSize must be smaller than newMaximumPoolSize");
+ int originalMaximumPoolSize = executor.getMaximumPoolSize();
+ if (newCorePoolSize > originalMaximumPoolSize) {
+ executor.setMaximumPoolSize(newMaximumPoolSize);
+ executor.setCorePoolSize(newCorePoolSize);
+ } else {
+ executor.setCorePoolSize(newCorePoolSize);
+ executor.setMaximumPoolSize(newMaximumPoolSize);
+ }
+ }
+}
diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java
index 4ed28af4..ee212de4 100644
--- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java
+++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java
@@ -22,6 +22,7 @@ import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.toolkit.CollectionUtil;
+import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.config.springboot.starter.notify.ConfigModeNotifyConfigBuilder;
@@ -240,13 +241,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener