diff --git a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java index 6194f564..4c74893d 100644 --- a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java @@ -21,6 +21,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.toolkit.ReflectUtil; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -77,6 +78,15 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application } return threadPoolAdapterState; } + + public List getThreadPoolStates() { + List adapterStateList = Lists.newArrayList(); + RABBITMQ_EXECUTOR.forEach( + (key, val) -> adapterStateList.add(getThreadPoolState(key))); + RABBITMQ_THREAD_POOL_TASK_EXECUTOR.forEach( + (key, val) -> adapterStateList.add(getThreadPoolState(key))); + return adapterStateList; + } @Override public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { @@ -116,17 +126,18 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application // 优先获取用户配置的 AbstractMessageListenerContainer listenerContainer1 = consumerWorkService.createListenerContainer(); SimpleAsyncTaskExecutor fieldValue = (SimpleAsyncTaskExecutor) ReflectUtil.getFieldValue(listenerContainer1, FiledName); + log.info("rabbitmq executor name {}", FiledName); RABBITMQ_EXECUTOR.put(FiledName, fieldValue); } else { if (executor instanceof ThreadPoolTaskExecutor) { ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor; String beanName = (String) ReflectUtil.getFieldValue(threadPoolTaskExecutor, BEAN_NAME_FILED); RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor); + log.info("rabbitmq executor name {}", beanName); } else { log.warn("Custom thread pools only support ThreadPoolTaskExecutor"); } } } - } } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/RabbitMQTemplateConfig.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQTemplateConfig.java similarity index 96% rename from hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/RabbitMQTemplateConfig.java rename to hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQTemplateConfig.java index 95fc5392..b517e05a 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/RabbitMQTemplateConfig.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQTemplateConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.springboot.starter.adapter.rabbitmq.example; +package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.config; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java index 5900e33c..9c1adc24 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/config/RabbitMQThreadPoolConfig.java @@ -39,7 +39,7 @@ public class RabbitMQThreadPoolConfig { // 指定线程的最大数量 executor.setMaxPoolSize(10); // 指定线程池维护线程的最少数量 - executor.setCorePoolSize(2); + executor.setCorePoolSize(10); // 指定等待处理的任务数 executor.setQueueCapacity(20); executor.setThreadNamePrefix("RabbitListenerTaskExecutor-"); @@ -52,8 +52,8 @@ public class RabbitMQThreadPoolConfig { ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); - factory.setConcurrentConsumers(1); - factory.setMaxConcurrentConsumers(10); + factory.setConcurrentConsumers(2); + factory.setMaxConcurrentConsumers(2); factory.setTaskExecutor(rabbitListenerTaskExecutor); return factory; } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/SimpleMQConstant.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/constants/SimpleMQConstant.java similarity index 82% rename from hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/SimpleMQConstant.java rename to hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/constants/SimpleMQConstant.java index abde0eb4..3b3898a1 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/SimpleMQConstant.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/constants/SimpleMQConstant.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package cn.hippo4j.springboot.starter.adapter.rabbitmq.example; +package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants; /** * Simple MQ constant. */ -public class SimpleMQConstant { +public interface SimpleMQConstant { - public static final String QUEUE_NAME = "framework_message-center_queue"; + String QUEUE_NAME = "framework_message-center_queue"; } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/MessageConsume.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/consumer/MessageConsumer.java similarity index 59% rename from hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/MessageConsume.java rename to hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/consumer/MessageConsumer.java index 0f4c936a..67abd0a0 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/MessageConsume.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/consumer/MessageConsumer.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package cn.hippo4j.springboot.starter.adapter.rabbitmq.example; +package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.consumer; import cn.hippo4j.example.core.dto.SendMessageDTO; -import com.fasterxml.jackson.core.JsonProcessingException; +import cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants.SimpleMQConstant; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Queue; @@ -26,18 +26,30 @@ import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; +import java.util.concurrent.TimeUnit; + /** * Message consume. */ @Slf4j @Component -public class MessageConsume { +public class MessageConsumer { + + @RabbitHandler + @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory") + public void receiveObject(SendMessageDTO simple) throws Exception { + TimeUnit.SECONDS.sleep(1); + ObjectMapper objectMapper = new ObjectMapper(); + String message = objectMapper.writeValueAsString(simple); + log.info("consumer1 threadId {} Message: {}", Thread.currentThread().getId(),message); + } @RabbitHandler - @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME)) - public void receiveObject(SendMessageDTO simple) throws JsonProcessingException { + @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory") + public void receiveObject1(SendMessageDTO simple) throws Exception { + TimeUnit.SECONDS.sleep(1); ObjectMapper objectMapper = new ObjectMapper(); String message = objectMapper.writeValueAsString(simple); - log.info("Message: {}", message); + log.info("consumer2 threadId {} Message: {}", Thread.currentThread().getId(),message); } } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/MessageProduce.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/producer/MessageProduce.java similarity index 71% rename from hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/MessageProduce.java rename to hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/producer/MessageProduce.java index 56dcfbf5..c0a66817 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/MessageProduce.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rabbitmq/example/producer/MessageProduce.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package cn.hippo4j.springboot.starter.adapter.rabbitmq.example; +package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.producer; import cn.hippo4j.example.core.dto.SendMessageDTO; +import cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants.SimpleMQConstant; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -39,13 +40,15 @@ public class MessageProduce { private final RabbitTemplate rabbitTemplate; @GetMapping("/message/send") - public String sendMessage() { - String keys = UUID.randomUUID().toString(); - SendMessageDTO payload = SendMessageDTO.builder() - .receiver("156011xxx91") - .uid(keys) - .build(); - rabbitTemplate.convertAndSend(SimpleMQConstant.QUEUE_NAME, payload); + public String sendMessage(Integer count) { + for (int i = 0; i < count; i++) { + String keys = UUID.randomUUID().toString(); + SendMessageDTO payload = SendMessageDTO.builder() + .receiver("156011xxx91") + .uid(keys) + .build(); + rabbitTemplate.convertAndSend(SimpleMQConstant.QUEUE_NAME, payload); + } return "success"; } }