update message

pull/248/head
weihu 2 years ago
parent ffc4f6a4d5
commit 1b0887753f

@ -27,8 +27,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationListener;
import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@ -44,7 +43,7 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMI
*/ */
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> { public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, SmartInitializingSingleton {
private static final String RABBITMQ = "RabbitMQ"; private static final String RABBITMQ = "RabbitMQ";
@ -96,8 +95,8 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
if (Objects.nonNull(threadPoolTaskExecutor)) { if (Objects.nonNull(threadPoolTaskExecutor)) {
int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize(); int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize();
int originalMaximumPoolSize = threadPoolTaskExecutor.getMaxPoolSize(); int originalMaximumPoolSize = threadPoolTaskExecutor.getMaxPoolSize();
threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
threadPoolTaskExecutor.setMaxPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); threadPoolTaskExecutor.setMaxPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}", log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}",
threadPoolKey, threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()), String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()),
@ -117,7 +116,7 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
} }
@Override @Override
public void onApplicationEvent(ApplicationStartedEvent event) { public void afterSingletonsInstantiated() {
for (AbstractRabbitListenerContainerFactory<?> consumerWorkService : abstractRabbitListenerContainerFactories) { for (AbstractRabbitListenerContainerFactory<?> consumerWorkService : abstractRabbitListenerContainerFactories) {
// 是否为自定义线程池 // 是否为自定义线程池
Executor executor = (Executor) ReflectUtil.getFieldValue(consumerWorkService, FiledName); Executor executor = (Executor) ReflectUtil.getFieldValue(consumerWorkService, FiledName);
@ -126,17 +125,18 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
// 优先获取用户配置的 // 优先获取用户配置的
AbstractMessageListenerContainer listenerContainer1 = consumerWorkService.createListenerContainer(); AbstractMessageListenerContainer listenerContainer1 = consumerWorkService.createListenerContainer();
SimpleAsyncTaskExecutor fieldValue = (SimpleAsyncTaskExecutor) ReflectUtil.getFieldValue(listenerContainer1, FiledName); SimpleAsyncTaskExecutor fieldValue = (SimpleAsyncTaskExecutor) ReflectUtil.getFieldValue(listenerContainer1, FiledName);
log.info("rabbitmq executor name {}", FiledName);
RABBITMQ_EXECUTOR.put(FiledName, fieldValue); RABBITMQ_EXECUTOR.put(FiledName, fieldValue);
} else { } else {
if (executor instanceof ThreadPoolTaskExecutor) { if (executor instanceof ThreadPoolTaskExecutor) {
ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor; ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor;
String beanName = (String) ReflectUtil.getFieldValue(threadPoolTaskExecutor, BEAN_NAME_FILED); String beanName = (String) ReflectUtil.getFieldValue(threadPoolTaskExecutor, BEAN_NAME_FILED);
RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor); RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor);
log.info("rabbitmq executor name {}", beanName);
} else { } else {
log.warn("Custom thread pools only support ThreadPoolTaskExecutor"); log.warn("Custom thread pools only support ThreadPoolTaskExecutor");
} }
} }
} }
} }
} }

@ -37,9 +37,9 @@ public class RabbitMQThreadPoolConfig {
public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() { public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 指定线程的最大数量 // 指定线程的最大数量
executor.setMaxPoolSize(10); executor.setMaxPoolSize(1);
// 指定线程池维护线程的最少数量 // 指定线程池维护线程的最少数量
executor.setCorePoolSize(2); executor.setCorePoolSize(1);
// 指定等待处理的任务数 // 指定等待处理的任务数
executor.setQueueCapacity(20); executor.setQueueCapacity(20);
executor.setThreadNamePrefix("RabbitListenerTaskExecutor-"); executor.setThreadNamePrefix("RabbitListenerTaskExecutor-");
@ -53,7 +53,7 @@ public class RabbitMQThreadPoolConfig {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory); configurer.configure(factory, connectionFactory);
factory.setConcurrentConsumers(1); factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(10); factory.setMaxConcurrentConsumers(1);
factory.setTaskExecutor(rabbitListenerTaskExecutor); factory.setTaskExecutor(rabbitListenerTaskExecutor);
return factory; return factory;
} }

@ -19,7 +19,6 @@ package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.consumer;
import cn.hippo4j.example.core.dto.SendMessageDTO; import cn.hippo4j.example.core.dto.SendMessageDTO;
import cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants.SimpleMQConstant; import cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants.SimpleMQConstant;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.Queue;
@ -27,6 +26,8 @@ import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/** /**
* Message consume. * Message consume.
*/ */
@ -36,9 +37,21 @@ public class MessageConsumer {
@RabbitHandler @RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory") @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory")
public void receiveObject(SendMessageDTO simple) throws JsonProcessingException { public void receiveObject(SendMessageDTO simple) throws Exception {
log.info("consumer1 start");
TimeUnit.SECONDS.sleep(1);
ObjectMapper objectMapper = new ObjectMapper();
String message = objectMapper.writeValueAsString(simple);
log.info("threadId {} Message: {}", Thread.currentThread().getId(),message);
}
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory")
public void receiveObject1(SendMessageDTO simple) throws Exception {
log.info("consumer2 start");
TimeUnit.SECONDS.sleep(1);
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
String message = objectMapper.writeValueAsString(simple); String message = objectMapper.writeValueAsString(simple);
log.info("Message: {}", message); log.info("threadId {} Message: {}", Thread.currentThread().getId(),message);
} }
} }

@ -40,13 +40,15 @@ public class MessageProduce {
private final RabbitTemplate rabbitTemplate; private final RabbitTemplate rabbitTemplate;
@GetMapping("/message/send") @GetMapping("/message/send")
public String sendMessage() { public String sendMessage(Integer count) {
String keys = UUID.randomUUID().toString(); for (int i = 0; i < count; i++) {
SendMessageDTO payload = SendMessageDTO.builder() String keys = UUID.randomUUID().toString();
.receiver("156011xxx91") SendMessageDTO payload = SendMessageDTO.builder()
.uid(keys) .receiver("156011xxx91")
.build(); .uid(keys)
rabbitTemplate.convertAndSend(SimpleMQConstant.QUEUE_NAME, payload); .build();
rabbitTemplate.convertAndSend(SimpleMQConstant.QUEUE_NAME, payload);
}
return "success"; return "success";
} }
} }

Loading…
Cancel
Save