From 5095319814b3801e5edcf659d95bbd4c5414917e Mon Sep 17 00:00:00 2001 From: larry <945645265@qq.com> Date: Mon, 9 Dec 2024 23:56:12 +0800 Subject: [PATCH 1/3] =?UTF-8?q?fix:=201.rabbitmq=E5=8F=91=E9=80=81?= =?UTF-8?q?=E7=9A=84=E6=B6=88=E6=81=AF=E6=97=A0=E6=B3=95=E5=A4=84=E7=90=86?= =?UTF-8?q?2.=E8=A7=84=E8=8C=83routingKey=E5=92=8C=E9=98=9F=E5=88=97?= =?UTF-8?q?=E5=90=8D=E7=A7=B03.send=E5=92=8Crecall=E4=BD=BF=E7=94=A8routin?= =?UTF-8?q?gKey=E6=9D=A5=E5=81=9A=E5=8C=BA=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../receiver/rabbit/RabbitMqReceiver.java | 37 +++++++++++-------- .../mq/rabbit/RabbitSendMqServiceImpl.java | 25 +++++++++---- .../src/main/resources/application.properties | 7 ++-- 3 files changed, 43 insertions(+), 26 deletions(-) diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java index ab56379..74e4cb6 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java @@ -29,34 +29,39 @@ import java.util.List; @ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.RABBIT_MQ) public class RabbitMqReceiver implements MessageReceiver { - private static final String MSG_TYPE_SEND = "send"; - private static final String MSG_TYPE_RECALL = "recall"; - @Autowired private ConsumeService consumeService; @RabbitListener(bindings = @QueueBinding( - value = @Queue(value = "${spring.rabbitmq.queues}", durable = "true"), + value = @Queue(value = "${spring.rabbitmq.queues.send}", durable = "true"), exchange = @Exchange(value = "${austin.rabbitmq.exchange.name}", type = ExchangeTypes.TOPIC), - key = "${austin.rabbitmq.routing.key}" + key = "${austin.rabbitmq.routing.send}" )) - public void onMessage(Message message) { - String messageType = message.getMessageProperties().getHeader("messageType"); + public void send(Message message) { byte[] body = message.getBody(); String messageContent = new String(body, StandardCharsets.UTF_8); if (StringUtils.isBlank(messageContent)) { return; } - if (MSG_TYPE_SEND.equals(messageType)) { - // 处理发送消息 - List taskInfoLists = JSON.parseArray(messageContent, TaskInfo.class); - consumeService.consume2Send(taskInfoLists); - } else if (MSG_TYPE_RECALL.equals(messageType)) { - // 处理撤回消息 - RecallTaskInfo recallTaskInfo = JSON.parseObject(messageContent, RecallTaskInfo.class); - consumeService.consume2recall(recallTaskInfo); - } + // 处理发送消息 + List taskInfoLists = JSON.parseArray(messageContent, TaskInfo.class); + consumeService.consume2Send(taskInfoLists); + } + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "${spring.rabbitmq.queues.recall}", durable = "true"), + exchange = @Exchange(value = "${austin.rabbitmq.exchange.name}", type = ExchangeTypes.TOPIC), + key = "${austin.rabbitmq.routing.recall}" + )) + public void recall(Message message) { + byte[] body = message.getBody(); + String messageContent = new String(body, StandardCharsets.UTF_8); + if (StringUtils.isBlank(messageContent)) { + return; + } + // 处理撤回消息 + RecallTaskInfo recallTaskInfo = JSON.parseObject(messageContent, RecallTaskInfo.class); + consumeService.consume2recall(recallTaskInfo); } } diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java index 71fafc3..ad2a1e4 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java @@ -10,6 +10,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; + /** * @author xzcawl * @Date 2022/7/15 17:29 @@ -22,19 +23,29 @@ public class RabbitSendMqServiceImpl implements SendMqService { @Autowired private RabbitTemplate rabbitTemplate; - @Value("${austin.rabbitmq.topic.name}") - private String confTopic; - @Value("${austin.rabbitmq.exchange.name}") private String exchangeName; + @Value("${austin.business.topic.name}") + private String sendMessageTopic; + + @Value("${austin.business.recall.topic.name}") + private String austinRecall; + + @Value("${austin.rabbitmq.routing.send}") + private String sendRoutingKey; + + @Value("${austin.rabbitmq.routing.recall}") + private String recallRoutingKey; @Override public void send(String topic, String jsonValue, String tagId) { - if (topic.equals(confTopic)) { - rabbitTemplate.convertAndSend(exchangeName, confTopic, jsonValue); - } else { - log.error("RabbitSendMqServiceImpl send topic error! topic:{},confTopic:{}", topic, confTopic); + if (topic.equals(sendMessageTopic)){ + rabbitTemplate.convertAndSend(exchangeName, sendRoutingKey, jsonValue); + }else if (topic.equals(austinRecall)){ + rabbitTemplate.convertAndSend(exchangeName, recallRoutingKey, jsonValue); + }else { + log.error("RabbitSendMqServiceImpl send topic error! topic:{}", topic); } } diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index ba937f0..e15e890 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -65,10 +65,11 @@ spring.rabbitmq.password=123456 spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.publisher-returns=true spring.rabbitmq.virtual-host=/ -austin.rabbitmq.topic.name=austinRabbit austin.rabbitmq.exchange.name=austin.point -spring.rabbitmq.queues=austin_queues -austin.rabbitmq.routing.key=austin_KEY +spring.rabbitmq.queues.send=austin.queues.send +spring.rabbitmq.queues.recall=austin.queues.recall +austin.rabbitmq.routing.send=austin.send +austin.rabbitmq.routing.recall=austin.recall ########################################## RabbitMq end ########################################## From 6605316a53d59b7af1bdf2933d37aafdcc9b5786 Mon Sep 17 00:00:00 2001 From: larry <945645265@qq.com> Date: Tue, 10 Dec 2024 00:11:04 +0800 Subject: [PATCH 2/3] =?UTF-8?q?feat:=E5=A2=9E=E5=BC=BArabbitmq=E7=94=9F?= =?UTF-8?q?=E4=BA=A7=E8=80=85=E6=B6=88=E6=81=AF=E5=8F=AF=E9=9D=A0=E6=80=A7?= =?UTF-8?q?=EF=BC=8C=E6=B7=BB=E5=8A=A0publisher-confirm=E5=92=8Cpublisher-?= =?UTF-8?q?return?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../austin/support/config/RabbitMqConfig.java | 28 +++++++++++++++++++ .../mq/rabbit/RabbitSendMqServiceImpl.java | 17 +++++++++-- 2 files changed, 43 insertions(+), 2 deletions(-) create mode 100644 austin-support/src/main/java/com/java3y/austin/support/config/RabbitMqConfig.java diff --git a/austin-support/src/main/java/com/java3y/austin/support/config/RabbitMqConfig.java b/austin-support/src/main/java/com/java3y/austin/support/config/RabbitMqConfig.java new file mode 100644 index 0000000..8a04da4 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/config/RabbitMqConfig.java @@ -0,0 +1,28 @@ +package com.java3y.austin.support.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Slf4j +public class RabbitMqConfig implements ApplicationContextAware { + + /** + * 添加ReturnCallBack + * 投递到队列失败的消息,可以在这里查看原因和做失败处理策略 + * @param applicationContext + * @throws BeansException + */ + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); + rabbitTemplate.setReturnsCallback(returnedMessage -> + log.error("消息投递到队列失败, 状态码:{},失败原因:{},交换机:{},routingKey:{},消息:{}", + returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), + returnedMessage.getRoutingKey(), returnedMessage.getMessage())); + } +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java index ad2a1e4..d827f3f 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java @@ -1,8 +1,11 @@ package com.java3y.austin.support.mq.rabbit; +import cn.hutool.core.util.IdUtil; +import com.google.common.base.Throwables; import com.java3y.austin.support.constans.MessageQueuePipeline; import com.java3y.austin.support.mq.SendMqService; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -40,10 +43,20 @@ public class RabbitSendMqServiceImpl implements SendMqService { @Override public void send(String topic, String jsonValue, String tagId) { + CorrelationData correlationData = new CorrelationData(IdUtil.getSnowflake().nextIdStr()); + correlationData.getFuture().addCallback(result -> { + if (result.isAck()) { + log.info("消息成功投递到交换机,消息ID:{}", correlationData.getId()); + }else{ + log.error("消息投递到交换机失败,消息ID:{}", correlationData.getId()); + } + }, ex -> { + log.error("消息处理异常,{}", Throwables.getStackTraceAsString(ex)); + }); if (topic.equals(sendMessageTopic)){ - rabbitTemplate.convertAndSend(exchangeName, sendRoutingKey, jsonValue); + rabbitTemplate.convertAndSend(exchangeName, sendRoutingKey, jsonValue, correlationData); }else if (topic.equals(austinRecall)){ - rabbitTemplate.convertAndSend(exchangeName, recallRoutingKey, jsonValue); + rabbitTemplate.convertAndSend(exchangeName, recallRoutingKey, jsonValue, correlationData); }else { log.error("RabbitSendMqServiceImpl send topic error! topic:{}", topic); } From ff53cf7aac67081a34c3624cff5b4afebef2e1ba Mon Sep 17 00:00:00 2001 From: larry <945645265@qq.com> Date: Tue, 10 Dec 2024 00:27:59 +0800 Subject: [PATCH 3/3] =?UTF-8?q?feat:=E5=BC=80=E5=90=AFretry=EF=BC=8C?= =?UTF-8?q?=E9=98=B2=E6=AD=A2=E6=9C=AA=E5=A4=84=E7=90=86=E7=9A=84=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E8=AE=A9=E6=B6=88=E8=B4=B9=E8=80=85=E4=B8=80=E7=9B=B4?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E4=BB=8E=E8=80=8C=E9=98=BB=E5=A1=9E=EF=BC=8C?= =?UTF-8?q?=E5=A2=9E=E5=BC=BA=E6=B6=88=E8=B4=B9=E8=80=85=E7=9A=84=E5=8F=AF?= =?UTF-8?q?=E9=9D=A0=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- austin-web/src/main/resources/application.properties | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index e15e890..916782f 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -65,6 +65,11 @@ spring.rabbitmq.password=123456 spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.publisher-returns=true spring.rabbitmq.virtual-host=/ +#enable retry +spring.rabbitmq.listener.simple.retry.enabled=true +spring.rabbitmq.listener.simple.retry.initial-interval=1000 +spring.rabbitmq.listener.simple.retry.multiplier=2 +spring.rabbitmq.listener.simple.retry.max-attempts=3 austin.rabbitmq.exchange.name=austin.point spring.rabbitmq.queues.send=austin.queues.send spring.rabbitmq.queues.recall=austin.queues.recall