diff --git a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java index 4c74893d..905d58a4 100644 --- a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java @@ -25,17 +25,15 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; -import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; +import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.ApplicationListener; -import org.springframework.core.task.SimpleAsyncTaskExecutor; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; @@ -48,15 +46,17 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application private static final String RABBITMQ = "RabbitMQ"; - private static final String FiledName = "taskExecutor"; + private static final String FiledName = "executorService"; private static final String BEAN_NAME_FILED = "beanName"; + + // todo: Configurable name + private static final String RABBITMQ_EXECUTOR_SERVICE = "Rabbitmq_Executor_Service"; + - private final List> abstractRabbitListenerContainerFactories; - - private final Map RABBITMQ_EXECUTOR = Maps.newHashMap(); + private final AbstractConnectionFactory abstractConnectionFactory; - private final Map RABBITMQ_THREAD_POOL_TASK_EXECUTOR = Maps.newHashMap(); + private final Map RABBITMQ_THREAD_POOL_TASK_EXECUTOR = Maps.newHashMap(); @Override public String mark() { @@ -66,23 +66,17 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application @Override public ThreadPoolAdapterState getThreadPoolState(String identify) { ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState(); - SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = RABBITMQ_EXECUTOR.get(identify); - ThreadPoolTaskExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(identify); + ThreadPoolExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(identify); threadPoolAdapterState.setThreadPoolKey(identify); - if (Objects.nonNull(simpleAsyncTaskExecutor)) { - threadPoolAdapterState.setCoreSize(simpleAsyncTaskExecutor.getConcurrencyLimit()); - } if (Objects.nonNull(threadPoolTaskExecutor)) { threadPoolAdapterState.setCoreSize(threadPoolTaskExecutor.getCorePoolSize()); - threadPoolAdapterState.setMaximumSize(threadPoolTaskExecutor.getMaxPoolSize()); + threadPoolAdapterState.setMaximumSize(threadPoolTaskExecutor.getMaximumPoolSize()); } return threadPoolAdapterState; } public List getThreadPoolStates() { List adapterStateList = Lists.newArrayList(); - RABBITMQ_EXECUTOR.forEach( - (key, val) -> adapterStateList.add(getThreadPoolState(key))); RABBITMQ_THREAD_POOL_TASK_EXECUTOR.forEach( (key, val) -> adapterStateList.add(getThreadPoolState(key))); return adapterStateList; @@ -91,53 +85,35 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application @Override public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); - SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = RABBITMQ_EXECUTOR.get(threadPoolKey); - ThreadPoolTaskExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(threadPoolKey); + ThreadPoolExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(threadPoolKey); if (Objects.nonNull(threadPoolTaskExecutor)) { int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize(); - int originalMaximumPoolSize = threadPoolTaskExecutor.getMaxPoolSize(); + int originalMaximumPoolSize = threadPoolTaskExecutor.getMaximumPoolSize(); + threadPoolTaskExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); - threadPoolTaskExecutor.setMaxPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}", threadPoolKey, String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()), String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolAdapterParameter.getMaximumPoolSize())); return true; } - if (Objects.nonNull(simpleAsyncTaskExecutor)) { - int concurrencyLimit = simpleAsyncTaskExecutor.getConcurrencyLimit(); - simpleAsyncTaskExecutor.setConcurrencyLimit(threadPoolAdapterParameter.getCorePoolSize()); - log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}", - threadPoolKey, - String.format(CHANGE_DELIMITER, concurrencyLimit, threadPoolAdapterParameter.getCorePoolSize())); - return true; - } log.warn("[{}] rabbitmq consuming thread pool not found.", threadPoolKey); return false; } @Override public void onApplicationEvent(ApplicationStartedEvent event) { - for (AbstractRabbitListenerContainerFactory consumerWorkService : abstractRabbitListenerContainerFactories) { - // 是否为自定义线程池 - Executor executor = (Executor) ReflectUtil.getFieldValue(consumerWorkService, FiledName); - if (Objects.isNull(executor)) { - // 获取默认线程池 - // 优先获取用户配置的 - AbstractMessageListenerContainer listenerContainer1 = consumerWorkService.createListenerContainer(); - SimpleAsyncTaskExecutor fieldValue = (SimpleAsyncTaskExecutor) ReflectUtil.getFieldValue(listenerContainer1, FiledName); - log.info("rabbitmq executor name {}", FiledName); - RABBITMQ_EXECUTOR.put(FiledName, fieldValue); + ExecutorService executor = (ExecutorService) ReflectUtil.getFieldValue(abstractConnectionFactory, FiledName); + if (Objects.nonNull(executor)) { + if (executor instanceof ThreadPoolExecutor) { + ThreadPoolExecutor threadPoolTaskExecutor = (ThreadPoolExecutor) executor; + String beanName = (String) ReflectUtil.getFieldValue(threadPoolTaskExecutor, BEAN_NAME_FILED); + RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(RABBITMQ_EXECUTOR_SERVICE, threadPoolTaskExecutor); + log.info("rabbitmq executor name {}", RABBITMQ_EXECUTOR_SERVICE); } else { - if (executor instanceof ThreadPoolTaskExecutor) { - ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor; - String beanName = (String) ReflectUtil.getFieldValue(threadPoolTaskExecutor, BEAN_NAME_FILED); - RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor); - log.info("rabbitmq executor name {}", beanName); - } else { - log.warn("Custom thread pools only support ThreadPoolTaskExecutor"); - } + log.warn("Custom thread pools only support ThreadPoolExecutor"); } + } } } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java index 9c1adc24..799e8743 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java @@ -18,9 +18,9 @@ package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.config; import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; -import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; -import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -37,24 +37,24 @@ public class RabbitMQThreadPoolConfig { public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 指定线程的最大数量 - executor.setMaxPoolSize(10); + executor.setMaxPoolSize(5); // 指定线程池维护线程的最少数量 - executor.setCorePoolSize(10); + executor.setCorePoolSize(5); // 指定等待处理的任务数 - executor.setQueueCapacity(20); + executor.setQueueCapacity(1000); executor.setThreadNamePrefix("RabbitListenerTaskExecutor-"); return executor; } @Bean - public AbstractRabbitListenerContainerFactory defaultRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, - ThreadPoolTaskExecutor rabbitListenerTaskExecutor, - ConnectionFactory connectionFactory) { - SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); - configurer.configure(factory, connectionFactory); - factory.setConcurrentConsumers(2); - factory.setMaxConcurrentConsumers(2); - factory.setTaskExecutor(rabbitListenerTaskExecutor); + public AbstractRabbitListenerContainerFactory defaultRabbitListenerContainerFactory(ThreadPoolTaskExecutor rabbitListenerTaskExecutor, + MessageConverter messageConverter, AbstractConnectionFactory abstractConnectionFactory) { + DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory(); + factory.setConnectionFactory(abstractConnectionFactory); +// factory.setTaskExecutor(rabbitListenerTaskExecutor); + factory.setMessageConverter(messageConverter); + factory.setConsumersPerQueue(10); + abstractConnectionFactory.setExecutor(rabbitListenerTaskExecutor); return factory; } } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/constants/SimpleMQConstant.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/constants/SimpleMQConstant.java index 3b3898a1..1ee50b9e 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/constants/SimpleMQConstant.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/constants/SimpleMQConstant.java @@ -23,4 +23,6 @@ package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants; public interface SimpleMQConstant { String QUEUE_NAME = "framework_message-center_queue"; + + String QUEUE_NAME1 = "framework_message-center_queue1"; } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/consumer/MessageConsumer.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/consumer/MessageConsumer.java index 67abd0a0..a3419d71 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/consumer/MessageConsumer.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/consumer/MessageConsumer.java @@ -37,19 +37,22 @@ public class MessageConsumer { @RabbitHandler @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory") +// @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME)) public void receiveObject(SendMessageDTO simple) throws Exception { TimeUnit.SECONDS.sleep(1); ObjectMapper objectMapper = new ObjectMapper(); String message = objectMapper.writeValueAsString(simple); - log.info("consumer1 threadId {} Message: {}", Thread.currentThread().getId(),message); + log.info("consumer1 threadId {} Message: {}", Thread.currentThread().getName(),message); } - @RabbitHandler - @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory") - public void receiveObject1(SendMessageDTO simple) throws Exception { - TimeUnit.SECONDS.sleep(1); - ObjectMapper objectMapper = new ObjectMapper(); - String message = objectMapper.writeValueAsString(simple); - log.info("consumer2 threadId {} Message: {}", Thread.currentThread().getId(),message); - } +// @RabbitHandler +// @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME1)) +// public void receiveObject1(SendMessageDTO simple) throws Exception { +// TimeUnit.SECONDS.sleep(1); +// ObjectMapper objectMapper = new ObjectMapper(); +// String message = objectMapper.writeValueAsString(simple); +// log.info("consumer threadId {} Message: {}", Thread.currentThread().getName(),message); +// } + + } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-rabbitmq/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/RabbitMQAdapterAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-rabbitmq/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/RabbitMQAdapterAutoConfiguration.java index 70049376..7eeba70e 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-rabbitmq/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/RabbitMQAdapterAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-rabbitmq/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/RabbitMQAdapterAutoConfiguration.java @@ -20,14 +20,12 @@ package cn.hippo4j.springboot.starter.adapter.rabbitmq; import cn.hippo4j.adapter.rabbitmq.RabbitMQThreadPoolAdapter; import cn.hippo4j.common.config.ApplicationContextHolder; import lombok.RequiredArgsConstructor; -import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.List; - /** * Rabbit adapter auto configuration. */ @@ -35,7 +33,8 @@ import java.util.List; @RequiredArgsConstructor public class RabbitMQAdapterAutoConfiguration { - private final List> abstractRabbitListenerContainerFactories; + + private final AbstractConnectionFactory abstractConnectionFactories; @Bean @ConditionalOnMissingBean @@ -47,6 +46,6 @@ public class RabbitMQAdapterAutoConfiguration { @SuppressWarnings("all") @ConditionalOnProperty(name = "spring.rabbitmq.host") public RabbitMQThreadPoolAdapter rabbitMQThreadPoolAdapter(ApplicationContextHolder applicationContextHolder) { - return new RabbitMQThreadPoolAdapter(abstractRabbitListenerContainerFactories); + return new RabbitMQThreadPoolAdapter(abstractConnectionFactories); } }