添加rabbitmq线程池支持

pull/244/head
weihu 3 years ago
parent 82f516dbc5
commit a09093eadd

@ -22,7 +22,7 @@
</dependencies> </dependencies>
<build> <build>
<plugins>ConsumerWorkService <plugins>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId> <artifactId>maven-jar-plugin</artifactId>

@ -20,21 +20,41 @@ package cn.hippo4j.adapter.rabbitmq;
import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.ReflectUtil; import cn.hippo4j.common.toolkit.ReflectUtil;
import com.rabbitmq.client.impl.ConsumerWorkService; import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
/** /**
* RabbitMQ thread-pool adapter. * RabbitMQ thread-pool adapter.
*/ */
@Slf4j @Slf4j
@RequiredArgsConstructor
public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> { public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> {
private static final String RABBITMQ = "RabbitMQ"; private static final String RABBITMQ = "RabbitMQ";
private static final String FiledName = "taskExecutor";
private static final String BEAN_NAME_FILED = "beanName";
private final List<AbstractRabbitListenerContainerFactory<?>> abstractRabbitListenerContainerFactories;
private final Map<String, SimpleAsyncTaskExecutor> RABBITMQ_EXECUTOR = Maps.newHashMap();
private final Map<String, ThreadPoolTaskExecutor> RABBITMQ_THREAD_POOL_TASK_EXECUTOR = Maps.newHashMap();
@Override @Override
public String mark() { public String mark() {
return RABBITMQ; return RABBITMQ;
@ -52,8 +72,25 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
@Override @Override
public void onApplicationEvent(ApplicationStartedEvent event) { public void onApplicationEvent(ApplicationStartedEvent event) {
ConsumerWorkService bindingLifecycle = ApplicationContextHolder.getBean(ConsumerWorkService.class); for (AbstractRabbitListenerContainerFactory<?> consumerWorkService : abstractRabbitListenerContainerFactories) {
ReflectUtil.getFieldValue(bindingLifecycle, "inputBindings"); // 是否为自定义线程池
Executor executor = (Executor) ReflectUtil.getFieldValue(consumerWorkService, FiledName);
if (Objects.isNull(executor)) {
// 获取默认线程池
// 优先获取用户配置的
AbstractMessageListenerContainer listenerContainer1 = consumerWorkService.createListenerContainer();
SimpleAsyncTaskExecutor fieldValue = (SimpleAsyncTaskExecutor) ReflectUtil.getFieldValue(listenerContainer1, FiledName);
RABBITMQ_EXECUTOR.put(FiledName, fieldValue);
} else {
if (executor instanceof ThreadPoolTaskExecutor) {
ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor;
String beanName = (String) ReflectUtil.getFieldValue(threadPoolTaskExecutor, BEAN_NAME_FILED);
RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor);
} else {
log.warn("Custom thread pools only support ThreadPoolTaskExecutor");
}
}
}
} }
} }

@ -0,0 +1,44 @@
package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.config;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* @author : wh
* @date : 2022/5/24 10:02
* @description:
*/
@Configuration
public class RabbitMQThreadPoolConfig {
@Bean
public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(10); // 指定线程的最大数量
executor.setCorePoolSize(2); // 指定线程池维护线程的最少数量
executor.setQueueCapacity(20); // 指定等待处理的任务数
executor.setThreadNamePrefix("RabbitListenerTaskExecutor-");
return executor;
}
@Bean
public AbstractRabbitListenerContainerFactory<?> defaultRabbitListenerContainerFactory (
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ThreadPoolTaskExecutor rabbitListenerTaskExecutor,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(10);
factory.setTaskExecutor(rabbitListenerTaskExecutor);
return factory;
}
}

@ -13,3 +13,5 @@ spring.rabbitmq.port=5672
spring.rabbitmq.username=guest spring.rabbitmq.username=guest
spring.rabbitmq.password=guest spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/ spring.rabbitmq.virtual-host=/
spring.rabbitmq.addresses

@ -25,7 +25,7 @@ hippo4j.core.clean-history-data-enable=true
### Data source customization section ### Data source customization section
spring.datasource.url=jdbc:mysql://localhost:3306/hippo4j_manager?characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8 spring.datasource.url=jdbc:mysql://localhost:3306/hippo4j_manager?characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8
spring.datasource.username=root spring.datasource.username=root
spring.datasource.password=root spring.datasource.password=123456
### Hikari Datasource ### Hikari Datasource
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

@ -19,17 +19,24 @@ package cn.hippo4j.springboot.starter.adapter.rabbitmq;
import cn.hippo4j.adapter.rabbitmq.RabbitMQThreadPoolAdapter; import cn.hippo4j.adapter.rabbitmq.RabbitMQThreadPoolAdapter;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import java.util.List;
/** /**
* Rabbit adapter auto configuration. * Rabbit adapter auto configuration.
*/ */
@Configuration @Configuration
@RequiredArgsConstructor
public class RabbitMQAdapterAutoConfiguration { public class RabbitMQAdapterAutoConfiguration {
private final List<AbstractRabbitListenerContainerFactory<?>> abstractRabbitListenerContainerFactories;
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
public ApplicationContextHolder simpleApplicationContextHolder() { public ApplicationContextHolder simpleApplicationContextHolder() {
@ -38,8 +45,8 @@ public class RabbitMQAdapterAutoConfiguration {
@Bean @Bean
@SuppressWarnings("all") @SuppressWarnings("all")
@ConditionalOnProperty(name = "spring.rabbitmq.addresses") @ConditionalOnProperty(name = "spring.rabbitmq.host")
public RabbitMQThreadPoolAdapter rabbitMQThreadPoolAdapter(ApplicationContextHolder applicationContextHolder) { public RabbitMQThreadPoolAdapter rabbitMQThreadPoolAdapter(ApplicationContextHolder applicationContextHolder) {
return new RabbitMQThreadPoolAdapter(); return new RabbitMQThreadPoolAdapter(abstractRabbitListenerContainerFactories);
} }
} }

Loading…
Cancel
Save