Add exception capture when getting rocketmq thread pool

pull/233/head
chen.ma 2 years ago
parent 132d25fa8a
commit 9e724a264b

@ -96,17 +96,21 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda
if (CollectionUtil.isEmpty(inputBindings)) { if (CollectionUtil.isEmpty(inputBindings)) {
log.info("InputBindings record not found."); log.info("InputBindings record not found.");
} }
for (Binding<Object> each : inputBindings) { try {
String bindingName = each.getBindingName(); for (Binding<Object> each : inputBindings) {
String buildKey = mark() + IDENTIFY_SLICER_SYMBOL + bindingName; String bindingName = each.getBindingName();
DefaultBinding defaultBinding = (DefaultBinding) each; String buildKey = mark() + IDENTIFY_SLICER_SYMBOL + bindingName;
RocketMQInboundChannelAdapter lifecycle = (RocketMQInboundChannelAdapter) cn.hutool.core.util.ReflectUtil.getFieldValue(defaultBinding, "lifecycle"); DefaultBinding defaultBinding = (DefaultBinding) each;
RocketMQListenerBindingContainer rocketMQListenerContainer = (RocketMQListenerBindingContainer) cn.hutool.core.util.ReflectUtil.getFieldValue(lifecycle, "rocketMQListenerContainer"); RocketMQInboundChannelAdapter lifecycle = (RocketMQInboundChannelAdapter) cn.hutool.core.util.ReflectUtil.getFieldValue(defaultBinding, "lifecycle");
DefaultMQPushConsumer consumer = rocketMQListenerContainer.getConsumer(); RocketMQListenerBindingContainer rocketMQListenerContainer = (RocketMQListenerBindingContainer) cn.hutool.core.util.ReflectUtil.getFieldValue(lifecycle, "rocketMQListenerContainer");
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = consumer.getDefaultMQPushConsumerImpl(); DefaultMQPushConsumer consumer = rocketMQListenerContainer.getConsumer();
ConsumeMessageConcurrentlyService consumeMessageService = (ConsumeMessageConcurrentlyService) defaultMQPushConsumerImpl.getConsumeMessageService(); DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = consumer.getDefaultMQPushConsumerImpl();
ThreadPoolExecutor consumeExecutor = (ThreadPoolExecutor) cn.hutool.core.util.ReflectUtil.getFieldValue(consumeMessageService, "consumeExecutor"); ConsumeMessageConcurrentlyService consumeMessageService = (ConsumeMessageConcurrentlyService) defaultMQPushConsumerImpl.getConsumeMessageService();
ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.put(buildKey, consumeExecutor); ThreadPoolExecutor consumeExecutor = (ThreadPoolExecutor) cn.hutool.core.util.ReflectUtil.getFieldValue(consumeMessageService, "consumeExecutor");
ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.put(buildKey, consumeExecutor);
}
} catch (Exception ex) {
log.error("Failed to get input-bindings thread pool.", ex);
} }
} }
} }

Loading…
Cancel
Save