pull/1/head
595208882@qq.com 4 years ago
parent edf7bbc4a5
commit 9d2cfacb12

@ -604,8 +604,27 @@ WHERE A.EMP_SUPV_ID = B.EMP_ID;
从粒度上来说就是**表锁、页锁、行锁**。表锁有意向共享锁、意向排他锁、自增锁等。行锁是在引擎层由各个引擎自己实现的。但并不是所有的引擎都支持行锁,比如 `MyISAM`引擎就`不支持行锁`。
按照锁的粒度进行分类MySQL主要包含三种类型级别的锁定机制
- **全局锁**锁的是整个database。由MySQL的SQL layer层实现的
- **表级锁**锁的是某个table。由MySQL的SQL layer层实现的
- **⾏级锁**锁的是某⾏数据也可能锁定⾏之间的间隙。由某些存储引擎实现⽐如InnoDB
表级锁和行级锁的区别:
- **表级锁**:开销⼩,加锁快;不会出现死锁;锁定粒度⼤,发⽣锁冲突的概率最⾼,并发度最低
- **⾏级锁**:开销⼤,加锁慢;会出现死锁;锁定粒度最⼩,发⽣锁冲突的概率最低,并发度也最⾼
## 全局锁
锁的是整个database。由MySQL的SQL layer层实现的。
## 行级锁
@ -824,7 +843,7 @@ select ... for update;
### AUTO-INC
### AUTO-INC锁(自增长锁)
在为某个字段声明 `AUTO_INCREMENT` 属性时,之后可以在插入数据时,可以不指定该字段的值,数据库会自动给该字段赋值递增的值,这主要是通过 AUTO-INC 锁实现的。
@ -1304,8 +1323,74 @@ SELECT * FROM products WHERE id LIKE '3' FOR UPDATE;
# MySQL日志
## binlog
`binlog` 用于记录数据库执行的写入性操作(不包括查询)信息,以二进制的形式保存在磁盘中。`binlog` 是 `mysql`的逻辑日志,并且由 `Server` 层进行记录,使用任何存储引擎的 `mysql` 数据库都会记录 `binlog` 日志。
- **逻辑日志**可以简单理解为记录的就是sql语句 。
- **物理日志**`mysql` 数据最终是保存在数据页中的,物理日志记录的就是数据页变更 。
`binlog` 是通过追加的方式进行写入的,可以通过`max_binlog_size` 参数设置每个 `binlog`文件的大小,当文件大小达到给定值之后,会生成新的文件来保存日志。
### 使用场景
在实际应用中, `binlog` 的主要使用场景有两个,分别是 **主从复制** 和 **数据恢复** 。
- **主从复制** :在 `Master` 端开启 `binlog` ,然后将 `binlog`发送到各个 `Slave` 端, `Slave` 端重放 `binlog` 从而达到主从数据一致
- **数据恢复** :通过使用 `mysqlbinlog` 工具来恢复数据
### 刷盘时机
对于 `InnoDB` 存储引擎而言,只有在事务提交时才会记录`biglog` ,此时记录还在内存中,那么 `biglog`是什么时候刷到磁盘中的呢?`mysql` 通过 `sync_binlog` 参数控制 `biglog` 的刷盘时机,取值范围是 `0-N`
- 0不去强制要求由系统自行判断何时写入磁盘
- 1每次 `commit` 的时候都要将 `binlog` 写入磁盘;
- N每N个事务才会将 `binlog` 写入磁盘。
从上面可以看出, `sync_binlog` 最安全的是设置是 `1` ,这也是`MySQL 5.7.7`之后版本的默认值。但是设置一个大一些的值可以提升数据库性能,因此实际情况下也可以将值适当调大,牺牲一定的一致性来获取更好的性能。
### 日志格式
`binlog` 日志有三种格式,分别为 `STATMENT``ROW``MIXED`
`MySQL 5.7.7` 之前,默认的格式是 `STATEMENT` `MySQL 5.7.7` 之后,默认值是 `ROW`。日志格式通过 `binlog-format` 指定。
- `STATMENT`:基于`SQL` 语句的复制( `statement-based replication, SBR` )每一条会修改数据的sql语句会记录到`binlog` 中 。
- - 优点:不需要记录每一行的变化,减少了 binlog 日志量,节约了 IO , 从而提高了性能;
- 缺点在某些情况下会导致主从数据不一致比如执行sysdate() 、 slepp() 等 。
- `ROW`:基于行的复制(`row-based replication, RBR` )不记录每条sql语句的上下文信息仅需记录哪条数据被修改了 。
- - 优点不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题
- 缺点:会产生大量的日志,尤其是` alter table ` 的时候会让日志暴涨
- `MIXED`:基于`STATMENT` 和 `ROW` 两种模式的混合复制(`mixed-based replication, MBR` ),一般的复制使用`STATEMENT` 模式保存 `binlog` ,对于 `STATEMENT` 模式无法复制的操作使用 `ROW` 模式保存 `binlog`
## redo log
## undo log
# InnoDB
## 线程
## 数据页
数据页主要是用来存储表中记录的,它在磁盘中是用双向链表相连的,方便查找,能够非常快速得从一个数据页,定位到另一个数据页。
@ -1641,6 +1726,21 @@ SELECT * FROM products WHERE id LIKE '3' FOR UPDATE;
## Change Buffer
可变缓冲区Change Buffer在内存中可变缓冲区是InnoDB缓冲池的一部分在磁盘上它是系统表空间的一部分因此即使在数据库重新启动之后索引更改也会保持缓冲状态。
可变缓冲区是一种特殊的数据结构,当受影响的页不在缓冲池中时,缓存对辅助索引页的更改。
## Log Buffer
日志缓冲区Log Buffer 主要保存写到redo log(重放日志)的数据。周期性的将缓冲区内的数据写入redo日志中。将内存中的数据写入磁盘的行为由innodb_log_at_trx_commit 和 innodb_log_at_timeout 调节。较大的redo日志缓冲区允许大型事务在事务提交前不进行写磁盘操作。
变量innodb_log_buffer_size (默认 16M)
## InnoDB日志
### Redo Log(重做日志)

