死信队列 实现消息延迟消费 订单延迟取消

main
夜灬瞬 2 years ago
parent 9ffc84f5dc
commit 014eedc4e1

@ -18,4 +18,11 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,52 @@
package com.shun.order.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author
* @since 2023/6/24 17:31
*/
@Configuration
@Slf4j
public class RabbitMQConfig {
public static final String ORDER_EXCHANGE = "order_exchange";
public static final String ORDER_QUEUE = "order_queue";
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String DEAD_QUEUE = "dead_queue";
@Bean
public Exchange orderExchange(){
return ExchangeBuilder.fanoutExchange(ORDER_EXCHANGE).build();
}
@Bean
public Queue orderQueue(){
return QueueBuilder.durable(ORDER_QUEUE).deadLetterExchange(DEAD_EXCHANGE).build();
}
@Bean
public Exchange deadExchange(){
return ExchangeBuilder.fanoutExchange(DEAD_EXCHANGE).build();
}
@Bean
public Queue deadQueue(){
return QueueBuilder.durable(DEAD_QUEUE).build();
}
@Bean
public Binding orderBinding(Exchange orderExchange,Queue orderQueue){
return BindingBuilder.bind(orderQueue).to(orderExchange).with("").noargs();
}
@Bean
public Binding deadBinding(Exchange deadExchange,Queue deadQueue){
return BindingBuilder.bind(deadQueue).to(deadExchange).with("").noargs();
}
}

@ -1,8 +1,16 @@
package com.shun.order.controller;
import com.shun.order.config.RabbitMQConfig;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* @author
* @since 2023/6/19 15:54
@ -10,6 +18,9 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrderController {
@Resource
RabbitTemplate rabbitTemplate;
/**
*
* @throws InterruptedException
@ -18,6 +29,24 @@ public class OrderController {
public void create() throws InterruptedException {
Thread.sleep(400);
System.out.println("创建订单成功!");
save();
}
public void save() {
// 生成主键ID
String id = UUID.randomUUID().toString();
// 创建订单
//orderMapper.save(id);
// 订单构建成功~
// 发送消息到RabbitMQ的死信队列
rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "", id, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息的生存时间为15s当然也可以在构建队列时指定队列的生存时间。
message.getMessageProperties().setExpiration("15000");
return message;
}
});
}
}

@ -0,0 +1,42 @@
package com.shun.order.listener;
import com.rabbitmq.client.Channel;
import com.shun.order.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
*
*
* @author
* @since 2023/6/21 21:43
*/
@Component
@Slf4j
public class OrderListener {
@RabbitListener(queues = {RabbitMQConfig.DEAD_QUEUE})
public void consume(String msg, Channel channel, Message message) throws Exception {
//1、 调用Service实现订单状态的处理
// orderService.delayCancelOrder(id);
//2、 ack的干活~
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
public void delayCancelOrder(String id) {
//1、基于id查询订单信息。 for update
// int orderState = orderMapper.findOrderStateByIdForUpdate(id);
// //2、判断订单状态
// if(orderState != 0){
// log.info("订单已经支付!!");
// return;
// }
// //3、修改订单状态
// log.info("订单未支付,修改订单状态为已取消");
// orderMapper.updateOrderStateById(-1,id);
}
}

@ -6,4 +6,10 @@ spring:
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
server-addr: 127.0.0.1:8848
rabbitmq:
host: 192.168.48.128
port: 5672
username: admin
password: admin
virtual-host: /
Loading…
Cancel
Save