feat:增强rabbitmq生产者消息可靠性,添加publisher-confirm和publisher-return

pull/75/head
larry 9 months ago
parent 5095319814
commit 6605316a53

@ -0,0 +1,28 @@
package com.java3y.austin.support.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitMqConfig implements ApplicationContextAware {
/**
* ReturnCallBack
*
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnsCallback(returnedMessage ->
log.error("消息投递到队列失败, 状态码:{},失败原因:{},交换机:{}routingKey{},消息:{}",
returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(),
returnedMessage.getRoutingKey(), returnedMessage.getMessage()));
}
}

@ -1,8 +1,11 @@
package com.java3y.austin.support.mq.rabbit;
import cn.hutool.core.util.IdUtil;
import com.google.common.base.Throwables;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.mq.SendMqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -40,10 +43,20 @@ public class RabbitSendMqServiceImpl implements SendMqService {
@Override
public void send(String topic, String jsonValue, String tagId) {
CorrelationData correlationData = new CorrelationData(IdUtil.getSnowflake().nextIdStr());
correlationData.getFuture().addCallback(result -> {
if (result.isAck()) {
log.info("消息成功投递到交换机消息ID{}", correlationData.getId());
}else{
log.error("消息投递到交换机失败消息ID{}", correlationData.getId());
}
}, ex -> {
log.error("消息处理异常,{}", Throwables.getStackTraceAsString(ex));
});
if (topic.equals(sendMessageTopic)){
rabbitTemplate.convertAndSend(exchangeName, sendRoutingKey, jsonValue);
rabbitTemplate.convertAndSend(exchangeName, sendRoutingKey, jsonValue, correlationData);
}else if (topic.equals(austinRecall)){
rabbitTemplate.convertAndSend(exchangeName, recallRoutingKey, jsonValue);
rabbitTemplate.convertAndSend(exchangeName, recallRoutingKey, jsonValue, correlationData);
}else {
log.error("RabbitSendMqServiceImpl send topic error! topic:{}", topic);
}

Loading…
Cancel
Save