diff --git a/03-ordermanage/pom.xml b/03-ordermanage/pom.xml index 2d9a993..1433d40 100644 --- a/03-ordermanage/pom.xml +++ b/03-ordermanage/pom.xml @@ -20,6 +20,24 @@ com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery + + mysql + mysql-connector-java + 5.1.47 + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + 2.2.2 + + + org.projectlombok + lombok + + + org.springframework.boot + spring-boot-starter-amqp + \ No newline at end of file diff --git a/03-ordermanage/src/main/java/com/mashibing/OrderManageStarterApp.java b/03-ordermanage/src/main/java/com/mashibing/OrderManageStarterApp.java index 80c021a..61f98db 100644 --- a/03-ordermanage/src/main/java/com/mashibing/OrderManageStarterApp.java +++ b/03-ordermanage/src/main/java/com/mashibing/OrderManageStarterApp.java @@ -1,5 +1,6 @@ package com.mashibing; +import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @@ -10,6 +11,7 @@ import org.springframework.cloud.client.discovery.EnableDiscoveryClient; */ @SpringBootApplication @EnableDiscoveryClient +@MapperScan(basePackages = "com.mashibing.mapper") public class OrderManageStarterApp { public static void main(String[] args) { diff --git a/03-ordermanage/src/main/java/com/mashibing/config/RabbitMQConfig.java b/03-ordermanage/src/main/java/com/mashibing/config/RabbitMQConfig.java new file mode 100644 index 0000000..ce902bd --- /dev/null +++ b/03-ordermanage/src/main/java/com/mashibing/config/RabbitMQConfig.java @@ -0,0 +1,50 @@ +package com.mashibing.config; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 构建普通交换机&队列以及绑定上的死信交换机&队列 + * @author zjw + * @description + */ +@Configuration +public class RabbitMQConfig { + + public static final String ORDER_EXCHANGE = "order_exchange"; + public static final String ORDER_QUEUE = "order_queue"; + + public static final String DEAD_EXCHANGE = "dead_exchange"; + public static final String DEAD_QUEUE = "dead_queue"; + + @Bean + public Exchange orderExchange(){ + return ExchangeBuilder.fanoutExchange(ORDER_EXCHANGE).build(); + } + + @Bean + public Queue orderQueue(){ + return QueueBuilder.durable(ORDER_QUEUE).deadLetterExchange(DEAD_EXCHANGE).build(); + } + + @Bean + public Exchange deadExchange(){ + return ExchangeBuilder.fanoutExchange(DEAD_EXCHANGE).build(); + } + + @Bean + public Queue deadQueue(){ + return QueueBuilder.durable(DEAD_QUEUE).build(); + } + + @Bean + public Binding orderBinding(Exchange orderExchange,Queue orderQueue){ + return BindingBuilder.bind(orderQueue).to(orderExchange).with("").noargs(); + } + + @Bean + public Binding deadBinding(Exchange deadExchange,Queue deadQueue){ + return BindingBuilder.bind(deadQueue).to(deadExchange).with("").noargs(); + } +} diff --git a/03-ordermanage/src/main/java/com/mashibing/controller/OrderManageController.java b/03-ordermanage/src/main/java/com/mashibing/controller/OrderManageController.java index c8a7abc..7ecfbd5 100644 --- a/03-ordermanage/src/main/java/com/mashibing/controller/OrderManageController.java +++ b/03-ordermanage/src/main/java/com/mashibing/controller/OrderManageController.java @@ -1,5 +1,8 @@ package com.mashibing.controller; +import com.mashibing.service.TBOrderService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @@ -8,12 +11,16 @@ import org.springframework.web.bind.annotation.RestController; * @description */ @RestController +@Slf4j public class OrderManageController { + @Autowired + private TBOrderService orderService; + @GetMapping("create") public void create() throws InterruptedException { - Thread.sleep(400); - System.out.println("创建订单成功!"); + orderService.save(); + log.info("创建订单成功!!"); } } diff --git a/03-ordermanage/src/main/java/com/mashibing/listener/DelayMessageListener.java b/03-ordermanage/src/main/java/com/mashibing/listener/DelayMessageListener.java new file mode 100644 index 0000000..3fb8112 --- /dev/null +++ b/03-ordermanage/src/main/java/com/mashibing/listener/DelayMessageListener.java @@ -0,0 +1,32 @@ +package com.mashibing.listener; + +import com.mashibing.config.RabbitMQConfig; +import com.mashibing.service.TBOrderService; +import com.rabbitmq.client.Channel; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @author zjw + * @description + */ +@Component +public class DelayMessageListener { + + @Autowired + private TBOrderService orderService; + + @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE) + public void consume(String id, Channel channel, Message message) throws IOException { + //1、 调用Service实现订单状态的处理 + orderService.delayCancelOrder(id); + + //2、 ack的干活~ + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } + +} diff --git a/03-ordermanage/src/main/java/com/mashibing/mapper/TBOrderMapper.java b/03-ordermanage/src/main/java/com/mashibing/mapper/TBOrderMapper.java new file mode 100644 index 0000000..97a29ca --- /dev/null +++ b/03-ordermanage/src/main/java/com/mashibing/mapper/TBOrderMapper.java @@ -0,0 +1,24 @@ +package com.mashibing.mapper; + +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; + +import java.math.BigDecimal; + +/** + * @author zjw + * @description + */ +public interface TBOrderMapper { + + @Insert("insert into tb_order (id) values (#{id})") + void save(@Param("id") String id); + + @Select("select order_state from tb_order where id = #{id} for update") + int findOrderStateByIdForUpdate(@Param("id") String id); + + @Update("update tb_order set order_state = #{orderState} where id = #{id}") + void updateOrderStateById(@Param("orderState") int i, @Param("id") String id); +} diff --git a/03-ordermanage/src/main/java/com/mashibing/service/TBOrderService.java b/03-ordermanage/src/main/java/com/mashibing/service/TBOrderService.java new file mode 100644 index 0000000..e2bc223 --- /dev/null +++ b/03-ordermanage/src/main/java/com/mashibing/service/TBOrderService.java @@ -0,0 +1,12 @@ +package com.mashibing.service; + +/** + * @author zjw + * @description + */ +public interface TBOrderService { + + void save(); + + void delayCancelOrder(String id); +} diff --git a/03-ordermanage/src/main/java/com/mashibing/service/impl/TBOrderServiceImpl.java b/03-ordermanage/src/main/java/com/mashibing/service/impl/TBOrderServiceImpl.java new file mode 100644 index 0000000..736fa1a --- /dev/null +++ b/03-ordermanage/src/main/java/com/mashibing/service/impl/TBOrderServiceImpl.java @@ -0,0 +1,66 @@ +package com.mashibing.service.impl; + +import com.mashibing.config.RabbitMQConfig; +import com.mashibing.mapper.TBOrderMapper; +import com.mashibing.service.TBOrderService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import javax.annotation.Resource; +import java.util.UUID; + +/** + * @author zjw + * @description + */ +@Service +@Slf4j +public class TBOrderServiceImpl implements TBOrderService { + + @Resource + private TBOrderMapper orderMapper; + + @Autowired + private RabbitTemplate rabbitTemplate; + + + @Override + @Transactional + public void save() { + // 生成主键ID + String id = UUID.randomUUID().toString(); + // 创建订单 + orderMapper.save(id); + // 订单构建成功~ + // 发送消息到RabbitMQ的死信队列 + rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "", id, new MessagePostProcessor() { + @Override + public Message postProcessMessage(Message message) throws AmqpException { + // 设置消息的生存时间为15s,当然,也可以在构建队列时,指定队列的生存时间。 + message.getMessageProperties().setExpiration("15000"); + return message; + } + }); + } + + @Override + @Transactional + public void delayCancelOrder(String id) { + //1、基于id查询订单信息。 for update + int orderState = orderMapper.findOrderStateByIdForUpdate(id); + //2、判断订单状态 + if(orderState != 0){ + log.info("订单已经支付!!"); + return; + } + //3、修改订单状态 + log.info("订单未支付,修改订单状态为已取消"); + orderMapper.updateOrderStateById(-1,id); + } +} diff --git a/03-ordermanage/src/main/resources/application.yml b/03-ordermanage/src/main/resources/application.yml index 859473e..e0c130c 100644 --- a/03-ordermanage/src/main/resources/application.yml +++ b/03-ordermanage/src/main/resources/application.yml @@ -7,4 +7,18 @@ spring: cloud: nacos: discovery: - server-addr: 114.116.226.76:8848 \ No newline at end of file + server-addr: 114.116.226.76:8848 + datasource: + driver-class-name: org.gjt.mm.mysql.Driver + url: jdbc:mysql:///rabbitmq + username: root + password: root + rabbitmq: + host: 114.116.226.76 + port: 5672 + username: rabbitmq + password: rabbitmq + virtual-host: rabbitmq + listener: + simple: + acknowledge-mode: manual \ No newline at end of file