@ -940,59 +940,306 @@ cluster_stats_messages_received:3021
# RocketMQ
RocketMQ 是阿里巴巴开源的分布式消息中间件。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。
# Kafka
**RocketMQ 特点**
# RocketMQ
- 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
- Producer、Consumer、队列都可以分布式
- Producer 向一些队列轮流发送消息,队列集合称为 TopicConsumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合
- 能够保证严格的消息顺序
- 支持拉pull和推push两种消息模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
- 支持多种消息协议,如 JMS、OpenMessaging 等
- 较少的依赖
RocketMQ 是阿里巴巴开源的分布式消息中间件。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。
## 功能优势
**功能优势**
- **削峰填谷**:主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
- **系统解耦**:解决不同重要程度、不同能力级别系统之间依赖导致一死全死
- **提升性能**:当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
- **蓄流压测**:线上有些链路不好压测,可以通过堆积一定量消息再放开来压测
- **异步处理**:不需要同步执行的远程调用可以有效提高响应时间
## 队列模式
## 架构设计
### 集群模式
### 部署模型
![img](images/Middleware/1090617-20190626233829426-1023022108.png)
### 角色
#### Broker
- 理解成RocketMQ本身
- Broker主要用于Producer和Consumer接收和发送消息
- Broker会定时向NameSrver提交自己的信息
- 是消息中间件的消息存储、转发服务器
- 每个Broker节点在启动时都会遍历NameServer列表与每个NameServer建立长连接注册自己的信息之后定时上报
#### NameServer
- 理解成Zookeeper的效果只是他没用zk而是自己写了个NameServer来替代zk
- 底层由Netty实现提供了路由管理、服务注册、服务发现的功能是一个无状态节点
- NameServer是服务发现者集群中各个角色Producer、Broker、Consumer等都需要定时向NameServer上报自己的状态以便互相发现彼此超时不上报的话NameServer会把它从列表中剔除
- NameServer可以部署多个当多个NameServer存在的时候其他角色同时向他们上报信息以保证高可用
- NameServer集群间互不通信没有主备的概念
- NameServer内存式存储NameServer中的Broker、Topic等信息默认不会持久化所以他是无状态节点
#### Producer
- 消息的生产者
- 随机选择其中一个NameServer节点建立长连接获得Topic路由信息包括Topic下的Queue这些Queue分布在哪些Broker上等等
- 接下来向提供Topic服务的Master建立长连接因为RocketMQ只有Master才能写消息且定时向Master发送心跳
#### Consumer
- 消息的消费者
- 通过NameServer集群获得Topic的路由信息连接到对应的Broker上消费消息
- 由于Master和Slave都可以读取消息因此Consumer会与Master和Slave都建立连接进行消费消息
### 核心流程
- Broker都注册到Nameserver上
- Producer发消息的时候会从Nameserver上获取发消息的Topic信息
- Producer向提供服务的所有Master建立长连接且定时向Master发送心跳
- Consumer通过NameServer集群获得Topic的路由信息
- Consumer会与所有的Master和所有的Slave都建立连接进行监听新消息
### 实现原理
RocketMQ由NameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干BrokerRocketMQ进程组成它的架构原理是这样的
- Broker在启动的时候去向所有的NameServer注册并保持长连接每30s发送一次心跳
- Producer在发送消息的时候从NameServer获取Broker服务器地址根据负载均衡算法选择一台服务器来发送消息
- Conusmer消费消息的时候同样从NameServer获取Broker地址然后主动拉取消息来消费
![RocketMQ实现原理](images/Middleware/RocketMQ实现原理.jpg)
## 核心概念
### Message(消息)
消息载体。Message发送或者消费的时候必须指定Topic。Message有一个可选的Tag项用于过滤消息还可以添加额外的键值对。
### Topic(主题)
消息的逻辑分类发消息之前必须要指定一个topic才能发就是将这条消息发送到这个topic上。消费消息的时候指定这个topic进行消费。就是逻辑分类。
### Queue(队列)
1个Topic会被分为N个Queue数量是可配置的。message本身其实是存储到queue上的消费者消费的也是queue上的消息。多说一嘴比如1个topic4个queue有5个Consumer都在消费这个topic那么会有一个consumer浪费掉了因为负载均衡策略每个consumer消费1个queue5>4溢出1个这个会不工作。
### Tag(标签)
Tag 是 Topic 的进一步细分顾名思义标签。每个发送的时候消息都能打tag消费的时候可以根据tag进行过滤选择性消费。
### 消费模式(Message Model)
消息模型集群Clustering和广播Broadcasting
#### 集群模式(Clustering)
生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者,但是消费者之间是竞争关系,即每条消息只能被一个消费者消费。
![集群模式](images/Middleware/集群模式.jpg)
- 每条消息只需要被处理一次Broker只会把消息发送给消费集群中的一个消费者
- 在消息重投时,不能保证路由到同一台机器上
- 消费状态由Broker维护
### 广播模式
#### 广播模式(Broadcasting)
**为了解决一条消息能被多个消费者消费的问题**,发布/订阅模型就来了。该模型是将消息发往一个`Topic`即主题中,所有订阅了这个 `Topic` 的订阅者都能消费这条消息。
![广播模式](images/Middleware/广播模式.jpg)
- 消费进度由Consumer维护
- 保证每个消费者都消费一次消息
- 消费失败的消息不会重投
## 分布式事务消息
MQ与DB一致性原理两方事务
### Message Order(消息顺序)
![MQ与DB一致性原理](images/Middleware/MQ与DB一致性原理.png)
消息顺序顺序Orderly和并发Concurrently
#### 顺序Orderly
#### 并发Concurrently
### Producer Group(生产组)
消息生产者组。标识发送同一类消息的Producer通常发送逻辑一致。发送普通消息的时候仅标识使用并无特别用处。若事务消息如果某条发送某条消息的producer-A宕机使得事务消息一直处于PREPARED状态并超时则broker会回查同一个group的其 他producer确认这条消息应该commit还是rollback。但开源版本并不完全支持事务消息阉割了事务回查的代码
### Consumer Group(消费组)
消息消费者组。标识一类Consumer的集合名称这类Consumer通常消费一类消息且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息起到负载均衡的作用。消费进度以Consumer Group为粒度管理不同Consumer Group之间消费进度彼此不受影响即消息A被Consumer Group1消费过也会再给Consumer Group2消费。
注: RocketMQ要求同一个Consumer Group的消费者必须要拥有相同的注册信息即必须要听一样的topic(并且tag也一样)。
### Offset
在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置Consumer Offset这个位置之前的消息都被消费过之后的消息都没有被消费过每成功消费一条消息消费位置就加一。这个消费位置是非常重要的概念我们在使用消息队列的时候丢消息的原因大多是由于消费位置处理不当导致的。
![RocketMQ-Offset](images/Middleware/RocketMQ-Offset.png)
## 核心设计
### 消息清理
Broker中的消息被消费后不会立即删除每条消息都会持久化到CommitLog中每个Consumer连接到Broker后会维持消费进度信息当有消息消费后只是当前Consumer的消费进度CommitLog的offset更新了。默认48小时后会删除不再使用的CommitLog文件
- 检查这个文件最后访问时间
- 判断是否大于过期时间
- 指定时间删除默认凌晨4点
```java
/**
* {@link org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isTimeToDelete()}
*/
private boolean isTimeToDelete() {
// when = "04";
String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
// 是04点就返回true
if (UtilAll.isItTimeToDo(when)) {
return true;
}
// 不是04点返回false
return false;
}
/**
* {@link org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles()}
*/
private void deleteExpiredFiles() {
// isTimeToDelete()这个方法是判断是不是凌晨四点,是的话就执行删除逻辑。
if (isTimeToDelete()) {
// 默认是72但是broker配置文件默认改成了48所以新版本都是48。
long fileReservedTime = 48 * 60 * 60 * 1000;
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(72 * 60 * 60 * 1000, xx, xx, xx);
}
}
/**
* {@link org.apache.rocketmq.store.CommitLog#deleteExpiredFile()}
*/
public int deleteExpiredFile(xxx) {
// 这个方法的主逻辑就是遍历查找最后更改时间+过期时间小于当前系统时间的话就删了也就是小于48小时
return this.mappedFileQueue.deleteExpiredFileByTime(72 * 60 * 60 * 1000, xx, xx, xx);
}
```
### push or pull
RocketMQ没有真正意义的push都是pull虽然有push类但实际底层实现采用的是**长轮询机制**即拉取方式。Broker端属性 `longPollingEnable` 标记是否开启长轮询,默认开启。源码如下:
```java
// {@link org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage()}
// 拉取消息结果放到pullCallback里
this.pullAPIWrapper.pullKernelImpl(pullCallback);
```
**为什么要主动拉取消息而不使用事件监听方式?**
事件驱动方式是建立好长连接由事件发送数据的方式来实时推送。如果broker主动推送消息的话有可能push速度快消费速度慢的情况那么就会造成消息在consumer端堆积过多同时又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull不会造成过多的压力而造成瓶颈。所以采取了pull的方式。
### 负载均衡
RocketMQ通过Topic在多Broker中分布式存储实现。
#### Producer端
发送端指定message queue发送消息到相应的broker来达到写入时的负载均衡
- 提升写入吞吐量当多个producer同时向一个broker写入数据的时候性能会下降
- 消息分布在多broker中为负载消费做准备
**默认策略是随机选择:**
- producer维护一个index
- 每次取节点会自增
- index向所有broker个数取余
- 自带容错策略
**其他实现:**
- SelectMessageQueueByHash
- - hash的是传入的args
- SelectMessageQueueByRandom
- SelectMessageQueueByMachineRoom 没有实现
也可以自定义实现**MessageQueueSelector**接口中的select方法
```java
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
```
#### Consumer端
采用的是平均分配算法来进行负载均衡。
**其他负载均衡算法**
- 平均分配策略(默认)(AllocateMessageQueueAveragely)
- 环形分配策略(AllocateMessageQueueAveragelyByCircle)
- 手动配置分配策略(AllocateMessageQueueByConfig)
- 机房分配策略(AllocateMessageQueueByMachineRoom)
- 一致性哈希分配策略(AllocateMessageQueueConsistentHash)
- 靠近机房策略(AllocateMachineRoomNearby)
**当消费负载均衡Consumer和Queue不对等的时候会发生什么**
Consumer和Queue会优先平均分配如果Consumer少于Queue的个数则会存在部分Consumer消费多个Queue的情况如果Consumer等于Queue的个数那就是一个Consumer消费一个Queue如果Consumer个数大于Queue的个数那么会有部分Consumer空余出来白白的浪费了。
@ -1033,148 +1280,210 @@ MQ与DB一致性原理两方事务
## 常见问题
### 事务消息
**消息可靠性怎么保证?**
MQ与DB一致性原理两方事务
消息丢失可能发生在生产者发送消息、MQ本身丢失消息、消费者丢失消息3个方面。
![MQ与DB一致性原理](images/Middleware/MQ与DB一致性原理.png)
- **生产者丢失**
事务消息就是MQ提供的类似XA的分布式事务能力通过事务消息可以达到分布式事务的最终一致性。半事务消息就是MQ收到了生产者的消息但是没有收到二次确认不能投递的消息。实现原理如下
**产生原因**可能因为程序发送失败抛异常而没有重试处理或发送过程成功但过程中网络闪断MQ没收到
- 生产者先发送一条半事务消息到MQ
- MQ收到消息后返回ack确认
- 生产者开始执行本地事务
- 如果事务执行成功发送commit到MQ失败发送rollback
- 如果MQ长时间未收到生产者的二次确认commit或者rollbackMQ对生产者发起消息回查
- 生产者查询事务执行最终状态
- 根据查询事务状态再次提交二次确认
**解决方案**
如果MQ收到二次确认commit就可以把消息投递给消费者反之如果是rollback消息会保存下来并且在3天后被删除。
- 发送异常:本地消息表
- 发送成功无回调:异步发送+回调通知+本地消息表
![RocketMQ事务消息](images/Middleware/RocketMQ事务消息.jpg)
- **MQ丢失**
**产生原因**如果生产者保证消息发送到MQ而MQ收到消息后还在内存中这时候宕机了又没来得及同步给从节点就有可能导致消息丢失
**解决方案**RocketMQ分为同步刷盘和异步刷盘两种方式默认的是异步刷盘。可以修改配置为同步刷盘来提高消息可靠性但会对性能有损耗需权衡
## 保证顺序
- **消费者丢失**
RocketMQ的消息是存储到Topic的Queue里面的Queue本身是FIFOFirst Int First Out先进先出队列。所以单个Queue是可以保证有序性的。
**产生原因**消费者丢失消息的场景消费者刚收到消息此时服务器宕机MQ认为消费者已经消费不会重复发送消息消息丢失
顺序消息FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:
**解决方案**消费方不返回ack确认重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同如果重试超过次数之后会进入死信队列需要手工来处理
- **顺序发布**
- **顺序消费**
**如果一直消费失败导致消息积压怎么处理?**
顺序消息包含两种类型:
因为考虑到时消费者消费一直出错的问题,那么我们可以从以下几个角度来考虑:
- **分区顺序**一个Partition内所有的消息按照先进先出的顺序进行发布和消费
- **全局顺序**一个Topic内所有的消息按照先进先出的顺序进行发布和消费
- 消费者出错肯定是程序或者其他问题导致的如果容易修复先把问题修复让consumer恢复正常消费
- 如果时间来不及处理很麻烦做转发处理写一个临时的consumer消费方案先把消息消费然后再转发到一个新的topic和MQ资源这个新的topic的机器资源单独申请要能承载住当前积压的消息
- 处理完积压数据后修复consumer去消费新的MQ和现有的MQ数据新MQ消费完成后恢复原状
![RocketMQ消费失败消息积压](images/Middleware/RocketMQ消费失败消息积压.jpg)
![img](images/Middleware/471426-20180519131211273-554395305.png)
对于两个订单的消息的原始数据a1、b1、b2、a2、a3、b3绝对时间下发生的顺序
**RocketMQ实现原理**
- 在发送时a订单的消息需要保持a1、a2、a3的顺序b订单的消息也相同但是a、b订单之间的消息没有顺序关系这意味着a、b订单的消息可以在不同的线程中被发送出去
- 在存储时需要分别保证a、b订单的消息的顺序但是a、b订单之间的消息的顺序可以不保证
RocketMQ由NameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干BrokerRocketMQ进程组成它的架构原理是这样的
- Broker在启动的时候去向所有的NameServer注册并保持长连接每30s发送一次心跳
- Producer在发送消息的时候从NameServer获取Broker服务器地址根据负载均衡算法选择一台服务器来发送消息
- Conusmer消费消息的时候同样从NameServer获取Broker地址然后主动拉取消息来消费
![RocketMQ实现原理](images/Middleware/RocketMQ实现原理.jpg)
### 保持顺序发送
消息被发送时保持顺序。
**为什么 RocketMQ 不使用 Zookeeper 作为注册中心呢?**
- 根据CAP理论同时最多只能满足两个点而zookeeper满足的是CP也就是说zookeeper并不能保证服务的可用性zookeeper在进行选举的时候整个选举的时间太长期间整个集群都处于不可用的状态而这对于一个注册中心来说肯定是不能接受的作为服务发现来说就应该是为可用性而设计
- 基于性能的考虑NameServer本身的实现非常轻量而且可以通过增加机器的方式水平扩展增加集群的抗压能力而zookeeper的写是不可扩展的而zookeeper要解决这个问题只能通过划分领域划分多个zookeeper集群来解决首先操作起来太复杂其次这样还是又违反了CAP中的A的设计导致服务之间是不连通的
- 持久化的机制来带的问题ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志同时再加上定期的将内存数据镜像Snapshot到磁盘来保证数据的一致性和持久性而对于一个简单的服务发现的场景来说这其实没有太大的必要这个实现方案太重了。而且本身存储的数据应该是高度定制化的
- 消息发送应该弱依赖注册中心而RocketMQ的设计理念也正是基于此生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地如果NameServer整个集群不可用短时间内对于生产者和消费者并不会产生太大影响
### 保持顺序发送存储
消息被存储时保持和发送的顺序一致。
**Broker是怎么保存数据的呢**
RocketMQ主要的存储文件包括commitlog文件、consumequeue文件、indexfile文件。
### 保持顺序消费
Broker在收到消息之后会把消息保存到commitlog的文件当中而同时在分布式的存储当中每个broker都会保存一部分topic的数据同时每个topic对应的messagequeue下都会生成consumequeue文件用于保存commitlog的物理位置偏移量offsetindexfile中会保存key和offset的对应关系
消息被消费时保持和存储的顺序一致
![Broker数据结构](images/Middleware/Broker数据结构.jpg)
#### MQPullConsumer
MQPullConsumer由用户控制线程主动从服务端获取消息每次获取到的是一个MessageQueue中的消息。PullResult中的List msgFoundList自然和存储顺序一致用户需要再拿到这批消息后自己保证消费的顺序。
CommitLog文件保存于${Rocket_Home}/store/commitlog目录中可以根据文件名很明显看出来偏移量每个文件默认1G写满后自动生成一个新的文件。
由于同一个topic的消息并不是连续的存储在commitlog中消费者如果直接从commitlog获取消息效率非常低所以通过consumequeue保存commitlog中消息的偏移量的物理地址这样消费者在消费的时候先从consumequeue中根据偏移量定位到具体的commitlog物理文件然后根据一定的规则offset和文件大小取模在commitlog中快速定位。
#### MQPushConsumer
对于PushConsumer由用户注册MessageListener来消费消息在客户端中需要保证调用MessageListener时消息的顺序性。
**Master 和 Slave 之间是怎么同步数据的呢?**
而消息在master和slave之间的同步是根据raft协议来进行的
- 在broker收到消息后会被标记为uncommitted状态
- 然后会把消息发送给所有的slave
- slave在收到消息之后返回ack响应给master
- master在收到超过半数的ack之后把消息标记为committed
- 发送committed消息给所有slaveslave也修改状态为committed
## 消息不丢失
一条消息从生产到被消费,将会经历三个阶段:
![Rocket消息丢失](images/Middleware/Rocket消息丢失.jpg)
**RocketMQ 为什么速度快吗?**
- 生产阶段Producer 新建消息,然后通过网络将消息投递给 MQ Broker
- 存储阶段:消息将会存储在 Broker 端磁盘中
- 消息阶段Consumer 将会从 Broker 拉取消息
是因为使用了顺序存储、Page Cache和异步刷盘。
以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题
- 我们在写入commitlog的时候是顺序写入的这样比随机写入的性能就会提高很多
- 写入commitlog的时候并不是直接写入磁盘而是先写入操作系统的PageCache
- 最后由操作系统异步将缓存中的数据刷到磁盘
### 生产阶段
**什么是事务、半事务消息?怎么实现的?**
Producer通过网络将消息发送给Broker这个发送可能会发生丢失比如网络延迟不可达等。
事务消息就是MQ提供的类似XA的分布式事务能力通过事务消息可以达到分布式事务的最终一致性。半事务消息就是MQ收到了生产者的消息但是没有收到二次确认不能投递的消息。实现原理如下
失败会自动重试即使重试N次也不行后那客户端也会知道消息没成功这也可以自己补偿等不会盲目影响到主业务逻辑。再比如即使Broker挂了那还有其他Broker再提供服务了高可用不影响。
- 生产者先发送一条半事务消息到MQ
- MQ收到消息后返回ack确认
- 生产者开始执行本地事务
- 如果事务执行成功发送commit到MQ失败发送rollback
- 如果MQ长时间未收到生产者的二次确认commit或者rollbackMQ对生产者发起消息回查
- 生产者查询事务执行最终状态
- 根据查询事务状态再次提交二次确认
总结:**同步发送+自动重试机制+多个Master节点**
如果MQ收到二次确认commit就可以把消息投递给消费者反之如果是rollback消息会保存下来并且在3天后被删除。
![RocketMQ事务消息](images/Middleware/RocketMQ事务消息.jpg)
#### 同步发送
有三种send方法同步发送、异步发送、单向发送可以采取同步发送的方式进行发送消息。
## 消息丢失
- **同步发送**发消息时会同步阻塞等待broker返回的结果如果没成功则不会收到SendResult这种是最可靠的
- **异步发送**:在回调方法里可以得知是否发送成功
- **单向发送OneWay**:最不靠谱的一种发送方式,我们无法保证消息真正可达
**消息发送流程**
```java
/**
* {@link org.apache.rocketmq.client.producer.DefaultMQProducer}
*/
一条消息从生产到被消费,将会经历三个阶段:
// 同步发送
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}
// 异步发送sendCallback作为回调
public void send(Message msg,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {}
// 单向发送,不关心发送结果,最不靠谱
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {}
```
![Rocket消息丢失](images/Middleware/Rocket消息丢失.jpg)
- 生产阶段Producer 新建消息,然后通过网络将消息投递给 MQ Broker
- 存储阶段:消息将会存储在 Broker 端磁盘中
- 消息阶段Consumer 将会从 Broker 拉取消息
以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。
#### 失败重试
发送消息如果失败或者超时了则会自动重试。默认是重试3次可以根据api进行更改比如改为10次
```java
producer.setRetryTimesWhenSendFailed(10);
```
### 生产阶段
底层源码逻辑如下:
```java
/**
* {@link org.apache.rocketmq.client.producer.DefaultMQProducer#sendDefaultImpl(Message, CommunicationMode, SendCallback, long)}
*/
// 自动重试次数this.defaultMQProducer.getRetryTimesWhenSendFailed()默认为2如果是同步发送默认重试3次否则重试1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
for (; times < timesTotal; times++) {
// 选择发送的消息queue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
try {
// 真正的发送逻辑sendKernelImpl。
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
// 如果发送失败了则continue意味着还会再次进入for继续重试发送
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
// 发送成功的话,将发送结果返回给调用者
return sendResult;
default:
break;
}
} catch (RemotingException e) {
continue;
} catch (...) {
continue;
}
}
}
```
生产者Producer 通过网络发送消息给 Broker当 Broker 收到之后,将会返回确认响应信息给 Producer。所以生产者只要接收到返回的确认响应就代表消息在生产阶段未丢失。
#### 故障切换
### Broker 存储阶段
假设Broker宕机了但是生产环境一般都是多M多S的所以还会有其他Master节点继续提供服务这也不会影响到我们发送消息我们消息依然可达。因为比如恰巧发送到broker的时候broker宕机了producer收到broker的响应发送失败了这时候producer会自动重试这时候宕机的broker就被踢下线了 所以producer会换一台broker发送消息。
默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息**存储磁盘成功**,才会返回响应。修改 Broker 端配置如下:
### Broker存储阶段
若想很严格的保证Broker存储消息阶段消息不丢失则需要如下配置但是性能肯定远差于默认配置
```properties
# master 节点配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER
# slave 节点配置
brokerRole=slave
flushDiskType = SYNC_FLUSH
```
#### 设置Broker同步刷盘策略
**设置Broker同步刷盘策略**。默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器断电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。
若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息**存储磁盘成功**,才会返回响应。修改 Broker 端配置如下:
```properties
# 默认情况为 ASYNC_FLUSH
@ -1185,9 +1494,103 @@ flushDiskType = SYNC_FLUSH
#### 等待Master和Slave刷盘完
等待Master和Slave刷盘完。即使Broker设置了同步刷盘策略但是Broker刷完盘后磁盘坏了这会导致盘上的消息全丢了。但是如果即使是1主1从了但是Master刷完盘后还没来得及同步给Slave就磁盘坏了这会导致盘上的消息全丢了。所以我们还可以配置不仅是等Master刷完盘就通知Producer而是等Master和Slave都刷完盘后才去通知Producer说消息ok了。
```properties
# 默认为 ASYNC_MASTER
brokerRole=SYNC_MASTER
```
### 消费阶段
消费者从broker拉取消息然后执行相应的业务逻辑。一旦执行成功将会返回`ConsumeConcurrentlyStatus. CONSUME_SUCCESS` 状态给Broker。如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。
消费失败了其实也是消息丢失的一种变体。
只有当消费模式为 **MessageModel.CLUSTERING(集群模式)**Broker 才会自动进行重试对于广播消息是不会重试的。对于一直无法消费成功的消息RocketMQ 会在达到最大重试次数之后,将该消息投递至死信队列。然后我们需要关注死信队列,并对该死信消息业务做人工的补偿操作。
#### 手动ACK确认
消费者会先把消息拉取到本地然后进行业务逻辑业务逻辑完成后手动进行ack确认这时候才会真正的代表消费完成。而不是说pull到本地后消息就算消费完了。举个例子
```java
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try{
for (MessageExt msg : msgs) {
String str = new String(msg.getBody());
System.out.println(str);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch(Throwable t){
log.error("消费异常:{}", msgs, t);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
```
#### 消费异常自动重试
- 业务消费方返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
- 业务消费方返回 null
- 业务消费方主动/被动抛出异常
针对以上3种情况下Broker一般会进行重试默认最大重试16次RocketMQ 采用了“时间衰减策略”进行消息的重复投递,即重试次数越多,消息消费成功的可能性越小。我们可以在 RocketMQ 的 `broker.conf` 配置文件中配置 Consumer 侧重试次数及时间间隔(**距离第1次发送的时间间隔**, 配置如下:
```properties
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
```
消费者客户端,首先判断消费端有没有显式设置最大重试次数 MaxReconsumeTimes 如果没有,则设置默认重试次数为 16否则以设置的最大重试次数为准。
```java
private int getMaxReconsumeTimes() {
// default reconsume times: 16
if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
return 16;
} else {
return this.defaultMQPushConsumer.getMaxReconsumeTimes();
}
}
```
#### 消费超时无线重试
如果是消费超时情况MQ会无限制的发送给消费端。这种情况就是Consumer端没有返回`ConsumeConcurrentlyStatus. CONSUME_SUCCESS`,也没有返回`ConsumeConcurrentlyStatus.RECONSUME_LATER`。
#### 死信队列
死信的处理逻辑:
- 首先判断消息当前重试次数是否大于等于 16或者消息延迟级别是否小于 0
- 只要满足上述的任意一个条件,设置新的 topic死信 topic**%DLQ%+consumerGroup**
- 进行前置属性的添加
- 将死信消息投递到上述步骤 2 建立的死信 topic 对应的死信队列中并落盘,使消息持久化
最后单独启动一个死信队列的消费者进行消费,然后进行人工干预处理失败的消息。
## 消息幂等
在所有消息系统中消费消息有三种模式:`at-most-once`(最多一次)、`at-least-once`(最少一次)和 `exactly-only-once`(精确仅一次),分布式消息系统都是在三者间取平衡,前两者是可行的并且被广泛使用。
- `at-most-once`消息投递后不论消息是否被消费成功不会再重复投递有可能会导致消息未被消费RocketMQ 未使用该方式
- `at-lease-once`:消息投递后,消费完成后,向服务器返回 ACK没有消费则一定不会返回 ACK 消息。由于网络异常、客户端重启等原因,服务器未能收到客户端返回的 ACK服务器则会再次投递这就会导致可能重复消费RocketMQ 通过 ACK 来确保消息至少被消费一次
- `exactly-only-once`在分布式系统环境下如果要实现该模式巨大的开销不可避免。RocketMQ 没有保证此特性,无法避免消息重复,由业务上进行幂等性处理。必须下面两个条件都满足,才能认为消息是"Exactly Only Once"
- 发送消息阶段,不允许发送重复消息
- 消费消息阶段,不允许消费重复的消息

@ -4309,7 +4309,7 @@ BASE理论是对CAP中的一致性和可用性进行一个权衡的结果
## 一致性算法
## 一致性算法
### Gossip协议

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 615 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 615 KiB

Loading…
Cancel
Save