消费者避免重复消费问题

master
Administrator 3 years ago
parent 79d1a7c87c
commit a0fca4c050

@ -7,7 +7,6 @@ import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@ -25,6 +25,20 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>

@ -1,5 +1,6 @@
package com.mashibing;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@ -10,6 +11,7 @@ import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
*/
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan(basePackages = "com.mashibing.mapper")
public class UserPointsStarterApp {
public static void main(String[] args) {

@ -1,9 +1,11 @@
package com.mashibing.listener;
import com.mashibing.config.RabbitMQConfig;
import com.mashibing.service.UserPointsConsume;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
@ -15,11 +17,13 @@ import java.io.IOException;
@Component
public class UserPointsListener {
@Autowired
private UserPointsConsume userPointsConsume;
@RabbitListener(queues = {RabbitMQConfig.USER_POINTS_QUEUE})
public void consume(String msg, Channel channel, Message message) throws InterruptedException, IOException {
// 预扣除用户积分
Thread.sleep(400);
System.out.println("扣除用户积分成功!!");
// 消费消息~
userPointsConsume.consume(message);
// 手动ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

@ -0,0 +1,19 @@
package com.mashibing.mapper;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
/**
* @author zjw
* @description
*/
public interface UserPointsIdempotentMapper {
@Select("select count(1) from user_points_idempotent where id = #{id}")
int findById(@Param("id") String id);
@Insert("insert into user_points_idempotent (id) values (#{id})")
void save(@Param("id") String id);
}

@ -0,0 +1,17 @@
package com.mashibing.service;
import org.springframework.amqp.core.Message;
/**
* @author zjw
* @description
*/
public interface UserPointsConsume {
/**
*
* @param message
*/
void consume(Message message);
}

@ -0,0 +1,49 @@
package com.mashibing.service.impl;
import com.mashibing.mapper.UserPointsIdempotentMapper;
import com.mashibing.service.UserPointsConsume;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* @author zjw
* @description
*/
@Service
@Slf4j
public class UserPointsConsumeImpl implements UserPointsConsume {
@Resource
private UserPointsIdempotentMapper userPointsIdempotentMapper;
private final String ID_NAME = "spring_returned_message_correlation";
@Override
@Transactional
public void consume(Message message) {
// 获取生产者提供的CorrelationId要基于header去获取。
String id = message.getMessageProperties().getHeader(ID_NAME);
//1、查询幂等表是否存在当前消息标识
int count = userPointsIdempotentMapper.findById(id);
//2、如果存在直接return结束
if(count == 1){
log.info("消息已经被消费!!!无需重复消费!");
return;
}
//3、如果不存在插入消息标识到幂等表
userPointsIdempotentMapper.save(id);
//4、执行消费逻辑
// 预扣除用户积分
try {
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("扣除用户积分成功!!");
}
}

@ -16,4 +16,9 @@ spring:
virtual-host: rabbitmq
listener:
simple:
acknowledge-mode: manual
acknowledge-mode: manual
datasource:
driver-class-name: org.gjt.mm.mysql.Driver
url: jdbc:mysql:///rabbitmq
username: root
password: root
Loading…
Cancel
Save