Pre Merge pull request !169 from 张洪威/master

pull/169/MERGE
张洪威 3 years ago committed by Gitee
commit 69e85dade1
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F

@ -0,0 +1,72 @@
**<h2>使用MQ的场景和优点</h2>**
#### 解耦
看这么个场景。A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢A 系统负责人几乎崩溃......<br>
![mq-1](https://gitee.com/shishan100/Java-Interview-Advanced/raw/master/images/mq-1.png)<br>
在这个场景中A 系统跟其它各种乱七八糟的系统严重耦合A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊!<br>
如果使用 MQA 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。<br>
![mq-2](https://gitee.com/shishan100/Java-Interview-Advanced/raw/master/images/mq-2.png)<br>
**总结**:通过一个 MQPub/Sub 发布订阅消息这么一个模型A 系统就跟其它系统彻底解耦了。<br>
#### 异步
再来看一个场景A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3msBCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms接近 1s用户感觉搞个什么东西慢死了慢死了。用户通过浏览器发起请求等待个 1s这几乎是不可接受的。<br>
![mq-3](https://gitee.com/shishan100/Java-Interview-Advanced/raw/master/images/mq-3.png)<br>
一般互联网类的企业,对于用户直接的操作,一般要求是每个请求都必须在 200 ms 以内完成,对用户几乎是无感知的。<br>
如果**使用 MQ**,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5msA 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms对于用户而言其实感觉上就是点个按钮8ms 以后就直接返回了,爽!网站做得真好,真快!<br>
![mq-4](https://gitee.com/shishan100/Java-Interview-Advanced/raw/master/images/mq-4.png)<br>
#### 削峰
每天 0:00 到 12:00A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL 的,大量的请求涌入 MySQL每秒钟对 MySQL 执行约 5k 条 SQL。<br>
一般的 MySQL扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。<br>
但是高峰期一过,到了下午的时候,就成了低峰期,可能也就 1w 的用户同时在网站上操作,每秒中的请求数量可能也就 50 个请求,对整个系统几乎没有任何的压力。<br>
![mq-5](https://gitee.com/shishan100/Java-Interview-Advanced/raw/master/images/mq-5.png)<br>
如果使用 MQ每秒 5k 个请求写入 MQA 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok这样下来哪怕是高峰期的时候A 系统也绝对不会挂掉。而 MQ 每秒钟 5k 个请求进来,就 2k 个请求出去结果就导致在中午高峰期1 个小时),可能有几十万甚至几百万的请求积压在 MQ 中。<br>
![mq-6](https://gitee.com/shishan100/Java-Interview-Advanced/raw/master/images/mq-6.png)<br>
这个短暂的高峰期积压是 ok 的,因为高峰期过了之后,每秒钟就 50 个请求进 MQ但是 A 系统依然会按照每秒 2k 个请求的速度在处理。所以说只要高峰期一过A 系统就会快速将积压的消息给解决掉。<br>
**<h2>RabbitMQ简介</h2>**
RabbitMQ是一个开源的消息代理的队列服务器用来通过普通协议在完全不同的应用之间共享数据。<br>
RabbitMQ是使用Erlang语言来编写的并且RabbitMQ是基于AMQP协议的。<br>
Erlang语言在数据交互方面性能优秀有着和原生Socket一样的延迟<br>
这也是RabbitMQ高性能的原因所在。可谓“人如其名”RabbitMQ像兔子一样迅速。<br>
**<h2>RabbitMQ安装</h2>**
**<h3>RabbitMQ官方下载</h3>**
https://www.rabbitmq.com/download.html
**<h3>Docker下载</h3>**
docker run -d --hostname hostname \
--name name \
-p 15672:15672 \
-p 5672:5672 \
-e "RABBITMQ_DEFAULT_USER=username" \
-e "RABBITMQ_DEFAULT_PASS=password" \
rabbitmq
**<h2>RabbitMQ常用交换机</h2>**
**<h3>直连型交换机-Direct Exchange</h3>**
一一对应的模式,本次提交的代码即为该模式,用于日志组件的调用
**<h3>主题交换机-Topic Exchange</h3>**
根据规则一对多的模式,比较负责的项目会使用到
**<h3>扇型交换机-Fanout Exchange</h3>**
广播的模式,不太注重幂等性的场景可能会使用到
**<h2>Nacos中配置yml</h2>**
\# spring配置<br>
spring: <br>
\# 配置rabbitMq 服务器<br>
&emsp;rabbitmq:<br>
&emsp;&emsp;host: ip<br>
&emsp;&emsp;port: port<br>
&emsp;&emsp;username: username<br>
&emsp;&emsp;password: password<br>
&emsp;&emsp;\# 确认消息已发送到交换机(Exchange)<br>
&emsp;&emsp;publisher-confirm-type: correlated<br>
&emsp;&emsp;\# 确认消息已发送到队列(Queue)<br>
&emsp;&emsp;publisher-returns: true<br>
&emsp;&emsp;listener:<br>
&emsp;&emsp;&emsp;simple:<br>
&emsp;&emsp;&emsp;&emsp;acknowledge-mode: manual<br>
**<h2>操作日志切换为MQ模式</h2>**<br>
<h3>step1将AsyncLogService的@Primary注解删除<br></h3>
<h3>step2给RabbitLogService添加@Primary注解<br></h3>
<h3>step3启动项目即可</h3>

@ -23,5 +23,17 @@
<artifactId>ruoyi-common-security</artifactId>
</dependency>
<!-- SpringBoot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--Rabbit MQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>

@ -4,6 +4,7 @@ import java.util.Collection;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.ruoyi.common.log.service.ILogService;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
@ -21,7 +22,6 @@ import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.core.utils.ip.IpUtils;
import com.ruoyi.common.log.annotation.Log;
import com.ruoyi.common.log.enums.BusinessStatus;
import com.ruoyi.common.log.service.AsyncLogService;
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.system.api.domain.SysOperLog;
@ -37,7 +37,7 @@ public class LogAspect
private static final Logger log = LoggerFactory.getLogger(LogAspect.class);
@Autowired
private AsyncLogService asyncLogService;
private ILogService logService;
/**
*
@ -92,8 +92,8 @@ public class LogAspect
operLog.setRequestMethod(ServletUtils.getRequest().getMethod());
// 处理设置注解上的参数
getControllerMethodDescription(joinPoint, controllerLog, operLog, jsonResult);
// 保存数据库
asyncLogService.saveSysLog(operLog);
// 保存日志
logService.saveSysLog(operLog);
}
catch (Exception exp)
{

@ -0,0 +1,48 @@
package com.ruoyi.common.log.mq.rabbit;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* -
*
* @author ruoyi
*/
@Configuration
public class RabbitLogCallback {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);
System.out.println("ConfirmCallback: "+"确认情况:"+ack);
System.out.println("ConfirmCallback: "+"原因:"+cause);
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("ReturnCallback: "+"消息:"+returnedMessage.getMessage());
System.out.println("ReturnCallback: "+"回应码:"+returnedMessage.getReplyCode());
System.out.println("ReturnCallback: "+"回应信息:"+returnedMessage.getReplyText());
System.out.println("ReturnCallback: "+"交换机:"+returnedMessage.getExchange());
System.out.println("ReturnCallback: "+"路由键:"+returnedMessage.getRoutingKey());
}
});
return rabbitTemplate;
}
}

