下单服务保证消息的可靠性

master
Administrator 3 years ago
parent becd6e50d8
commit 79d1a7c87c

@ -30,6 +30,20 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
</dependencies>
</project>

@ -1,5 +1,6 @@
package com.mashibing;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@ -12,6 +13,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@MapperScan(basePackages = "com.mashibing.mapper")
public class PlaceOrderStarterApp {
public static void main(String[] args) {

@ -0,0 +1,76 @@
package com.mashibing.config;
import com.mashibing.mapper.ResendMapper;
import com.mashibing.util.GlobalCache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.Map;
/**
* @author zjw
* @description
*/
@Configuration
@Slf4j
public class RabbitTemplateConfig {
@Resource
private ResendMapper resendMapper;
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
//1、new出RabbitTemplate对象
RabbitTemplate rabbitTemplate = new RabbitTemplate();
//2、将connectionFactory设置到RabbitTemplate对象中
rabbitTemplate.setConnectionFactory(connectionFactory);
//3、设置confirm回调
rabbitTemplate.setConfirmCallback(confirmCallback());
//4、设置return回调
rabbitTemplate.setReturnCallback(returnCallback());
//5、设置mandatory为true
rabbitTemplate.setMandatory(true);
//6、返回RabbitTemplate对象即可
return rabbitTemplate;
}
public RabbitTemplate.ConfirmCallback confirmCallback(){
return new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (correlationData == null) return;
String msgId = correlationData.getId();
if(ack){
log.info("消息发送到Exchange成功!!");
GlobalCache.remove(msgId);
}else{
log.error("消息发送失败!");
Map value = (Map) GlobalCache.get(msgId);
// 推荐自己玩的时候用service做增删改操作控制事务~
resendMapper.save(value);
}
}
};
}
public RabbitTemplate.ReturnCallback returnCallback(){
return new RabbitTemplate.ReturnCallback(){
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 做到这你可以基于confirm中的操作实现将当前消息也保存到数据库。
log.error("消息未路由到队列");
log.error("return消息为" + new String(message.getBody()));
log.error("return交换机为" + exchange);
log.error("return路由为" + routingKey);
}
};
}
}

@ -2,11 +2,19 @@ package com.mashibing.controller;
import com.mashibing.client.*;
import com.mashibing.config.RabbitMQConfig;
import com.mashibing.util.GlobalCache;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @author zjw
* @description
@ -19,16 +27,6 @@ public class PlaceOrderController {
@Autowired
private OrderManageClient orderManageClient;
/*
@Autowired
private CouponClient couponClient;
@Autowired
private UserPointsClient userPointsClient;
@Autowired
private BusinessClient businessClient;
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
@ -44,17 +42,21 @@ public class PlaceOrderController {
orderManageClient.create();
// 将之前的同步方式注释
/*
//3、调用优惠券服务预扣除使用的优惠券
couponClient.coupon();
//4、调用用户积分服务预扣除用户使用的积分
userPointsClient.up();
//5、调用商家服务通知商家用户已下单
businessClient.notifyBusiness();
*/
String userAndOrderInfo = "用户信息&订单信息&优惠券信息等等…………";
// 声明当前消息的id标识
String id = UUID.randomUUID().toString();
// 封装消息的全部信息
Map map = new HashMap<>();
map.put("id",id);
map.put("message",userAndOrderInfo);
map.put("exchange",RabbitMQConfig.PLACE_ORDER_EXCHANGE);
map.put("routingKey","");
map.put("sendTime",new Date());
// 将id标识和消息存储到全局缓存中
GlobalCache.set(id,map);
// 将同步方式修改为基于RabbitMQ的异步方式
rabbitTemplate.convertAndSend(RabbitMQConfig.PLACE_ORDER_EXCHANGE,"",userAndOrderInfo);
rabbitTemplate.convertAndSend(RabbitMQConfig.PLACE_ORDER_EXCHANGE,"",userAndOrderInfo,new CorrelationData(id));
long end = System.currentTimeMillis();
System.out.println(end - start);

@ -0,0 +1,16 @@
package com.mashibing.mapper;
import org.apache.ibatis.annotations.Insert;
import java.util.Map;
/**
* @author zjw
* @description
*/
public interface ResendMapper {
@Insert("insert into resend (id,message,exchange,routing_key,send_time) values (#{id},#{message},#{exchange},#{routingKey},#{sendTime})")
void save(Map map);
}

@ -0,0 +1,27 @@
package com.mashibing.util;
import java.util.HashMap;
import java.util.Map;
/**
* @author zjw
* @description
*/
public class GlobalCache {
private static Map map = new HashMap();
public static void set(String key,Object value){
map.put(key,value);
}
public static Object get(String key){
Object value = map.get(key);
return value;
}
public static void remove(String key){
map.remove(key);
}
}

@ -13,4 +13,11 @@ spring:
port: 5672
username: rabbitmq
password: rabbitmq
virtual-host: rabbitmq
virtual-host: rabbitmq
publisher-confirm-type: correlated
publisher-returns: true
datasource:
driver-class-name: org.gjt.mm.mysql.Driver
url: jdbc:mysql:///rabbitmq
username: root
password: root
Loading…
Cancel
Save