update RabbitMQThreadPoolAdapter

pull/248/head
weihu 2 years ago
parent 1b0887753f
commit 03c335e2b6

@ -27,7 +27,8 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@ -43,7 +44,7 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMI
*/
@Slf4j
@RequiredArgsConstructor
public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, SmartInitializingSingleton {
public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> {
private static final String RABBITMQ = "RabbitMQ";
@ -95,8 +96,8 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, SmartInitia
if (Objects.nonNull(threadPoolTaskExecutor)) {
int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize();
int originalMaximumPoolSize = threadPoolTaskExecutor.getMaxPoolSize();
threadPoolTaskExecutor.setMaxPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
threadPoolTaskExecutor.setMaxPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()),
@ -116,7 +117,7 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, SmartInitia
}
@Override
public void afterSingletonsInstantiated() {
public void onApplicationEvent(ApplicationStartedEvent event) {
for (AbstractRabbitListenerContainerFactory<?> consumerWorkService : abstractRabbitListenerContainerFactories) {
// 是否为自定义线程池
Executor executor = (Executor) ReflectUtil.getFieldValue(consumerWorkService, FiledName);

@ -37,9 +37,9 @@ public class RabbitMQThreadPoolConfig {
public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 指定线程的最大数量
executor.setMaxPoolSize(1);
executor.setMaxPoolSize(10);
// 指定线程池维护线程的最少数量
executor.setCorePoolSize(1);
executor.setCorePoolSize(10);
// 指定等待处理的任务数
executor.setQueueCapacity(20);
executor.setThreadNamePrefix("RabbitListenerTaskExecutor-");
@ -52,8 +52,8 @@ public class RabbitMQThreadPoolConfig {
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(2);
factory.setTaskExecutor(rabbitListenerTaskExecutor);
return factory;
}

@ -38,20 +38,18 @@ public class MessageConsumer {
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory")
public void receiveObject(SendMessageDTO simple) throws Exception {
log.info("consumer1 start");
TimeUnit.SECONDS.sleep(1);
ObjectMapper objectMapper = new ObjectMapper();
String message = objectMapper.writeValueAsString(simple);
log.info("threadId {} Message: {}", Thread.currentThread().getId(),message);
log.info("consumer1 threadId {} Message: {}", Thread.currentThread().getId(),message);
}
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory")
public void receiveObject1(SendMessageDTO simple) throws Exception {
log.info("consumer2 start");
TimeUnit.SECONDS.sleep(1);
ObjectMapper objectMapper = new ObjectMapper();
String message = objectMapper.writeValueAsString(simple);
log.info("threadId {} Message: {}", Thread.currentThread().getId(),message);
log.info("consumer2 threadId {} Message: {}", Thread.currentThread().getId(),message);
}
}

Loading…
Cancel
Save