Merge pull request #249 from weihubeats/feature_rabbitmq

update RabbitMQThreadPoolAdapter
pull/253/head
Long Tai 3 years ago committed by GitHub
commit 91aa114e7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -25,17 +25,15 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
@ -48,15 +46,17 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
private static final String RABBITMQ = "RabbitMQ"; private static final String RABBITMQ = "RabbitMQ";
private static final String FiledName = "taskExecutor"; private static final String FiledName = "executorService";
private static final String BEAN_NAME_FILED = "beanName"; private static final String BEAN_NAME_FILED = "beanName";
// todo: Configurable name
private static final String RABBITMQ_EXECUTOR_SERVICE = "Rabbitmq_Executor_Service";
private final List<AbstractRabbitListenerContainerFactory<?>> abstractRabbitListenerContainerFactories; private final AbstractConnectionFactory abstractConnectionFactory;
private final Map<String, SimpleAsyncTaskExecutor> RABBITMQ_EXECUTOR = Maps.newHashMap();
private final Map<String, ThreadPoolTaskExecutor> RABBITMQ_THREAD_POOL_TASK_EXECUTOR = Maps.newHashMap(); private final Map<String, ThreadPoolExecutor> RABBITMQ_THREAD_POOL_TASK_EXECUTOR = Maps.newHashMap();
@Override @Override
public String mark() { public String mark() {
@ -66,23 +66,17 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
@Override @Override
public ThreadPoolAdapterState getThreadPoolState(String identify) { public ThreadPoolAdapterState getThreadPoolState(String identify) {
ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState(); ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState();
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = RABBITMQ_EXECUTOR.get(identify); ThreadPoolExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(identify);
ThreadPoolTaskExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(identify);
threadPoolAdapterState.setThreadPoolKey(identify); threadPoolAdapterState.setThreadPoolKey(identify);
if (Objects.nonNull(simpleAsyncTaskExecutor)) {
threadPoolAdapterState.setCoreSize(simpleAsyncTaskExecutor.getConcurrencyLimit());
}
if (Objects.nonNull(threadPoolTaskExecutor)) { if (Objects.nonNull(threadPoolTaskExecutor)) {
threadPoolAdapterState.setCoreSize(threadPoolTaskExecutor.getCorePoolSize()); threadPoolAdapterState.setCoreSize(threadPoolTaskExecutor.getCorePoolSize());
threadPoolAdapterState.setMaximumSize(threadPoolTaskExecutor.getMaxPoolSize()); threadPoolAdapterState.setMaximumSize(threadPoolTaskExecutor.getMaximumPoolSize());
} }
return threadPoolAdapterState; return threadPoolAdapterState;
} }
public List<ThreadPoolAdapterState> getThreadPoolStates() { public List<ThreadPoolAdapterState> getThreadPoolStates() {
List<ThreadPoolAdapterState> adapterStateList = Lists.newArrayList(); List<ThreadPoolAdapterState> adapterStateList = Lists.newArrayList();
RABBITMQ_EXECUTOR.forEach(
(key, val) -> adapterStateList.add(getThreadPoolState(key)));
RABBITMQ_THREAD_POOL_TASK_EXECUTOR.forEach( RABBITMQ_THREAD_POOL_TASK_EXECUTOR.forEach(
(key, val) -> adapterStateList.add(getThreadPoolState(key))); (key, val) -> adapterStateList.add(getThreadPoolState(key)));
return adapterStateList; return adapterStateList;
@ -91,53 +85,35 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
@Override @Override
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = RABBITMQ_EXECUTOR.get(threadPoolKey); ThreadPoolExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(threadPoolKey);
ThreadPoolTaskExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(threadPoolKey);
if (Objects.nonNull(threadPoolTaskExecutor)) { if (Objects.nonNull(threadPoolTaskExecutor)) {
int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize(); int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize();
int originalMaximumPoolSize = threadPoolTaskExecutor.getMaxPoolSize(); int originalMaximumPoolSize = threadPoolTaskExecutor.getMaximumPoolSize();
threadPoolTaskExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
threadPoolTaskExecutor.setMaxPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}", log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}",
threadPoolKey, threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()), String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()),
String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolAdapterParameter.getMaximumPoolSize())); String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolAdapterParameter.getMaximumPoolSize()));
return true; return true;
} }
if (Objects.nonNull(simpleAsyncTaskExecutor)) {
int concurrencyLimit = simpleAsyncTaskExecutor.getConcurrencyLimit();
simpleAsyncTaskExecutor.setConcurrencyLimit(threadPoolAdapterParameter.getCorePoolSize());
log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, concurrencyLimit, threadPoolAdapterParameter.getCorePoolSize()));
return true;
}
log.warn("[{}] rabbitmq consuming thread pool not found.", threadPoolKey); log.warn("[{}] rabbitmq consuming thread pool not found.", threadPoolKey);
return false; return false;
} }
@Override @Override
public void onApplicationEvent(ApplicationStartedEvent event) { public void onApplicationEvent(ApplicationStartedEvent event) {
for (AbstractRabbitListenerContainerFactory<?> consumerWorkService : abstractRabbitListenerContainerFactories) { ExecutorService executor = (ExecutorService) ReflectUtil.getFieldValue(abstractConnectionFactory, FiledName);
// 是否为自定义线程池 if (Objects.nonNull(executor)) {
Executor executor = (Executor) ReflectUtil.getFieldValue(consumerWorkService, FiledName); if (executor instanceof ThreadPoolExecutor) {
if (Objects.isNull(executor)) { ThreadPoolExecutor threadPoolTaskExecutor = (ThreadPoolExecutor) executor;
// 获取默认线程池 String beanName = (String) ReflectUtil.getFieldValue(threadPoolTaskExecutor, BEAN_NAME_FILED);
// 优先获取用户配置的 RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(RABBITMQ_EXECUTOR_SERVICE, threadPoolTaskExecutor);
AbstractMessageListenerContainer listenerContainer1 = consumerWorkService.createListenerContainer(); log.info("rabbitmq executor name {}", RABBITMQ_EXECUTOR_SERVICE);
SimpleAsyncTaskExecutor fieldValue = (SimpleAsyncTaskExecutor) ReflectUtil.getFieldValue(listenerContainer1, FiledName);
log.info("rabbitmq executor name {}", FiledName);
RABBITMQ_EXECUTOR.put(FiledName, fieldValue);
} else { } else {
if (executor instanceof ThreadPoolTaskExecutor) { log.warn("Custom thread pools only support ThreadPoolExecutor");
ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor;
String beanName = (String) ReflectUtil.getFieldValue(threadPoolTaskExecutor, BEAN_NAME_FILED);
RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor);
log.info("rabbitmq executor name {}", beanName);
} else {
log.warn("Custom thread pools only support ThreadPoolTaskExecutor");
}
} }
} }
} }
} }

