diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java index ab56379..74e4cb6 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java @@ -29,34 +29,39 @@ import java.util.List; @ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.RABBIT_MQ) public class RabbitMqReceiver implements MessageReceiver { - private static final String MSG_TYPE_SEND = "send"; - private static final String MSG_TYPE_RECALL = "recall"; - @Autowired private ConsumeService consumeService; @RabbitListener(bindings = @QueueBinding( - value = @Queue(value = "${spring.rabbitmq.queues}", durable = "true"), + value = @Queue(value = "${spring.rabbitmq.queues.send}", durable = "true"), exchange = @Exchange(value = "${austin.rabbitmq.exchange.name}", type = ExchangeTypes.TOPIC), - key = "${austin.rabbitmq.routing.key}" + key = "${austin.rabbitmq.routing.send}" )) - public void onMessage(Message message) { - String messageType = message.getMessageProperties().getHeader("messageType"); + public void send(Message message) { byte[] body = message.getBody(); String messageContent = new String(body, StandardCharsets.UTF_8); if (StringUtils.isBlank(messageContent)) { return; } - if (MSG_TYPE_SEND.equals(messageType)) { - // 处理发送消息 - List taskInfoLists = JSON.parseArray(messageContent, TaskInfo.class); - consumeService.consume2Send(taskInfoLists); - } else if (MSG_TYPE_RECALL.equals(messageType)) { - // 处理撤回消息 - RecallTaskInfo recallTaskInfo = JSON.parseObject(messageContent, RecallTaskInfo.class); - consumeService.consume2recall(recallTaskInfo); - } + // 处理发送消息 + List taskInfoLists = JSON.parseArray(messageContent, TaskInfo.class); + consumeService.consume2Send(taskInfoLists); + } + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "${spring.rabbitmq.queues.recall}", durable = "true"), + exchange = @Exchange(value = "${austin.rabbitmq.exchange.name}", type = ExchangeTypes.TOPIC), + key = "${austin.rabbitmq.routing.recall}" + )) + public void recall(Message message) { + byte[] body = message.getBody(); + String messageContent = new String(body, StandardCharsets.UTF_8); + if (StringUtils.isBlank(messageContent)) { + return; + } + // 处理撤回消息 + RecallTaskInfo recallTaskInfo = JSON.parseObject(messageContent, RecallTaskInfo.class); + consumeService.consume2recall(recallTaskInfo); } } diff --git a/austin-support/src/main/java/com/java3y/austin/support/config/RabbitMqConfig.java b/austin-support/src/main/java/com/java3y/austin/support/config/RabbitMqConfig.java new file mode 100644 index 0000000..8a04da4 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/config/RabbitMqConfig.java @@ -0,0 +1,28 @@ +package com.java3y.austin.support.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Slf4j +public class RabbitMqConfig implements ApplicationContextAware { + + /** + * 添加ReturnCallBack + * 投递到队列失败的消息,可以在这里查看原因和做失败处理策略 + * @param applicationContext + * @throws BeansException + */ + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); + rabbitTemplate.setReturnsCallback(returnedMessage -> + log.error("消息投递到队列失败, 状态码:{},失败原因:{},交换机:{},routingKey:{},消息:{}", + returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), + returnedMessage.getRoutingKey(), returnedMessage.getMessage())); + } +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java index 71fafc3..d827f3f 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java @@ -1,8 +1,11 @@ package com.java3y.austin.support.mq.rabbit; +import cn.hutool.core.util.IdUtil; +import com.google.common.base.Throwables; import com.java3y.austin.support.constans.MessageQueuePipeline; import com.java3y.austin.support.mq.SendMqService; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -10,6 +13,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; + /** * @author xzcawl * @Date 2022/7/15 17:29 @@ -22,19 +26,39 @@ public class RabbitSendMqServiceImpl implements SendMqService { @Autowired private RabbitTemplate rabbitTemplate; - @Value("${austin.rabbitmq.topic.name}") - private String confTopic; - @Value("${austin.rabbitmq.exchange.name}") private String exchangeName; + @Value("${austin.business.topic.name}") + private String sendMessageTopic; + + @Value("${austin.business.recall.topic.name}") + private String austinRecall; + + @Value("${austin.rabbitmq.routing.send}") + private String sendRoutingKey; + + @Value("${austin.rabbitmq.routing.recall}") + private String recallRoutingKey; @Override public void send(String topic, String jsonValue, String tagId) { - if (topic.equals(confTopic)) { - rabbitTemplate.convertAndSend(exchangeName, confTopic, jsonValue); - } else { - log.error("RabbitSendMqServiceImpl send topic error! topic:{},confTopic:{}", topic, confTopic); + CorrelationData correlationData = new CorrelationData(IdUtil.getSnowflake().nextIdStr()); + correlationData.getFuture().addCallback(result -> { + if (result.isAck()) { + log.info("消息成功投递到交换机,消息ID:{}", correlationData.getId()); + }else{ + log.error("消息投递到交换机失败,消息ID:{}", correlationData.getId()); + } + }, ex -> { + log.error("消息处理异常,{}", Throwables.getStackTraceAsString(ex)); + }); + if (topic.equals(sendMessageTopic)){ + rabbitTemplate.convertAndSend(exchangeName, sendRoutingKey, jsonValue, correlationData); + }else if (topic.equals(austinRecall)){ + rabbitTemplate.convertAndSend(exchangeName, recallRoutingKey, jsonValue, correlationData); + }else { + log.error("RabbitSendMqServiceImpl send topic error! topic:{}", topic); } } diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index ba937f0..916782f 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -65,10 +65,16 @@ spring.rabbitmq.password=123456 spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.publisher-returns=true spring.rabbitmq.virtual-host=/ -austin.rabbitmq.topic.name=austinRabbit +#enable retry +spring.rabbitmq.listener.simple.retry.enabled=true +spring.rabbitmq.listener.simple.retry.initial-interval=1000 +spring.rabbitmq.listener.simple.retry.multiplier=2 +spring.rabbitmq.listener.simple.retry.max-attempts=3 austin.rabbitmq.exchange.name=austin.point -spring.rabbitmq.queues=austin_queues -austin.rabbitmq.routing.key=austin_KEY +spring.rabbitmq.queues.send=austin.queues.send +spring.rabbitmq.queues.recall=austin.queues.recall +austin.rabbitmq.routing.send=austin.send +austin.rabbitmq.routing.recall=austin.recall ########################################## RabbitMq end ##########################################