diff --git a/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java index 0a8400f7..73eca032 100644 --- a/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java @@ -96,17 +96,21 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda if (CollectionUtil.isEmpty(inputBindings)) { log.info("InputBindings record not found."); } - for (Binding each : inputBindings) { - String bindingName = each.getBindingName(); - String buildKey = mark() + IDENTIFY_SLICER_SYMBOL + bindingName; - DefaultBinding defaultBinding = (DefaultBinding) each; - RocketMQInboundChannelAdapter lifecycle = (RocketMQInboundChannelAdapter) cn.hutool.core.util.ReflectUtil.getFieldValue(defaultBinding, "lifecycle"); - RocketMQListenerBindingContainer rocketMQListenerContainer = (RocketMQListenerBindingContainer) cn.hutool.core.util.ReflectUtil.getFieldValue(lifecycle, "rocketMQListenerContainer"); - DefaultMQPushConsumer consumer = rocketMQListenerContainer.getConsumer(); - DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = consumer.getDefaultMQPushConsumerImpl(); - ConsumeMessageConcurrentlyService consumeMessageService = (ConsumeMessageConcurrentlyService) defaultMQPushConsumerImpl.getConsumeMessageService(); - ThreadPoolExecutor consumeExecutor = (ThreadPoolExecutor) cn.hutool.core.util.ReflectUtil.getFieldValue(consumeMessageService, "consumeExecutor"); - ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.put(buildKey, consumeExecutor); + try { + for (Binding each : inputBindings) { + String bindingName = each.getBindingName(); + String buildKey = mark() + IDENTIFY_SLICER_SYMBOL + bindingName; + DefaultBinding defaultBinding = (DefaultBinding) each; + RocketMQInboundChannelAdapter lifecycle = (RocketMQInboundChannelAdapter) cn.hutool.core.util.ReflectUtil.getFieldValue(defaultBinding, "lifecycle"); + RocketMQListenerBindingContainer rocketMQListenerContainer = (RocketMQListenerBindingContainer) cn.hutool.core.util.ReflectUtil.getFieldValue(lifecycle, "rocketMQListenerContainer"); + DefaultMQPushConsumer consumer = rocketMQListenerContainer.getConsumer(); + DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = consumer.getDefaultMQPushConsumerImpl(); + ConsumeMessageConcurrentlyService consumeMessageService = (ConsumeMessageConcurrentlyService) defaultMQPushConsumerImpl.getConsumeMessageService(); + ThreadPoolExecutor consumeExecutor = (ThreadPoolExecutor) cn.hutool.core.util.ReflectUtil.getFieldValue(consumeMessageService, "consumeExecutor"); + ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.put(buildKey, consumeExecutor); + } + } catch (Exception ex) { + log.error("Failed to get input-bindings thread pool.", ex); } } }