diff --git a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java index 4ed4515f..3d70aa5b 100644 --- a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java @@ -27,8 +27,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; -import org.springframework.boot.context.event.ApplicationStartedEvent; -import org.springframework.context.ApplicationListener; +import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -44,7 +43,7 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMI */ @Slf4j @RequiredArgsConstructor -public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener { +public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, SmartInitializingSingleton { private static final String RABBITMQ = "RabbitMQ"; @@ -96,8 +95,8 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application if (Objects.nonNull(threadPoolTaskExecutor)) { int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize(); int originalMaximumPoolSize = threadPoolTaskExecutor.getMaxPoolSize(); - threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); threadPoolTaskExecutor.setMaxPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); + threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}", threadPoolKey, String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()), @@ -117,7 +116,7 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application } @Override - public void onApplicationEvent(ApplicationStartedEvent event) { + public void afterSingletonsInstantiated() { for (AbstractRabbitListenerContainerFactory consumerWorkService : abstractRabbitListenerContainerFactories) { // 是否为自定义线程池 Executor executor = (Executor) ReflectUtil.getFieldValue(consumerWorkService, FiledName); @@ -126,17 +125,18 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application // 优先获取用户配置的 AbstractMessageListenerContainer listenerContainer1 = consumerWorkService.createListenerContainer(); SimpleAsyncTaskExecutor fieldValue = (SimpleAsyncTaskExecutor) ReflectUtil.getFieldValue(listenerContainer1, FiledName); + log.info("rabbitmq executor name {}", FiledName); RABBITMQ_EXECUTOR.put(FiledName, fieldValue); } else { if (executor instanceof ThreadPoolTaskExecutor) { ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor; String beanName = (String) ReflectUtil.getFieldValue(threadPoolTaskExecutor, BEAN_NAME_FILED); RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor); + log.info("rabbitmq executor name {}", beanName); } else { log.warn("Custom thread pools only support ThreadPoolTaskExecutor"); } } } - } } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java index 5900e33c..4e0b7ef2 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java @@ -37,9 +37,9 @@ public class RabbitMQThreadPoolConfig { public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 指定线程的最大数量 - executor.setMaxPoolSize(10); + executor.setMaxPoolSize(1); // 指定线程池维护线程的最少数量 - executor.setCorePoolSize(2); + executor.setCorePoolSize(1); // 指定等待处理的任务数 executor.setQueueCapacity(20); executor.setThreadNamePrefix("RabbitListenerTaskExecutor-"); @@ -53,7 +53,7 @@ public class RabbitMQThreadPoolConfig { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); factory.setConcurrentConsumers(1); - factory.setMaxConcurrentConsumers(10); + factory.setMaxConcurrentConsumers(1); factory.setTaskExecutor(rabbitListenerTaskExecutor); return factory; } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/consumer/MessageConsumer.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/consumer/MessageConsumer.java index e4d1c6c5..50156738 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/consumer/MessageConsumer.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/consumer/MessageConsumer.java @@ -19,7 +19,6 @@ package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.consumer; import cn.hippo4j.example.core.dto.SendMessageDTO; import cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants.SimpleMQConstant; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Queue; @@ -27,6 +26,8 @@ import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; +import java.util.concurrent.TimeUnit; + /** * Message consume. */ @@ -36,9 +37,21 @@ public class MessageConsumer { @RabbitHandler @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory") - public void receiveObject(SendMessageDTO simple) throws JsonProcessingException { + public void receiveObject(SendMessageDTO simple) throws Exception { + log.info("consumer1 start"); + TimeUnit.SECONDS.sleep(1); + ObjectMapper objectMapper = new ObjectMapper(); + String message = objectMapper.writeValueAsString(simple); + log.info("threadId {} Message: {}", Thread.currentThread().getId(),message); + } + + @RabbitHandler + @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory") + public void receiveObject1(SendMessageDTO simple) throws Exception { + log.info("consumer2 start"); + TimeUnit.SECONDS.sleep(1); ObjectMapper objectMapper = new ObjectMapper(); String message = objectMapper.writeValueAsString(simple); - log.info("Message: {}", message); + log.info("threadId {} Message: {}", Thread.currentThread().getId(),message); } } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/producer/MessageProduce.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/producer/MessageProduce.java index be30fb46..c0a66817 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/producer/MessageProduce.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/producer/MessageProduce.java @@ -40,13 +40,15 @@ public class MessageProduce { private final RabbitTemplate rabbitTemplate; @GetMapping("/message/send") - public String sendMessage() { - String keys = UUID.randomUUID().toString(); - SendMessageDTO payload = SendMessageDTO.builder() - .receiver("156011xxx91") - .uid(keys) - .build(); - rabbitTemplate.convertAndSend(SimpleMQConstant.QUEUE_NAME, payload); + public String sendMessage(Integer count) { + for (int i = 0; i < count; i++) { + String keys = UUID.randomUUID().toString(); + SendMessageDTO payload = SendMessageDTO.builder() + .receiver("156011xxx91") + .uid(keys) + .build(); + rabbitTemplate.convertAndSend(SimpleMQConstant.QUEUE_NAME, payload); + } return "success"; } }