From a4dc0369ff2919b289d2760993cd9cc0f6ca5956 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=9C=E7=81=AC=E7=9E=AC?= <2431038086@qq.com> Date: Wed, 21 Jun 2023 11:45:28 +0800 Subject: [PATCH] =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6=E5=8A=A0?= =?UTF-8?q?=E4=B8=8A=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/BasicIdMessageProcessor.java | 19 ++++++++ .../mq/DelayedMessageProcessor.java | 22 +++++++++ .../common/autoconfigure/mq/MqConfig.java | 46 +++++++++++++++++++ .../autoconfigure/mq/RabbitMqHelper.java | 13 ++++++ 4 files changed, 100 insertions(+) diff --git a/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/BasicIdMessageProcessor.java b/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/BasicIdMessageProcessor.java index 1aac80b..21859a2 100644 --- a/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/BasicIdMessageProcessor.java +++ b/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/BasicIdMessageProcessor.java @@ -8,14 +8,33 @@ import org.springframework.amqp.core.MessagePostProcessor; import static com.tianji.common.constants.Constant.REQUEST_ID_HEADER; +/** + * 这段代码是一个Java类, + * 实现了MessagePostProcessor接口, + * 用于在消息发送到消息代理之前修改消息。 + * 这个特定的实现的目的是为每个消息添加一个唯一的请求ID。 + * @author 夜灬瞬 + */ public class BasicIdMessageProcessor implements MessagePostProcessor { @Override public Message postProcessMessage(Message message) throws AmqpException { + /* + 该方法首先尝试使用常量REQUEST_ID_HEADER(requestId) + 作为键从MDC(Mapped Diagnostic Context)中检索请求ID。 + */ String requestId = MDC.get(REQUEST_ID_HEADER); if (requestId == null) { + /* + 如果未找到请求ID, + 则使用UUID类生成一个新的请求ID, + 并将结果存储在requestId变量中。 + */ requestId = UUID.randomUUID().toString(true); } // 写入RequestID标示 + /* + 接着使用setHeader方法将请求ID添加到消息的属性中,使用常量REQUEST_ID_HEADER作为键 + */ message.getMessageProperties().setHeader(REQUEST_ID_HEADER, requestId); return message; } diff --git a/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/DelayedMessageProcessor.java b/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/DelayedMessageProcessor.java index 0b22a72..0a7a32c 100644 --- a/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/DelayedMessageProcessor.java +++ b/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/DelayedMessageProcessor.java @@ -5,14 +5,36 @@ import org.springframework.amqp.core.Message; import java.time.Duration; +/** + * 在发送消息时添加延迟时间 + * @author yhs + */ public class DelayedMessageProcessor extends BasicIdMessageProcessor { private final long delay; + /** + * 构造函数中的参数是一个Duration类型的对象,表示延迟的时间。 + * 在构造函数中,这个时间会被转换为毫秒,并存储在类的私有变量delay中。 + * @param delay 变量 + */ public DelayedMessageProcessor(Duration delay) { this.delay = delay.toMillis(); } + /** + * postProcessMessage方法是在发送消息之前调用的方法,它会对消息进行一些处理。 + * 首先 + * 它调用了BasicIdMessageProcessor类的postProcessMessage方法, + * 添加了一个消息ID。 + * 然后 + * 它将延迟时间添加到消息属性中 + * 使用了一个名为"x-delay"的头部信息。 + * 这样,在发送消息时,就可以通过这个头部信息指定消息的延迟时间,从而实现延迟发送的功能。 + * @param message 消息 + * @return 返回 + * @throws AmqpException 异常 + */ @Override public Message postProcessMessage(Message message) throws AmqpException { // 1.添加消息id diff --git a/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/MqConfig.java b/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/MqConfig.java index c0f2049..57cecca 100644 --- a/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/MqConfig.java +++ b/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/MqConfig.java @@ -29,6 +29,17 @@ import static com.tianji.common.constants.MqConstants.Key.ERROR_KEY_PREFIX; import static com.tianji.common.constants.MqConstants.Queue.ERROR_QUEUE_TEMPLATE; +/** + * MqConfig 用于配置RabbitMQ的相关参数和组件 + * 实现了Spring的EnvironmentAware接口, + * 用于在应用程序启动时获取应用程序的名称, + * 并根据该名称设置默认的错误路由键和错误队列。 + *
+ * {@code @ConditionalOnClass} + * 这个注解表示只有当 MessageConverter和 AmqpTemplate 这两个类在类路径中存在时 + * 才会创建这个配置类 + * @author 夜灬瞬 + */ @Configuration @ConditionalOnClass(value = {MessageConverter.class, AmqpTemplate.class}) public class MqConfig implements EnvironmentAware{ @@ -36,15 +47,33 @@ public class MqConfig implements EnvironmentAware{ private String defaultErrorRoutingKey; private String defaultErrorQueue; + /** + * 用于创建一个名为"rabbitListenerContainerFactory"的bean对象。 + * 这个bean对象是一个RabbitMQ的监听容器工厂,用于处理消息的消费者。 + * @param configurer + * @param connectionFactory + * @param simpleContainerCustomizer + * @return + */ @Bean(name = "rabbitListenerContainerFactory") @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true) SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory, ObjectProvider> simpleContainerCustomizer) { + // 创建 SimpleRabbitListenerContainerFactory 对象 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + // 使用 configurer对象对 factory对象进行配置 configurer.configure(factory, connectionFactory); + // 设置一个容器自定义器,用于对容器进行进一步的定制。 simpleContainerCustomizer.ifUnique(factory::setContainerCustomizer); + /* + 这段代码设置了一个消息后处理器。 + 这个后处理器会在消息被消费者接收后执行,用于对消息进行后续处理。 + 具体来说, + 这个后处理器会从消息的消息属性中获取一个名为"REQUEST_ID_HEADER"的头部信息,并将其放入MDC中。 + 这样,在后续的日志输出中,就可以方便地获取到这个请求的ID信息了。 + */ factory.setAfterReceivePostProcessors(message -> { Object header = message.getMessageProperties().getHeader(REQUEST_ID_HEADER); if(header != null) { @@ -55,6 +84,11 @@ public class MqConfig implements EnvironmentAware{ return factory; } + /** + * 这段代码的作用是创建一个RabbitMQ消息转换器,用于将JSON格式的消息转换为Java对象。 + * @param mapper JSON + * @return 返回 + */ @Bean public MessageConverter messageConverter(ObjectMapper mapper){ return new Jackson2JsonMessageConverter(mapper); @@ -92,11 +126,21 @@ public class MqConfig implements EnvironmentAware{ return new DirectExchange(ERROR_EXCHANGE); } + /** + * 默认队列 + * @return 队列 + */ @Bean public Queue errorQueue(){ return new Queue(defaultErrorQueue, true); } + /** + * 异常队列 交换机 绑定 + * @param errorQueue 异常队列 + * @param errorMessageExchange 异常交换机 + * @return 返回绑定对象 + */ @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(defaultErrorRoutingKey); @@ -105,7 +149,9 @@ public class MqConfig implements EnvironmentAware{ @Override public void setEnvironment(Environment environment) { String appName = environment.getProperty("spring.application.name"); + // 设置错误队列的路由键 error. applicationName this.defaultErrorRoutingKey = ERROR_KEY_PREFIX + appName; + // 设置错误队列的队列名 error. applicationName .queue this.defaultErrorQueue = StringUtils.format(ERROR_QUEUE_TEMPLATE, appName); } } diff --git a/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/RabbitMqHelper.java b/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/RabbitMqHelper.java index 62ea2c5..b8c180e 100644 --- a/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/RabbitMqHelper.java +++ b/tianji-master/tj-common/src/main/java/com/tianji/common/autoconfigure/mq/RabbitMqHelper.java @@ -14,6 +14,9 @@ import java.util.concurrent.ThreadPoolExecutor; import static com.tianji.common.constants.Constant.REQUEST_ID_HEADER; +/** + * @author 夜灬瞬 + */ @Slf4j public class RabbitMqHelper { @@ -21,6 +24,14 @@ public class RabbitMqHelper { private final MessagePostProcessor processor = new BasicIdMessageProcessor(); private final ThreadPoolTaskExecutor executor; + /** + * 用于创建一个RabbitMQ帮助类对象 + * 在构造函数中 + * 创建了一个ThreadPoolTaskExecutor对象 + * 并配置了该对象的一些属性 + * 用于异步发送RabbitMQ消息。 + * @param rabbitTemplate rabbitTemplate + */ public RabbitMqHelper(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; executor = new ThreadPoolTaskExecutor(); @@ -47,6 +58,7 @@ public class RabbitMqHelper { log.debug("准备发送消息,exchange:{}, RoutingKey:{}, message:{}", exchange, routingKey,t); // 1.设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理 String id = UUID.randomUUID().toString(true); + // 使用UUID生成一个唯一的消息标识符,并将其设置为CorrelationData对象 CorrelationData correlationData = new CorrelationData(id); // 2.设置发送超时时间为500毫秒 rabbitTemplate.setReplyTimeout(500); @@ -60,6 +72,7 @@ public class RabbitMqHelper { public void sendDelayMessage(String exchange, String routingKey, T t, Duration delay) { // 1.设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理 String id = UUID.randomUUID().toString(true); + // 使用UUID生成一个唯一的消息标识符,并将其设置为CorrelationData对象 CorrelationData correlationData = new CorrelationData(id); // 2.设置发送超时时间为500毫秒 rabbitTemplate.setReplyTimeout(500);