@ -18,9 +18,9 @@
package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.config; package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.config;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@ -37,24 +37,24 @@ public class RabbitMQThreadPoolConfig {
public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() { public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 指定线程的最大数量 // 指定线程的最大数量
executor.setMaxPoolSize(10); executor.setMaxPoolSize(5);
// 指定线程池维护线程的最少数量 // 指定线程池维护线程的最少数量
executor.setCorePoolSize(10); executor.setCorePoolSize(5);
// 指定等待处理的任务数 // 指定等待处理的任务数
executor.setQueueCapacity(20); executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("RabbitListenerTaskExecutor-"); executor.setThreadNamePrefix("RabbitListenerTaskExecutor-");
return executor; return executor;
} }
@Bean @Bean
public AbstractRabbitListenerContainerFactory<?> defaultRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, public AbstractRabbitListenerContainerFactory<?> defaultRabbitListenerContainerFactory(ThreadPoolTaskExecutor rabbitListenerTaskExecutor,
ThreadPoolTaskExecutor rabbitListenerTaskExecutor, MessageConverter messageConverter, AbstractConnectionFactory abstractConnectionFactory) {
ConnectionFactory connectionFactory) { DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(abstractConnectionFactory);
configurer.configure(factory, connectionFactory); // factory.setTaskExecutor(rabbitListenerTaskExecutor);
factory.setConcurrentConsumers(2); factory.setMessageConverter(messageConverter);
factory.setMaxConcurrentConsumers(2); factory.setConsumersPerQueue(10);
factory.setTaskExecutor(rabbitListenerTaskExecutor); abstractConnectionFactory.setExecutor(rabbitListenerTaskExecutor);
return factory; return factory;
} }
} }

