From a956bc0d644c39cc59389720e10dd361a701e4f0 Mon Sep 17 00:00:00 2001 From: weihubeats Date: Thu, 18 Aug 2022 11:08:16 +0800 Subject: [PATCH] Support rabbitmq multi-connection (#541) --- .../rabbitmq/RabbitMQThreadPoolAdapter.java | 152 +++++++++--------- .../RabbitMQAdapterAutoConfiguration.java | 7 +- 2 files changed, 81 insertions(+), 78 deletions(-) diff --git a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java index a8d8e97b..63957034 100644 --- a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java @@ -17,6 +17,12 @@ package cn.hippo4j.adapter.rabbitmq; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; @@ -25,16 +31,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; + import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.ApplicationListener; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; - import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; /** @@ -44,74 +45,73 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMI @RequiredArgsConstructor public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener { - private static final String RABBITMQ = "RabbitMQ"; - - private static final String FiledName = "executorService"; - - /** - * TODO Configurable name - */ - private static final String RABBITMQ_EXECUTOR_SERVICE = "Rabbitmq_Executor_Service"; - - private final AbstractConnectionFactory abstractConnectionFactory; - - private final Map RABBITMQ_THREAD_POOL_TASK_EXECUTOR = Maps.newHashMap(); - - @Override - public String mark() { - return RABBITMQ; - } - - @Override - public ThreadPoolAdapterState getThreadPoolState(String identify) { - ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState(); - ThreadPoolExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(identify); - threadPoolAdapterState.setThreadPoolKey(identify); - if (Objects.nonNull(threadPoolTaskExecutor)) { - threadPoolAdapterState.setCoreSize(threadPoolTaskExecutor.getCorePoolSize()); - threadPoolAdapterState.setMaximumSize(threadPoolTaskExecutor.getMaximumPoolSize()); - } - return threadPoolAdapterState; - } - - @Override - public List getThreadPoolStates() { - List adapterStateList = Lists.newArrayList(); - RABBITMQ_THREAD_POOL_TASK_EXECUTOR.forEach( - (key, val) -> adapterStateList.add(getThreadPoolState(key))); - return adapterStateList; - } - - @Override - public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { - String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); - ThreadPoolExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(threadPoolKey); - if (Objects.nonNull(threadPoolTaskExecutor)) { - int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize(); - int originalMaximumPoolSize = threadPoolTaskExecutor.getMaximumPoolSize(); - threadPoolTaskExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); - threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); - log.info("[{}] Rabbitmq consumption thread pool parameter change. coreSize: {}, maximumSize: {}", - threadPoolKey, - String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()), - String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolAdapterParameter.getMaximumPoolSize())); - return true; - } - log.warn("[{}] Rabbitmq consuming thread pool not found.", threadPoolKey); - return false; - } - - @Override - public void onApplicationEvent(ApplicationStartedEvent event) { - ExecutorService executor = (ExecutorService) ReflectUtil.getFieldValue(abstractConnectionFactory, FiledName); - if (Objects.nonNull(executor)) { - if (executor instanceof ThreadPoolExecutor) { - ThreadPoolExecutor threadPoolTaskExecutor = (ThreadPoolExecutor) executor; - RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(RABBITMQ_EXECUTOR_SERVICE, threadPoolTaskExecutor); - log.info("Rabbitmq executor name {}", RABBITMQ_EXECUTOR_SERVICE); - } else { - log.warn("Custom thread pools only support ThreadPoolExecutor"); - } - } - } + private static final String RABBITMQ = "RabbitMQ"; + + private static final String FiledName = "executorService"; + + private final Map abstractConnectionFactoryMap; + + private final Map RABBITMQ_THREAD_POOL_TASK_EXECUTOR = Maps.newHashMap(); + + @Override + public String mark() { + return RABBITMQ; + } + + @Override + public ThreadPoolAdapterState getThreadPoolState(String identify) { + ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState(); + ThreadPoolExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(identify); + threadPoolAdapterState.setThreadPoolKey(identify); + if (Objects.nonNull(threadPoolTaskExecutor)) { + threadPoolAdapterState.setCoreSize(threadPoolTaskExecutor.getCorePoolSize()); + threadPoolAdapterState.setMaximumSize(threadPoolTaskExecutor.getMaximumPoolSize()); + } + return threadPoolAdapterState; + } + + @Override + public List getThreadPoolStates() { + List adapterStateList = Lists.newArrayList(); + RABBITMQ_THREAD_POOL_TASK_EXECUTOR.forEach( + (key, val) -> adapterStateList.add(getThreadPoolState(key))); + return adapterStateList; + } + + @Override + public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { + String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); + ThreadPoolExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(threadPoolKey); + if (Objects.nonNull(threadPoolTaskExecutor)) { + int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize(); + int originalMaximumPoolSize = threadPoolTaskExecutor.getMaximumPoolSize(); + threadPoolTaskExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); + threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); + log.info("[{}] Rabbitmq consumption thread pool parameter change. coreSize: {}, maximumSize: {}", + threadPoolKey, + String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()), + String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolAdapterParameter.getMaximumPoolSize())); + return true; + } + log.warn("[{}] Rabbitmq consuming thread pool not found.", threadPoolKey); + return false; + } + + @Override + public void onApplicationEvent(ApplicationStartedEvent event) { + abstractConnectionFactoryMap.forEach((beanName, abstractConnectionFactor) -> { + ExecutorService executor = (ExecutorService) ReflectUtil.getFieldValue(abstractConnectionFactor, FiledName); + if (Objects.nonNull(executor)) { + if (executor instanceof ThreadPoolExecutor) { + ThreadPoolExecutor threadPoolTaskExecutor = (ThreadPoolExecutor) executor; + RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor); + log.info("Rabbitmq executor name {}", beanName); + } + else { + log.warn("Custom thread pools only support ThreadPoolExecutor"); + } + } + + }); + } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-rabbitmq/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/RabbitMQAdapterAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-rabbitmq/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/RabbitMQAdapterAutoConfiguration.java index 23ca19da..aec28c6d 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-rabbitmq/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/RabbitMQAdapterAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-rabbitmq/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/RabbitMQAdapterAutoConfiguration.java @@ -17,9 +17,12 @@ package cn.hippo4j.springboot.starter.adapter.rabbitmq; +import java.util.Map; + import cn.hippo4j.adapter.rabbitmq.RabbitMQThreadPoolAdapter; import cn.hippo4j.common.config.ApplicationContextHolder; import lombok.RequiredArgsConstructor; + import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -33,7 +36,7 @@ import org.springframework.context.annotation.Configuration; @RequiredArgsConstructor public class RabbitMQAdapterAutoConfiguration { - private final AbstractConnectionFactory abstractConnectionFactories; + private final Map stringAbstractConnectionFactoryMap; @Bean @ConditionalOnMissingBean @@ -45,6 +48,6 @@ public class RabbitMQAdapterAutoConfiguration { @SuppressWarnings("all") @ConditionalOnProperty(name = "spring.rabbitmq.host") public RabbitMQThreadPoolAdapter rabbitMQThreadPoolAdapter(ApplicationContextHolder applicationContextHolder) { - return new RabbitMQThreadPoolAdapter(abstractConnectionFactories); + return new RabbitMQThreadPoolAdapter(stringAbstractConnectionFactoryMap); } }