订单状态的延迟处理

master
Administrator 3 years ago
parent a0fca4c050
commit 7d1e03540a

@ -20,6 +20,24 @@
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>

@ -1,5 +1,6 @@
package com.mashibing;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@ -10,6 +11,7 @@ import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
*/
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan(basePackages = "com.mashibing.mapper")
public class OrderManageStarterApp {
public static void main(String[] args) {

@ -0,0 +1,50 @@
package com.mashibing.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* &&
* @author zjw
* @description
*/
@Configuration
public class RabbitMQConfig {
public static final String ORDER_EXCHANGE = "order_exchange";
public static final String ORDER_QUEUE = "order_queue";
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String DEAD_QUEUE = "dead_queue";
@Bean
public Exchange orderExchange(){
return ExchangeBuilder.fanoutExchange(ORDER_EXCHANGE).build();
}
@Bean
public Queue orderQueue(){
return QueueBuilder.durable(ORDER_QUEUE).deadLetterExchange(DEAD_EXCHANGE).build();
}
@Bean
public Exchange deadExchange(){
return ExchangeBuilder.fanoutExchange(DEAD_EXCHANGE).build();
}
@Bean
public Queue deadQueue(){
return QueueBuilder.durable(DEAD_QUEUE).build();
}
@Bean
public Binding orderBinding(Exchange orderExchange,Queue orderQueue){
return BindingBuilder.bind(orderQueue).to(orderExchange).with("").noargs();
}
@Bean
public Binding deadBinding(Exchange deadExchange,Queue deadQueue){
return BindingBuilder.bind(deadQueue).to(deadExchange).with("").noargs();
}
}

@ -1,5 +1,8 @@
package com.mashibing.controller;
import com.mashibing.service.TBOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@ -8,12 +11,16 @@ import org.springframework.web.bind.annotation.RestController;
* @description
*/
@RestController
@Slf4j
public class OrderManageController {
@Autowired
private TBOrderService orderService;
@GetMapping("create")
public void create() throws InterruptedException {
Thread.sleep(400);
System.out.println("创建订单成功");
orderService.save();
log.info("创建订单成功!");
}
}

@ -0,0 +1,32 @@
package com.mashibing.listener;
import com.mashibing.config.RabbitMQConfig;
import com.mashibing.service.TBOrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author zjw
* @description
*/
@Component
public class DelayMessageListener {
@Autowired
private TBOrderService orderService;
@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
public void consume(String id, Channel channel, Message message) throws IOException {
//1、 调用Service实现订单状态的处理
orderService.delayCancelOrder(id);
//2、 ack的干活~
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}

@ -0,0 +1,24 @@
package com.mashibing.mapper;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import java.math.BigDecimal;
/**
* @author zjw
* @description
*/
public interface TBOrderMapper {
@Insert("insert into tb_order (id) values (#{id})")
void save(@Param("id") String id);
@Select("select order_state from tb_order where id = #{id} for update")
int findOrderStateByIdForUpdate(@Param("id") String id);
@Update("update tb_order set order_state = #{orderState} where id = #{id}")
void updateOrderStateById(@Param("orderState") int i, @Param("id") String id);
}

@ -0,0 +1,12 @@
package com.mashibing.service;
/**
* @author zjw
* @description
*/
public interface TBOrderService {
void save();
void delayCancelOrder(String id);
}

@ -0,0 +1,66 @@
package com.mashibing.service.impl;
import com.mashibing.config.RabbitMQConfig;
import com.mashibing.mapper.TBOrderMapper;
import com.mashibing.service.TBOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @author zjw
* @description
*/
@Service
@Slf4j
public class TBOrderServiceImpl implements TBOrderService {
@Resource
private TBOrderMapper orderMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
@Transactional
public void save() {
// 生成主键ID
String id = UUID.randomUUID().toString();
// 创建订单
orderMapper.save(id);
// 订单构建成功~
// 发送消息到RabbitMQ的死信队列
rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "", id, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息的生存时间为15s当然也可以在构建队列时指定队列的生存时间。
message.getMessageProperties().setExpiration("15000");
return message;
}
});
}
@Override
@Transactional
public void delayCancelOrder(String id) {
//1、基于id查询订单信息。 for update
int orderState = orderMapper.findOrderStateByIdForUpdate(id);
//2、判断订单状态
if(orderState != 0){
log.info("订单已经支付!!");
return;
}
//3、修改订单状态
log.info("订单未支付,修改订单状态为已取消");
orderMapper.updateOrderStateById(-1,id);
}
}

@ -8,3 +8,17 @@ spring:
nacos:
discovery:
server-addr: 114.116.226.76:8848
datasource:
driver-class-name: org.gjt.mm.mysql.Driver
url: jdbc:mysql:///rabbitmq
username: root
password: root
rabbitmq:
host: 114.116.226.76
port: 5672
username: rabbitmq
password: rabbitmq
virtual-host: rabbitmq
listener:
simple:
acknowledge-mode: manual
Loading…
Cancel
Save