From 34b92f041128875bbe69daf3760cbc43751472f0 Mon Sep 17 00:00:00 2001 From: Administrator Date: Wed, 22 Mar 2023 22:02:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=BC=82=E6=AD=A5=E8=B0=83?= =?UTF-8?q?=E7=94=A8=EF=BC=88=E7=94=A8=E6=88=B7=E7=A7=AF=E5=88=86&?= =?UTF-8?q?=E5=95=86=E5=AE=B6=E6=9C=8D=E5=8A=A1=E6=B6=88=E8=B4=B9=E6=B6=88?= =?UTF-8?q?=E6=81=AF=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 05-userpoints/pom.xml | 4 ++ .../com/mashibing/config/RabbitMQConfig.java | 52 +++++++++++++++++++ .../listener/UserPointsListener.java | 27 ++++++++++ .../src/main/resources/application.yml | 11 +++- 06-business/pom.xml | 5 ++ .../com/mashibing/config/RabbitMQConfig.java | 52 +++++++++++++++++++ .../mashibing/listener/BusinessListener.java | 27 ++++++++++ .../src/main/resources/application.yml | 11 +++- 8 files changed, 187 insertions(+), 2 deletions(-) create mode 100644 05-userpoints/src/main/java/com/mashibing/config/RabbitMQConfig.java create mode 100644 05-userpoints/src/main/java/com/mashibing/listener/UserPointsListener.java create mode 100644 06-business/src/main/java/com/mashibing/config/RabbitMQConfig.java create mode 100644 06-business/src/main/java/com/mashibing/listener/BusinessListener.java diff --git a/05-userpoints/pom.xml b/05-userpoints/pom.xml index 564bb5a..5177eec 100644 --- a/05-userpoints/pom.xml +++ b/05-userpoints/pom.xml @@ -21,6 +21,10 @@ com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery + + org.springframework.boot + spring-boot-starter-amqp + \ No newline at end of file diff --git a/05-userpoints/src/main/java/com/mashibing/config/RabbitMQConfig.java b/05-userpoints/src/main/java/com/mashibing/config/RabbitMQConfig.java new file mode 100644 index 0000000..d71e9df --- /dev/null +++ b/05-userpoints/src/main/java/com/mashibing/config/RabbitMQConfig.java @@ -0,0 +1,52 @@ +package com.mashibing.config; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author zjw + * @description + */ +@Configuration +public class RabbitMQConfig { + + // 下单服务的交换机 + public static final String PLACE_ORDER_EXCHANGE = "place_order_exchange"; + // 三个服务的Queue + public static final String COUPON_QUEUE = "coupon_queue"; + public static final String USER_POINTS_QUEUE = "user_points_queue"; + public static final String BUSINESS_QUEUE = "business_queue"; + + + @Bean + public Exchange placeOrderExchange(){ + return ExchangeBuilder.fanoutExchange(PLACE_ORDER_EXCHANGE).build(); + } + + @Bean + public Queue couponQueue(){ + return QueueBuilder.durable(COUPON_QUEUE).build(); + } + @Bean + public Queue userPointsQueue(){ + return QueueBuilder.durable(USER_POINTS_QUEUE).build(); + } + @Bean + public Queue businessQueue(){ + return QueueBuilder.durable(BUSINESS_QUEUE).build(); + } + + @Bean + public Binding couponBinding(Exchange placeOrderExchange,Queue couponQueue){ + return BindingBuilder.bind(couponQueue).to(placeOrderExchange).with("").noargs(); + } + @Bean + public Binding userPointsBinding(Exchange placeOrderExchange,Queue userPointsQueue){ + return BindingBuilder.bind(userPointsQueue).to(placeOrderExchange).with("").noargs(); + } + @Bean + public Binding businessBinding(Exchange placeOrderExchange,Queue businessQueue){ + return BindingBuilder.bind(businessQueue).to(placeOrderExchange).with("").noargs(); + } +} diff --git a/05-userpoints/src/main/java/com/mashibing/listener/UserPointsListener.java b/05-userpoints/src/main/java/com/mashibing/listener/UserPointsListener.java new file mode 100644 index 0000000..ea12edb --- /dev/null +++ b/05-userpoints/src/main/java/com/mashibing/listener/UserPointsListener.java @@ -0,0 +1,27 @@ +package com.mashibing.listener; + +import com.mashibing.config.RabbitMQConfig; +import com.rabbitmq.client.Channel; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @author zjw + * @description + */ +@Component +public class UserPointsListener { + + @RabbitListener(queues = {RabbitMQConfig.COUPON_QUEUE}) + public void consume(String msg, Channel channel, Message message) throws InterruptedException, IOException { + // 预扣除用户积分 + Thread.sleep(400); + System.out.println("扣除用户积分成功!!"); + // 手动ACK + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } + +} diff --git a/05-userpoints/src/main/resources/application.yml b/05-userpoints/src/main/resources/application.yml index 0bd009d..ec34654 100644 --- a/05-userpoints/src/main/resources/application.yml +++ b/05-userpoints/src/main/resources/application.yml @@ -7,4 +7,13 @@ spring: cloud: nacos: discovery: - server-addr: 114.116.226.76:8848 \ No newline at end of file + server-addr: 114.116.226.76:8848 + rabbitmq: + host: 114.116.226.76 + port: 5672 + username: rabbitmq + password: rabbitmq + virtual-host: rabbitmq + listener: + simple: + acknowledge-mode: manual \ No newline at end of file diff --git a/06-business/pom.xml b/06-business/pom.xml index bebce64..a0e8a2c 100644 --- a/06-business/pom.xml +++ b/06-business/pom.xml @@ -22,5 +22,10 @@ com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery + + org.springframework.boot + spring-boot-starter-amqp + + \ No newline at end of file diff --git a/06-business/src/main/java/com/mashibing/config/RabbitMQConfig.java b/06-business/src/main/java/com/mashibing/config/RabbitMQConfig.java new file mode 100644 index 0000000..d71e9df --- /dev/null +++ b/06-business/src/main/java/com/mashibing/config/RabbitMQConfig.java @@ -0,0 +1,52 @@ +package com.mashibing.config; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author zjw + * @description + */ +@Configuration +public class RabbitMQConfig { + + // 下单服务的交换机 + public static final String PLACE_ORDER_EXCHANGE = "place_order_exchange"; + // 三个服务的Queue + public static final String COUPON_QUEUE = "coupon_queue"; + public static final String USER_POINTS_QUEUE = "user_points_queue"; + public static final String BUSINESS_QUEUE = "business_queue"; + + + @Bean + public Exchange placeOrderExchange(){ + return ExchangeBuilder.fanoutExchange(PLACE_ORDER_EXCHANGE).build(); + } + + @Bean + public Queue couponQueue(){ + return QueueBuilder.durable(COUPON_QUEUE).build(); + } + @Bean + public Queue userPointsQueue(){ + return QueueBuilder.durable(USER_POINTS_QUEUE).build(); + } + @Bean + public Queue businessQueue(){ + return QueueBuilder.durable(BUSINESS_QUEUE).build(); + } + + @Bean + public Binding couponBinding(Exchange placeOrderExchange,Queue couponQueue){ + return BindingBuilder.bind(couponQueue).to(placeOrderExchange).with("").noargs(); + } + @Bean + public Binding userPointsBinding(Exchange placeOrderExchange,Queue userPointsQueue){ + return BindingBuilder.bind(userPointsQueue).to(placeOrderExchange).with("").noargs(); + } + @Bean + public Binding businessBinding(Exchange placeOrderExchange,Queue businessQueue){ + return BindingBuilder.bind(businessQueue).to(placeOrderExchange).with("").noargs(); + } +} diff --git a/06-business/src/main/java/com/mashibing/listener/BusinessListener.java b/06-business/src/main/java/com/mashibing/listener/BusinessListener.java new file mode 100644 index 0000000..708d53a --- /dev/null +++ b/06-business/src/main/java/com/mashibing/listener/BusinessListener.java @@ -0,0 +1,27 @@ +package com.mashibing.listener; + +import com.mashibing.config.RabbitMQConfig; +import com.rabbitmq.client.Channel; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @author zjw + * @description + */ +@Component +public class BusinessListener { + + @RabbitListener(queues = {RabbitMQConfig.COUPON_QUEUE}) + public void consume(String msg, Channel channel, Message message) throws InterruptedException, IOException { + // 通知商家成功! + Thread.sleep(400); + System.out.println("通知商家成功!!"); + // 手动ACK + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } + +} diff --git a/06-business/src/main/resources/application.yml b/06-business/src/main/resources/application.yml index 4bb633f..e0ee56f 100644 --- a/06-business/src/main/resources/application.yml +++ b/06-business/src/main/resources/application.yml @@ -7,4 +7,13 @@ spring: cloud: nacos: discovery: - server-addr: 114.116.226.76:8848 \ No newline at end of file + server-addr: 114.116.226.76:8848 + rabbitmq: + host: 114.116.226.76 + port: 5672 + username: rabbitmq + password: rabbitmq + virtual-host: rabbitmq + listener: + simple: + acknowledge-mode: manual \ No newline at end of file