RabbitMQ adapter optimization

pull/570/head
chen.ma 2 years ago
parent f90d23870f
commit a124169496

@ -53,7 +53,7 @@ public class SpringCloudStreamRabbitMQThreadPoolAdapter implements ThreadPoolAda
@Override @Override
public String mark() { public String mark() {
return "rabbitMQSpringCloudStream"; return "RabbitMQSpringCloudStream";
} }
@Override @Override
@ -71,7 +71,6 @@ public class SpringCloudStreamRabbitMQThreadPoolAdapter implements ThreadPoolAda
} else { } else {
result.setMaximumSize(concurrentConsumers); result.setMaximumSize(concurrentConsumers);
} }
} else if (messageListenerContainer instanceof DirectMessageListenerContainer) { } else if (messageListenerContainer instanceof DirectMessageListenerContainer) {
int consumersPerQueue = (int) ReflectUtil.getFieldValue(messageListenerContainer, "consumersPerQueue"); int consumersPerQueue = (int) ReflectUtil.getFieldValue(messageListenerContainer, "consumersPerQueue");
result.setCoreSize(consumersPerQueue); result.setCoreSize(consumersPerQueue);
@ -79,7 +78,7 @@ public class SpringCloudStreamRabbitMQThreadPoolAdapter implements ThreadPoolAda
} }
return result; return result;
} }
log.warn("[{}] rabbitMQ consuming thread pool not found.", identify); log.warn("[{}] RabbitMQ consuming thread pool not found.", identify);
return result; return result;
} }
@ -116,7 +115,7 @@ public class SpringCloudStreamRabbitMQThreadPoolAdapter implements ThreadPoolAda
simpleMessageListenerContainer.setMaxConcurrentConsumers(maximumPoolSize); simpleMessageListenerContainer.setMaxConcurrentConsumers(maximumPoolSize);
simpleMessageListenerContainer.setConcurrentConsumers(corePoolSize); simpleMessageListenerContainer.setConcurrentConsumers(corePoolSize);
} }
log.info("[{}] rabbitMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}", log.info("[{}] RabbitMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey, threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, corePoolSize), String.format(CHANGE_DELIMITER, originalCoreSize, corePoolSize),
String.format(CHANGE_DELIMITER, originalMaximumPoolSize, maximumPoolSize)); String.format(CHANGE_DELIMITER, originalMaximumPoolSize, maximumPoolSize));
@ -124,18 +123,17 @@ public class SpringCloudStreamRabbitMQThreadPoolAdapter implements ThreadPoolAda
int originalCoreSize = (int) ReflectUtil.getFieldValue(messageListenerContainer, "consumersPerQueue"); int originalCoreSize = (int) ReflectUtil.getFieldValue(messageListenerContainer, "consumersPerQueue");
DirectMessageListenerContainer directMessageListenerContainer = (DirectMessageListenerContainer) messageListenerContainer; DirectMessageListenerContainer directMessageListenerContainer = (DirectMessageListenerContainer) messageListenerContainer;
directMessageListenerContainer.setConsumersPerQueue(maximumPoolSize); directMessageListenerContainer.setConsumersPerQueue(maximumPoolSize);
log.info("[{}] rabbitMQ consumption thread pool parameter change. coreSize: {}", log.info("[{}] RabbitMQ consumption thread pool parameter change. coreSize: {}",
threadPoolKey, threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, corePoolSize)); String.format(CHANGE_DELIMITER, originalCoreSize, corePoolSize));
} else { } else {
log.warn("[{}] rabbitMQ consuming thread pool not support. messageListenerContainer: {}", threadPoolKey, messageListenerContainer.getClass()); log.warn("[{}] RabbitMQ consuming thread pool not support. messageListenerContainer: {}", threadPoolKey, messageListenerContainer.getClass());
return false; return false;
} }
} }
return true; return true;
} }
log.warn("[{}] rabbitMQ consuming thread pool not found.", threadPoolKey); log.warn("[{}] RabbitMQ consuming thread pool not found.", threadPoolKey);
return false; return false;
} }
@ -144,7 +142,6 @@ public class SpringCloudStreamRabbitMQThreadPoolAdapter implements ThreadPoolAda
InputBindingLifecycle bindingLifecycle = ApplicationContextHolder.getBean(InputBindingLifecycle.class); InputBindingLifecycle bindingLifecycle = ApplicationContextHolder.getBean(InputBindingLifecycle.class);
Collection<Binding<Object>> inputBindings = Optional.ofNullable(ReflectUtil.getFieldValue(bindingLifecycle, "inputBindings")) Collection<Binding<Object>> inputBindings = Optional.ofNullable(ReflectUtil.getFieldValue(bindingLifecycle, "inputBindings"))
.map(each -> (Collection<Binding<Object>>) each).orElse(null); .map(each -> (Collection<Binding<Object>>) each).orElse(null);
if (CollectionUtil.isEmpty(inputBindings)) { if (CollectionUtil.isEmpty(inputBindings)) {
log.info("InputBindings record not found."); log.info("InputBindings record not found.");
return; return;

@ -28,8 +28,6 @@ import org.springframework.context.annotation.Configuration;
/** /**
* Spring cloud stream rabbitmq adapter auto configuration. * Spring cloud stream rabbitmq adapter auto configuration.
*
* @author lijianxin
*/ */
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RabbitMessageChannelBinderConfiguration.class) @ConditionalOnClass(RabbitMessageChannelBinderConfiguration.class)

Loading…
Cancel
Save