@ -23,4 +23,6 @@ package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants;
public interface SimpleMQConstant { public interface SimpleMQConstant {
String QUEUE_NAME = "framework_message-center_queue"; String QUEUE_NAME = "framework_message-center_queue";
String QUEUE_NAME1 = "framework_message-center_queue1";
} }

@ -37,19 +37,22 @@ public class MessageConsumer {
@RabbitHandler @RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory") @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory")
// @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME))
public void receiveObject(SendMessageDTO simple) throws Exception { public void receiveObject(SendMessageDTO simple) throws Exception {
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
String message = objectMapper.writeValueAsString(simple); String message = objectMapper.writeValueAsString(simple);
log.info("consumer1 threadId {} Message: {}", Thread.currentThread().getId(),message); log.info("consumer1 threadId {} Message: {}", Thread.currentThread().getName(),message);
} }
@RabbitHandler // @RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory") // @RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME1))
public void receiveObject1(SendMessageDTO simple) throws Exception { // public void receiveObject1(SendMessageDTO simple) throws Exception {
TimeUnit.SECONDS.sleep(1); // TimeUnit.SECONDS.sleep(1);
ObjectMapper objectMapper = new ObjectMapper(); // ObjectMapper objectMapper = new ObjectMapper();
String message = objectMapper.writeValueAsString(simple); // String message = objectMapper.writeValueAsString(simple);
log.info("consumer2 threadId {} Message: {}", Thread.currentThread().getId(),message); // log.info("consumer threadId {} Message: {}", Thread.currentThread().getName(),message);
} // }
} }

@ -20,14 +20,12 @@ package cn.hippo4j.springboot.starter.adapter.rabbitmq;
import cn.hippo4j.adapter.rabbitmq.RabbitMQThreadPoolAdapter; import cn.hippo4j.adapter.rabbitmq.RabbitMQThreadPoolAdapter;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import java.util.List;
/** /**
* Rabbit adapter auto configuration. * Rabbit adapter auto configuration.
*/ */
@ -35,7 +33,8 @@ import java.util.List;
@RequiredArgsConstructor @RequiredArgsConstructor
public class RabbitMQAdapterAutoConfiguration { public class RabbitMQAdapterAutoConfiguration {
private final List<AbstractRabbitListenerContainerFactory<?>> abstractRabbitListenerContainerFactories;
private final AbstractConnectionFactory abstractConnectionFactories;
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
@ -47,6 +46,6 @@ public class RabbitMQAdapterAutoConfiguration {
@SuppressWarnings("all") @SuppressWarnings("all")
@ConditionalOnProperty(name = "spring.rabbitmq.host") @ConditionalOnProperty(name = "spring.rabbitmq.host")
public RabbitMQThreadPoolAdapter rabbitMQThreadPoolAdapter(ApplicationContextHolder applicationContextHolder) { public RabbitMQThreadPoolAdapter rabbitMQThreadPoolAdapter(ApplicationContextHolder applicationContextHolder) {
return new RabbitMQThreadPoolAdapter(abstractRabbitListenerContainerFactories); return new RabbitMQThreadPoolAdapter(abstractConnectionFactories);
} }
} }

Loading…
Cancel
Save