From 5095319814b3801e5edcf659d95bbd4c5414917e Mon Sep 17 00:00:00 2001 From: larry <945645265@qq.com> Date: Mon, 9 Dec 2024 23:56:12 +0800 Subject: [PATCH] =?UTF-8?q?fix:=201.rabbitmq=E5=8F=91=E9=80=81=E7=9A=84?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=97=A0=E6=B3=95=E5=A4=84=E7=90=862.?= =?UTF-8?q?=E8=A7=84=E8=8C=83routingKey=E5=92=8C=E9=98=9F=E5=88=97?= =?UTF-8?q?=E5=90=8D=E7=A7=B03.send=E5=92=8Crecall=E4=BD=BF=E7=94=A8routin?= =?UTF-8?q?gKey=E6=9D=A5=E5=81=9A=E5=8C=BA=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../receiver/rabbit/RabbitMqReceiver.java | 37 +++++++++++-------- .../mq/rabbit/RabbitSendMqServiceImpl.java | 25 +++++++++---- .../src/main/resources/application.properties | 7 ++-- 3 files changed, 43 insertions(+), 26 deletions(-) diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java index ab56379..74e4cb6 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java @@ -29,34 +29,39 @@ import java.util.List; @ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.RABBIT_MQ) public class RabbitMqReceiver implements MessageReceiver { - private static final String MSG_TYPE_SEND = "send"; - private static final String MSG_TYPE_RECALL = "recall"; - @Autowired private ConsumeService consumeService; @RabbitListener(bindings = @QueueBinding( - value = @Queue(value = "${spring.rabbitmq.queues}", durable = "true"), + value = @Queue(value = "${spring.rabbitmq.queues.send}", durable = "true"), exchange = @Exchange(value = "${austin.rabbitmq.exchange.name}", type = ExchangeTypes.TOPIC), - key = "${austin.rabbitmq.routing.key}" + key = "${austin.rabbitmq.routing.send}" )) - public void onMessage(Message message) { - String messageType = message.getMessageProperties().getHeader("messageType"); + public void send(Message message) { byte[] body = message.getBody(); String messageContent = new String(body, StandardCharsets.UTF_8); if (StringUtils.isBlank(messageContent)) { return; } - if (MSG_TYPE_SEND.equals(messageType)) { - // 处理发送消息 - List taskInfoLists = JSON.parseArray(messageContent, TaskInfo.class); - consumeService.consume2Send(taskInfoLists); - } else if (MSG_TYPE_RECALL.equals(messageType)) { - // 处理撤回消息 - RecallTaskInfo recallTaskInfo = JSON.parseObject(messageContent, RecallTaskInfo.class); - consumeService.consume2recall(recallTaskInfo); - } + // 处理发送消息 + List taskInfoLists = JSON.parseArray(messageContent, TaskInfo.class); + consumeService.consume2Send(taskInfoLists); + } + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "${spring.rabbitmq.queues.recall}", durable = "true"), + exchange = @Exchange(value = "${austin.rabbitmq.exchange.name}", type = ExchangeTypes.TOPIC), + key = "${austin.rabbitmq.routing.recall}" + )) + public void recall(Message message) { + byte[] body = message.getBody(); + String messageContent = new String(body, StandardCharsets.UTF_8); + if (StringUtils.isBlank(messageContent)) { + return; + } + // 处理撤回消息 + RecallTaskInfo recallTaskInfo = JSON.parseObject(messageContent, RecallTaskInfo.class); + consumeService.consume2recall(recallTaskInfo); } } diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java index 71fafc3..ad2a1e4 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java @@ -10,6 +10,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; + /** * @author xzcawl * @Date 2022/7/15 17:29 @@ -22,19 +23,29 @@ public class RabbitSendMqServiceImpl implements SendMqService { @Autowired private RabbitTemplate rabbitTemplate; - @Value("${austin.rabbitmq.topic.name}") - private String confTopic; - @Value("${austin.rabbitmq.exchange.name}") private String exchangeName; + @Value("${austin.business.topic.name}") + private String sendMessageTopic; + + @Value("${austin.business.recall.topic.name}") + private String austinRecall; + + @Value("${austin.rabbitmq.routing.send}") + private String sendRoutingKey; + + @Value("${austin.rabbitmq.routing.recall}") + private String recallRoutingKey; @Override public void send(String topic, String jsonValue, String tagId) { - if (topic.equals(confTopic)) { - rabbitTemplate.convertAndSend(exchangeName, confTopic, jsonValue); - } else { - log.error("RabbitSendMqServiceImpl send topic error! topic:{},confTopic:{}", topic, confTopic); + if (topic.equals(sendMessageTopic)){ + rabbitTemplate.convertAndSend(exchangeName, sendRoutingKey, jsonValue); + }else if (topic.equals(austinRecall)){ + rabbitTemplate.convertAndSend(exchangeName, recallRoutingKey, jsonValue); + }else { + log.error("RabbitSendMqServiceImpl send topic error! topic:{}", topic); } } diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index ba937f0..e15e890 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -65,10 +65,11 @@ spring.rabbitmq.password=123456 spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.publisher-returns=true spring.rabbitmq.virtual-host=/ -austin.rabbitmq.topic.name=austinRabbit austin.rabbitmq.exchange.name=austin.point -spring.rabbitmq.queues=austin_queues -austin.rabbitmq.routing.key=austin_KEY +spring.rabbitmq.queues.send=austin.queues.send +spring.rabbitmq.queues.recall=austin.queues.recall +austin.rabbitmq.routing.send=austin.send +austin.rabbitmq.routing.recall=austin.recall ########################################## RabbitMq end ##########################################