From 6605316a53d59b7af1bdf2933d37aafdcc9b5786 Mon Sep 17 00:00:00 2001 From: larry <945645265@qq.com> Date: Tue, 10 Dec 2024 00:11:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E5=A2=9E=E5=BC=BArabbitmq=E7=94=9F?= =?UTF-8?q?=E4=BA=A7=E8=80=85=E6=B6=88=E6=81=AF=E5=8F=AF=E9=9D=A0=E6=80=A7?= =?UTF-8?q?=EF=BC=8C=E6=B7=BB=E5=8A=A0publisher-confirm=E5=92=8Cpublisher-?= =?UTF-8?q?return?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../austin/support/config/RabbitMqConfig.java | 28 +++++++++++++++++++ .../mq/rabbit/RabbitSendMqServiceImpl.java | 17 +++++++++-- 2 files changed, 43 insertions(+), 2 deletions(-) create mode 100644 austin-support/src/main/java/com/java3y/austin/support/config/RabbitMqConfig.java diff --git a/austin-support/src/main/java/com/java3y/austin/support/config/RabbitMqConfig.java b/austin-support/src/main/java/com/java3y/austin/support/config/RabbitMqConfig.java new file mode 100644 index 0000000..8a04da4 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/config/RabbitMqConfig.java @@ -0,0 +1,28 @@ +package com.java3y.austin.support.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Slf4j +public class RabbitMqConfig implements ApplicationContextAware { + + /** + * 添加ReturnCallBack + * 投递到队列失败的消息,可以在这里查看原因和做失败处理策略 + * @param applicationContext + * @throws BeansException + */ + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); + rabbitTemplate.setReturnsCallback(returnedMessage -> + log.error("消息投递到队列失败, 状态码:{},失败原因:{},交换机:{},routingKey:{},消息:{}", + returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), + returnedMessage.getRoutingKey(), returnedMessage.getMessage())); + } +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java index ad2a1e4..d827f3f 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java @@ -1,8 +1,11 @@ package com.java3y.austin.support.mq.rabbit; +import cn.hutool.core.util.IdUtil; +import com.google.common.base.Throwables; import com.java3y.austin.support.constans.MessageQueuePipeline; import com.java3y.austin.support.mq.SendMqService; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -40,10 +43,20 @@ public class RabbitSendMqServiceImpl implements SendMqService { @Override public void send(String topic, String jsonValue, String tagId) { + CorrelationData correlationData = new CorrelationData(IdUtil.getSnowflake().nextIdStr()); + correlationData.getFuture().addCallback(result -> { + if (result.isAck()) { + log.info("消息成功投递到交换机,消息ID:{}", correlationData.getId()); + }else{ + log.error("消息投递到交换机失败,消息ID:{}", correlationData.getId()); + } + }, ex -> { + log.error("消息处理异常,{}", Throwables.getStackTraceAsString(ex)); + }); if (topic.equals(sendMessageTopic)){ - rabbitTemplate.convertAndSend(exchangeName, sendRoutingKey, jsonValue); + rabbitTemplate.convertAndSend(exchangeName, sendRoutingKey, jsonValue, correlationData); }else if (topic.equals(austinRecall)){ - rabbitTemplate.convertAndSend(exchangeName, recallRoutingKey, jsonValue); + rabbitTemplate.convertAndSend(exchangeName, recallRoutingKey, jsonValue, correlationData); }else { log.error("RabbitSendMqServiceImpl send topic error! topic:{}", topic); }