diff --git a/01-placeorder/src/main/java/com/mashibing/config/RabbitTemplateConfig.java b/01-placeorder/src/main/java/com/mashibing/config/RabbitTemplateConfig.java index 3a3a211..4f17a6d 100644 --- a/01-placeorder/src/main/java/com/mashibing/config/RabbitTemplateConfig.java +++ b/01-placeorder/src/main/java/com/mashibing/config/RabbitTemplateConfig.java @@ -7,7 +7,6 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; diff --git a/05-userpoints/pom.xml b/05-userpoints/pom.xml index 5177eec..5facb0d 100644 --- a/05-userpoints/pom.xml +++ b/05-userpoints/pom.xml @@ -25,6 +25,20 @@ org.springframework.boot spring-boot-starter-amqp + + mysql + mysql-connector-java + 5.1.47 + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + 2.2.2 + + + org.projectlombok + lombok + \ No newline at end of file diff --git a/05-userpoints/src/main/java/com/mashibing/UserPointsStarterApp.java b/05-userpoints/src/main/java/com/mashibing/UserPointsStarterApp.java index ef9cb35..7ce82bc 100644 --- a/05-userpoints/src/main/java/com/mashibing/UserPointsStarterApp.java +++ b/05-userpoints/src/main/java/com/mashibing/UserPointsStarterApp.java @@ -1,5 +1,6 @@ package com.mashibing; +import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @@ -10,6 +11,7 @@ import org.springframework.cloud.client.discovery.EnableDiscoveryClient; */ @SpringBootApplication @EnableDiscoveryClient +@MapperScan(basePackages = "com.mashibing.mapper") public class UserPointsStarterApp { public static void main(String[] args) { diff --git a/05-userpoints/src/main/java/com/mashibing/listener/UserPointsListener.java b/05-userpoints/src/main/java/com/mashibing/listener/UserPointsListener.java index 34b92ca..fe5ae89 100644 --- a/05-userpoints/src/main/java/com/mashibing/listener/UserPointsListener.java +++ b/05-userpoints/src/main/java/com/mashibing/listener/UserPointsListener.java @@ -1,9 +1,11 @@ package com.mashibing.listener; import com.mashibing.config.RabbitMQConfig; +import com.mashibing.service.UserPointsConsume; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; @@ -15,11 +17,13 @@ import java.io.IOException; @Component public class UserPointsListener { + @Autowired + private UserPointsConsume userPointsConsume; + @RabbitListener(queues = {RabbitMQConfig.USER_POINTS_QUEUE}) public void consume(String msg, Channel channel, Message message) throws InterruptedException, IOException { - // 预扣除用户积分 - Thread.sleep(400); - System.out.println("扣除用户积分成功!!"); + // 消费消息~ + userPointsConsume.consume(message); // 手动ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } diff --git a/05-userpoints/src/main/java/com/mashibing/mapper/UserPointsIdempotentMapper.java b/05-userpoints/src/main/java/com/mashibing/mapper/UserPointsIdempotentMapper.java new file mode 100644 index 0000000..c9ec16a --- /dev/null +++ b/05-userpoints/src/main/java/com/mashibing/mapper/UserPointsIdempotentMapper.java @@ -0,0 +1,19 @@ +package com.mashibing.mapper; + +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +/** + * @author zjw + * @description + */ +public interface UserPointsIdempotentMapper { + + @Select("select count(1) from user_points_idempotent where id = #{id}") + int findById(@Param("id") String id); + + @Insert("insert into user_points_idempotent (id) values (#{id})") + void save(@Param("id") String id); + +} diff --git a/05-userpoints/src/main/java/com/mashibing/service/UserPointsConsume.java b/05-userpoints/src/main/java/com/mashibing/service/UserPointsConsume.java new file mode 100644 index 0000000..81436db --- /dev/null +++ b/05-userpoints/src/main/java/com/mashibing/service/UserPointsConsume.java @@ -0,0 +1,17 @@ +package com.mashibing.service; + +import org.springframework.amqp.core.Message; + +/** + * @author zjw + * @description + */ +public interface UserPointsConsume { + + /** + * 消费消息的逻辑 + * @param message + */ + void consume(Message message); + +} diff --git a/05-userpoints/src/main/java/com/mashibing/service/impl/UserPointsConsumeImpl.java b/05-userpoints/src/main/java/com/mashibing/service/impl/UserPointsConsumeImpl.java new file mode 100644 index 0000000..c377375 --- /dev/null +++ b/05-userpoints/src/main/java/com/mashibing/service/impl/UserPointsConsumeImpl.java @@ -0,0 +1,49 @@ +package com.mashibing.service.impl; + +import com.mashibing.mapper.UserPointsIdempotentMapper; +import com.mashibing.service.UserPointsConsume; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import javax.annotation.Resource; + +/** + * @author zjw + * @description + */ +@Service +@Slf4j +public class UserPointsConsumeImpl implements UserPointsConsume { + + @Resource + private UserPointsIdempotentMapper userPointsIdempotentMapper; + + private final String ID_NAME = "spring_returned_message_correlation"; + + + @Override + @Transactional + public void consume(Message message) { + // 获取生产者提供的CorrelationId要基于header去获取。 + String id = message.getMessageProperties().getHeader(ID_NAME); + //1、查询幂等表是否存在当前消息标识 + int count = userPointsIdempotentMapper.findById(id); + //2、如果存在,直接return结束 + if(count == 1){ + log.info("消息已经被消费!!!无需重复消费!"); + return; + } + //3、如果不存在,插入消息标识到幂等表 + userPointsIdempotentMapper.save(id); + //4、执行消费逻辑 + // 预扣除用户积分 + try { + Thread.sleep(400); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("扣除用户积分成功!!"); + } +} diff --git a/05-userpoints/src/main/resources/application.yml b/05-userpoints/src/main/resources/application.yml index ec34654..5bc4322 100644 --- a/05-userpoints/src/main/resources/application.yml +++ b/05-userpoints/src/main/resources/application.yml @@ -16,4 +16,9 @@ spring: virtual-host: rabbitmq listener: simple: - acknowledge-mode: manual \ No newline at end of file + acknowledge-mode: manual + datasource: + driver-class-name: org.gjt.mm.mysql.Driver + url: jdbc:mysql:///rabbitmq + username: root + password: root \ No newline at end of file