@ -0,0 +1,58 @@
package com.ruoyi.common.log.mq.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* -
*
* @author ruoyi
*/
@Configuration
public class RabbitLogDirect {
/**
*
*/
public final static String queueName = "OperaLogDirectQueue";
/**
*
*/
public final static String exchangeName = "OperaLogDirectExchange";
/**
*
*/
public final static String routingKeyName = "OperaLogDirectRouting";
//队列 起名OperaLogQueue
@Bean
public Queue operaLogQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false只能被当前创建的连接使用而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue(queueName,true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue(queueName, true);
}
// Direct交换机 起名OperaLogDirectExchange
@Bean
DirectExchange operaLogDirectExchange() {
// return new DirectExchange(exchangeName,true,true);
return new DirectExchange(exchangeName, true, false);
}
// 绑定 将队列和交换机绑定, 并设置用于匹配键OperaLogDirectRouting
@Bean
Binding operaLogBindingDirect() {
return BindingBuilder.bind(operaLogQueue()).to(operaLogDirectExchange()).with(routingKeyName);
}
}

@ -0,0 +1,16 @@
package com.ruoyi.common.log.service;
import com.ruoyi.system.api.domain.SysOperLog;
/**
*
*
* @author ruoyi
*/
public interface ILogService {
/**
*
*/
public void saveSysLog(SysOperLog sysOperLog);
}

