fix: 1.rabbitmq发送的消息无法处理2.规范routingKey和队列名称3.send和recall使用routingKey来做区分

pull/75/head
larry 2 weeks ago
parent 2b334023cd
commit 5095319814

@ -29,34 +29,39 @@ import java.util.List;
@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.RABBIT_MQ)
public class RabbitMqReceiver implements MessageReceiver {
private static final String MSG_TYPE_SEND = "send";
private static final String MSG_TYPE_RECALL = "recall";
@Autowired
private ConsumeService consumeService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.queues}", durable = "true"),
value = @Queue(value = "${spring.rabbitmq.queues.send}", durable = "true"),
exchange = @Exchange(value = "${austin.rabbitmq.exchange.name}", type = ExchangeTypes.TOPIC),
key = "${austin.rabbitmq.routing.key}"
key = "${austin.rabbitmq.routing.send}"
))
public void onMessage(Message message) {
String messageType = message.getMessageProperties().getHeader("messageType");
public void send(Message message) {
byte[] body = message.getBody();
String messageContent = new String(body, StandardCharsets.UTF_8);
if (StringUtils.isBlank(messageContent)) {
return;
}
if (MSG_TYPE_SEND.equals(messageType)) {
// 处理发送消息
List<TaskInfo> taskInfoLists = JSON.parseArray(messageContent, TaskInfo.class);
consumeService.consume2Send(taskInfoLists);
} else if (MSG_TYPE_RECALL.equals(messageType)) {
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.queues.recall}", durable = "true"),
exchange = @Exchange(value = "${austin.rabbitmq.exchange.name}", type = ExchangeTypes.TOPIC),
key = "${austin.rabbitmq.routing.recall}"
))
public void recall(Message message) {
byte[] body = message.getBody();
String messageContent = new String(body, StandardCharsets.UTF_8);
if (StringUtils.isBlank(messageContent)) {
return;
}
// 处理撤回消息
RecallTaskInfo recallTaskInfo = JSON.parseObject(messageContent, RecallTaskInfo.class);
consumeService.consume2recall(recallTaskInfo);
}
}
}

@ -10,6 +10,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* @author xzcawl
* @Date 2022/7/15 17:29
@ -22,19 +23,29 @@ public class RabbitSendMqServiceImpl implements SendMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${austin.rabbitmq.topic.name}")
private String confTopic;
@Value("${austin.rabbitmq.exchange.name}")
private String exchangeName;
@Value("${austin.business.topic.name}")
private String sendMessageTopic;
@Value("${austin.business.recall.topic.name}")
private String austinRecall;
@Value("${austin.rabbitmq.routing.send}")
private String sendRoutingKey;
@Value("${austin.rabbitmq.routing.recall}")
private String recallRoutingKey;
@Override
public void send(String topic, String jsonValue, String tagId) {
if (topic.equals(confTopic)) {
rabbitTemplate.convertAndSend(exchangeName, confTopic, jsonValue);
} else {
log.error("RabbitSendMqServiceImpl send topic error! topic:{},confTopic:{}", topic, confTopic);
if (topic.equals(sendMessageTopic)){
rabbitTemplate.convertAndSend(exchangeName, sendRoutingKey, jsonValue);
}else if (topic.equals(austinRecall)){
rabbitTemplate.convertAndSend(exchangeName, recallRoutingKey, jsonValue);
}else {
log.error("RabbitSendMqServiceImpl send topic error! topic:{}", topic);
}
}

@ -65,10 +65,11 @@ spring.rabbitmq.password=123456
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.virtual-host=/
austin.rabbitmq.topic.name=austinRabbit
austin.rabbitmq.exchange.name=austin.point
spring.rabbitmq.queues=austin_queues
austin.rabbitmq.routing.key=austin_KEY
spring.rabbitmq.queues.send=austin.queues.send
spring.rabbitmq.queues.recall=austin.queues.recall
austin.rabbitmq.routing.send=austin.send
austin.rabbitmq.routing.recall=austin.recall
########################################## RabbitMq end ##########################################

Loading…
Cancel
Save