Merge pull request #75 from cuitlarry/larry-develop

修复RabbitMQ无法处理消息错误,增强RabbitMQ可靠性
master
Java3y 9 months ago committed by GitHub
commit bd75dd72d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -29,34 +29,39 @@ import java.util.List;
@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.RABBIT_MQ)
public class RabbitMqReceiver implements MessageReceiver {
private static final String MSG_TYPE_SEND = "send";
private static final String MSG_TYPE_RECALL = "recall";
@Autowired
private ConsumeService consumeService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.queues}", durable = "true"),
value = @Queue(value = "${spring.rabbitmq.queues.send}", durable = "true"),
exchange = @Exchange(value = "${austin.rabbitmq.exchange.name}", type = ExchangeTypes.TOPIC),
key = "${austin.rabbitmq.routing.key}"
key = "${austin.rabbitmq.routing.send}"
))
public void onMessage(Message message) {
String messageType = message.getMessageProperties().getHeader("messageType");
public void send(Message message) {
byte[] body = message.getBody();
String messageContent = new String(body, StandardCharsets.UTF_8);
if (StringUtils.isBlank(messageContent)) {
return;
}
if (MSG_TYPE_SEND.equals(messageType)) {
// 处理发送消息
List<TaskInfo> taskInfoLists = JSON.parseArray(messageContent, TaskInfo.class);
consumeService.consume2Send(taskInfoLists);
} else if (MSG_TYPE_RECALL.equals(messageType)) {
// 处理撤回消息
RecallTaskInfo recallTaskInfo = JSON.parseObject(messageContent, RecallTaskInfo.class);
consumeService.consume2recall(recallTaskInfo);
}
// 处理发送消息
List<TaskInfo> taskInfoLists = JSON.parseArray(messageContent, TaskInfo.class);
consumeService.consume2Send(taskInfoLists);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.queues.recall}", durable = "true"),
exchange = @Exchange(value = "${austin.rabbitmq.exchange.name}", type = ExchangeTypes.TOPIC),
key = "${austin.rabbitmq.routing.recall}"
))
public void recall(Message message) {
byte[] body = message.getBody();
String messageContent = new String(body, StandardCharsets.UTF_8);
if (StringUtils.isBlank(messageContent)) {
return;
}
// 处理撤回消息
RecallTaskInfo recallTaskInfo = JSON.parseObject(messageContent, RecallTaskInfo.class);
consumeService.consume2recall(recallTaskInfo);
}
}

@ -0,0 +1,28 @@
package com.java3y.austin.support.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitMqConfig implements ApplicationContextAware {
/**
* ReturnCallBack
*
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnsCallback(returnedMessage ->
log.error("消息投递到队列失败, 状态码:{},失败原因:{},交换机:{}routingKey{},消息:{}",
returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(),
returnedMessage.getRoutingKey(), returnedMessage.getMessage()));
}
}

@ -1,8 +1,11 @@
package com.java3y.austin.support.mq.rabbit;
import cn.hutool.core.util.IdUtil;
import com.google.common.base.Throwables;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.mq.SendMqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -10,6 +13,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* @author xzcawl
* @Date 2022/7/15 17:29
@ -22,19 +26,39 @@ public class RabbitSendMqServiceImpl implements SendMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${austin.rabbitmq.topic.name}")
private String confTopic;
@Value("${austin.rabbitmq.exchange.name}")
private String exchangeName;
@Value("${austin.business.topic.name}")
private String sendMessageTopic;
@Value("${austin.business.recall.topic.name}")
private String austinRecall;
@Value("${austin.rabbitmq.routing.send}")
private String sendRoutingKey;
@Value("${austin.rabbitmq.routing.recall}")
private String recallRoutingKey;
@Override
public void send(String topic, String jsonValue, String tagId) {
if (topic.equals(confTopic)) {
rabbitTemplate.convertAndSend(exchangeName, confTopic, jsonValue);
} else {
log.error("RabbitSendMqServiceImpl send topic error! topic:{},confTopic:{}", topic, confTopic);
CorrelationData correlationData = new CorrelationData(IdUtil.getSnowflake().nextIdStr());
correlationData.getFuture().addCallback(result -> {
if (result.isAck()) {
log.info("消息成功投递到交换机消息ID{}", correlationData.getId());
}else{
log.error("消息投递到交换机失败消息ID{}", correlationData.getId());
}
}, ex -> {
log.error("消息处理异常,{}", Throwables.getStackTraceAsString(ex));
});
if (topic.equals(sendMessageTopic)){
rabbitTemplate.convertAndSend(exchangeName, sendRoutingKey, jsonValue, correlationData);
}else if (topic.equals(austinRecall)){
rabbitTemplate.convertAndSend(exchangeName, recallRoutingKey, jsonValue, correlationData);
}else {
log.error("RabbitSendMqServiceImpl send topic error! topic:{}", topic);
}
}

@ -65,10 +65,16 @@ spring.rabbitmq.password=123456
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.virtual-host=/
austin.rabbitmq.topic.name=austinRabbit
#enable retry
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=1000
spring.rabbitmq.listener.simple.retry.multiplier=2
spring.rabbitmq.listener.simple.retry.max-attempts=3
austin.rabbitmq.exchange.name=austin.point
spring.rabbitmq.queues=austin_queues
austin.rabbitmq.routing.key=austin_KEY
spring.rabbitmq.queues.send=austin.queues.send
spring.rabbitmq.queues.recall=austin.queues.recall
austin.rabbitmq.routing.send=austin.send
austin.rabbitmq.routing.recall=austin.recall
########################################## RabbitMq end ##########################################

Loading…
Cancel
Save