@ -1,6 +1,8 @@
package com.ruoyi.common.log.service;
package com.ruoyi.common.log.service.impl;
import com.ruoyi.common.log.service.ILogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.ruoyi.common.core.constant.SecurityConstants;
@ -12,8 +14,9 @@ import com.ruoyi.system.api.domain.SysOperLog;
*
* @author ruoyi
*/
@Primary
@Service
public class AsyncLogService
public class AsyncLogService implements ILogService
{
@Autowired
private RemoteLogService remoteLogService;
@ -22,6 +25,7 @@ public class AsyncLogService
*
*/
@Async
@Override
public void saveSysLog(SysOperLog sysOperLog)
{
remoteLogService.saveLog(sysOperLog, SecurityConstants.INNER);

@ -0,0 +1,68 @@
package com.ruoyi.common.log.service.impl;
import com.rabbitmq.client.Channel;
import com.ruoyi.common.core.constant.SecurityConstants;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.log.mq.rabbit.RabbitLogDirect;
import com.ruoyi.common.log.service.ILogService;
import com.ruoyi.system.api.RemoteLogService;
import com.ruoyi.system.api.domain.SysOperLog;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* MQ
*
* @author ruoyi
*/
@Component
@RabbitListener(queues = RabbitLogDirect.queueName)//监听的队列名称
public class RabbitLogService implements ILogService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RemoteLogService remoteLogService;
/**
*
* @param sysOperLog
*/
@Override
public void saveSysLog(SysOperLog sysOperLog) {
// 将消息保存队列
rabbitTemplate.convertAndSend(RabbitLogDirect.exchangeName, RabbitLogDirect.routingKeyName, sysOperLog);
}
/**
*
* @param sysOperLog
* @param channel
* @param tag
* @Description RabbitMQ Channel
* RabbitMQ basic.deliver
* delivery tag RabbitMQ Channel ID
* delivery tag Channel
*/
@RabbitHandler
public void process(SysOperLog sysOperLog, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
R<Boolean> result = remoteLogService.saveLog(sysOperLog, SecurityConstants.INNER);
try {
if(result.getCode() == 200) {
// 第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(tag, true);
} else {
// 第二个参数true会重新放回队列所以需要自己根据业务逻辑判断什么时候使用拒绝
channel.basicReject(tag, true);
}
} catch (Exception e) {
channel.basicReject(tag, false);
}
}
}

@ -1,3 +1,6 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.ruoyi.common.log.service.AsyncLogService,\
com.ruoyi.common.log.service.impl.AsyncLogService,\
com.ruoyi.common.log.service.impl.RabbitLogService,\
com.ruoyi.common.log.mq.rabbit.RabbitLogDirect,\
com.ruoyi.common.log.mq.rabbit.RabbitLogCallback,\
com.ruoyi.common.log.aspect.LogAspect

Loading…
Cancel
Save