diff --git a/Database.md b/Database.md index 963b2e5..ee8d613 100644 --- a/Database.md +++ b/Database.md @@ -604,8 +604,27 @@ WHERE A.EMP_SUPV_ID = B.EMP_ID; 从粒度上来说就是**表锁、页锁、行锁**。表锁有意向共享锁、意向排他锁、自增锁等。行锁是在引擎层由各个引擎自己实现的。但并不是所有的引擎都支持行锁,比如 `MyISAM`引擎就`不支持行锁`。 + + +按照锁的粒度进行分类,MySQL主要包含三种类型(级别)的锁定机制: + +- **全局锁**:锁的是整个database。由MySQL的SQL layer层实现的 +- **表级锁**:锁的是某个table。由MySQL的SQL layer层实现的 +- **⾏级锁**:锁的是某⾏数据,也可能锁定⾏之间的间隙。由某些存储引擎实现,⽐如InnoDB + + + +表级锁和行级锁的区别: + +- **表级锁**:开销⼩,加锁快;不会出现死锁;锁定粒度⼤,发⽣锁冲突的概率最⾼,并发度最低 +- **⾏级锁**:开销⼤,加锁慢;会出现死锁;锁定粒度最⼩,发⽣锁冲突的概率最低,并发度也最⾼ + + + ## 全局锁 +锁的是整个database。由MySQL的SQL layer层实现的。 + ## 行级锁 @@ -824,7 +843,7 @@ select ... for update; -### AUTO-INC 锁 +### AUTO-INC锁(自增长锁) 在为某个字段声明 `AUTO_INCREMENT` 属性时,之后可以在插入数据时,可以不指定该字段的值,数据库会自动给该字段赋值递增的值,这主要是通过 AUTO-INC 锁实现的。 @@ -1304,8 +1323,74 @@ SELECT * FROM products WHERE id LIKE '3' FOR UPDATE; +# MySQL日志 + +## binlog + +`binlog` 用于记录数据库执行的写入性操作(不包括查询)信息,以二进制的形式保存在磁盘中。`binlog` 是 `mysql`的逻辑日志,并且由 `Server` 层进行记录,使用任何存储引擎的 `mysql` 数据库都会记录 `binlog` 日志。 + +- **逻辑日志**:可以简单理解为记录的就是sql语句 。 +- **物理日志**:`mysql` 数据最终是保存在数据页中的,物理日志记录的就是数据页变更 。 + +`binlog` 是通过追加的方式进行写入的,可以通过`max_binlog_size` 参数设置每个 `binlog`文件的大小,当文件大小达到给定值之后,会生成新的文件来保存日志。 + + + +### 使用场景 + +在实际应用中, `binlog` 的主要使用场景有两个,分别是 **主从复制** 和 **数据恢复** 。 + +- **主从复制** :在 `Master` 端开启 `binlog` ,然后将 `binlog`发送到各个 `Slave` 端, `Slave` 端重放 `binlog` 从而达到主从数据一致 +- **数据恢复** :通过使用 `mysqlbinlog` 工具来恢复数据 + + + +### 刷盘时机 + +对于 `InnoDB` 存储引擎而言,只有在事务提交时才会记录`biglog` ,此时记录还在内存中,那么 `biglog`是什么时候刷到磁盘中的呢?`mysql` 通过 `sync_binlog` 参数控制 `biglog` 的刷盘时机,取值范围是 `0-N`: + +- 0:不去强制要求,由系统自行判断何时写入磁盘; +- 1:每次 `commit` 的时候都要将 `binlog` 写入磁盘; +- N:每N个事务,才会将 `binlog` 写入磁盘。 + +从上面可以看出, `sync_binlog` 最安全的是设置是 `1` ,这也是`MySQL 5.7.7`之后版本的默认值。但是设置一个大一些的值可以提升数据库性能,因此实际情况下也可以将值适当调大,牺牲一定的一致性来获取更好的性能。 + + + +### 日志格式 + +`binlog` 日志有三种格式,分别为 `STATMENT` 、 `ROW` 和 `MIXED`。 + +在 `MySQL 5.7.7` 之前,默认的格式是 `STATEMENT` , `MySQL 5.7.7` 之后,默认值是 `ROW`。日志格式通过 `binlog-format` 指定。 + +- `STATMENT`:基于`SQL` 语句的复制( `statement-based replication, SBR` ),每一条会修改数据的sql语句会记录到`binlog` 中 。 + +- - 优点:不需要记录每一行的变化,减少了 binlog 日志量,节约了 IO , 从而提高了性能; + - 缺点:在某些情况下会导致主从数据不一致,比如执行sysdate() 、 slepp() 等 。 + +- `ROW`:基于行的复制(`row-based replication, RBR` ),不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了 。 + +- - 优点:不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题 ; + - 缺点:会产生大量的日志,尤其是` alter table ` 的时候会让日志暴涨 + +- `MIXED`:基于`STATMENT` 和 `ROW` 两种模式的混合复制(`mixed-based replication, MBR` ),一般的复制使用`STATEMENT` 模式保存 `binlog` ,对于 `STATEMENT` 模式无法复制的操作使用 `ROW` 模式保存 `binlog` + + + +## redo log + + + +## undo log + + + # InnoDB +## 线程 + + + ## 数据页 数据页主要是用来存储表中记录的,它在磁盘中是用双向链表相连的,方便查找,能够非常快速得从一个数据页,定位到另一个数据页。 @@ -1641,6 +1726,21 @@ SELECT * FROM products WHERE id LIKE '3' FOR UPDATE; +## Change Buffer + +可变缓冲区(Change Buffer),在内存中,可变缓冲区是InnoDB缓冲池的一部分,在磁盘上,它是系统表空间的一部分,因此即使在数据库重新启动之后,索引更改也会保持缓冲状态。 + +可变缓冲区是一种特殊的数据结构,当受影响的页不在缓冲池中时,缓存对辅助索引页的更改。 + + + +## Log Buffer + +日志缓冲区(Log Buffer ),主要保存写到redo log(重放日志)的数据。周期性的将缓冲区内的数据写入redo日志中。将内存中的数据写入磁盘的行为由innodb_log_at_trx_commit 和 innodb_log_at_timeout 调节。较大的redo日志缓冲区允许大型事务在事务提交前不进行写磁盘操作。 + +变量:innodb_log_buffer_size (默认 16M) + + ## InnoDB日志 ### Redo Log(重做日志) diff --git a/Middleware.md b/Middleware.md index 1fc3cf1..f623cbd 100644 --- a/Middleware.md +++ b/Middleware.md @@ -940,59 +940,306 @@ cluster_stats_messages_received:3021 -# RocketMQ - -RocketMQ 是阿里巴巴开源的分布式消息中间件。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。 +# Kafka -**RocketMQ 特点** +# RocketMQ -- 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点 -- Producer、Consumer、队列都可以分布式 -- Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合 -- 能够保证严格的消息顺序 -- 支持拉(pull)和推(push)两种消息模式 -- 高效的订阅者水平扩展能力 -- 实时的消息订阅机制 -- 亿级消息堆积能力 -- 支持多种消息协议,如 JMS、OpenMessaging 等 -- 较少的依赖 +RocketMQ 是阿里巴巴开源的分布式消息中间件。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。 -## 功能优势 +**功能优势** - **削峰填谷**:主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题 - **系统解耦**:解决不同重要程度、不同能力级别系统之间依赖导致一死全死 - **提升性能**:当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统 - **蓄流压测**:线上有些链路不好压测,可以通过堆积一定量消息再放开来压测 +- **异步处理**:不需要同步执行的远程调用可以有效提高响应时间 -## 队列模式 +## 架构设计 -### 集群模式 +### 部署模型 + +![img](images/Middleware/1090617-20190626233829426-1023022108.png) + + + +### 角色 + +#### Broker + +- 理解成RocketMQ本身 +- Broker主要用于Producer和Consumer接收和发送消息 +- Broker会定时向NameSrver提交自己的信息 +- 是消息中间件的消息存储、转发服务器 +- 每个Broker节点在启动时都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报 + + + +#### NameServer + +- 理解成Zookeeper的效果,只是他没用zk,而是自己写了个NameServer来替代zk +- 底层由Netty实现,提供了路由管理、服务注册、服务发现的功能,是一个无状态节点 +- NameServer是服务发现者,集群中各个角色(Producer、Broker、Consumer等)都需要定时向NameServer上报自己的状态,以便互相发现彼此,超时不上报的话,NameServer会把它从列表中剔除 +- NameServer可以部署多个,当多个NameServer存在的时候,其他角色同时向他们上报信息,以保证高可用, +- NameServer集群间互不通信,没有主备的概念 +- NameServer内存式存储,NameServer中的Broker、Topic等信息默认不会持久化,所以他是无状态节点 + + + +#### Producer + +- 消息的生产者 +- 随机选择其中一个NameServer节点建立长连接,获得Topic路由信息(包括Topic下的Queue,这些Queue分布在哪些Broker上等等) +- 接下来向提供Topic服务的Master建立长连接(因为RocketMQ只有Master才能写消息),且定时向Master发送心跳 + + + +#### Consumer + +- 消息的消费者 +- 通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息 +- 由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接进行消费消息 + + + +### 核心流程 + +- Broker都注册到Nameserver上 +- Producer发消息的时候会从Nameserver上获取发消息的Topic信息 +- Producer向提供服务的所有Master建立长连接,且定时向Master发送心跳 +- Consumer通过NameServer集群获得Topic的路由信息 +- Consumer会与所有的Master和所有的Slave都建立连接进行监听新消息 + + + +### 实现原理 + +RocketMQ由NameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干Broker(RocketMQ进程)组成,它的架构原理是这样的: + +- Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送一次心跳 +- Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息 +- Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费 + +![RocketMQ实现原理](images/Middleware/RocketMQ实现原理.jpg) + + + +## 核心概念 + +### Message(消息) + +消息载体。Message发送或者消费的时候必须指定Topic。Message有一个可选的Tag项用于过滤消息,还可以添加额外的键值对。 + + + +### Topic(主题) + +消息的逻辑分类,发消息之前必须要指定一个topic才能发,就是将这条消息发送到这个topic上。消费消息的时候指定这个topic进行消费。就是逻辑分类。 + + + +### Queue(队列) + +1个Topic会被分为N个Queue,数量是可配置的。message本身其实是存储到queue上的,消费者消费的也是queue上的消息。多说一嘴,比如1个topic4个queue,有5个Consumer都在消费这个topic,那么会有一个consumer浪费掉了,因为负载均衡策略,每个consumer消费1个queue,5>4,溢出1个,这个会不工作。 + + + +### Tag(标签) + +Tag 是 Topic 的进一步细分,顾名思义,标签。每个发送的时候消息都能打tag,消费的时候可以根据tag进行过滤,选择性消费。 + + + +### 消费模式(Message Model) + +消息模型:集群(Clustering)和广播(Broadcasting) + +#### 集群模式(Clustering) 生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者,但是消费者之间是竞争关系,即每条消息只能被一个消费者消费。 ![集群模式](images/Middleware/集群模式.jpg) +- 每条消息只需要被处理一次,Broker只会把消息发送给消费集群中的一个消费者 +- 在消息重投时,不能保证路由到同一台机器上 +- 消费状态由Broker维护 + -### 广播模式 +#### 广播模式(Broadcasting) **为了解决一条消息能被多个消费者消费的问题**,发布/订阅模型就来了。该模型是将消息发往一个`Topic`即主题中,所有订阅了这个 `Topic` 的订阅者都能消费这条消息。 ![广播模式](images/Middleware/广播模式.jpg) +- 消费进度由Consumer维护 +- 保证每个消费者都消费一次消息 +- 消费失败的消息不会重投 -## 分布式事务消息 -MQ与DB一致性原理(两方事务) +### Message Order(消息顺序) -![MQ与DB一致性原理](images/Middleware/MQ与DB一致性原理.png) +消息顺序:顺序(Orderly)和并发(Concurrently) + +#### 顺序(Orderly) + + + +#### 并发(Concurrently) + + + +### Producer Group(生产组) + +消息生产者组。标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。若事务消息,如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其 他producer,确认这条消息应该commit还是rollback。但开源版本并不完全支持事务消息(阉割了事务回查的代码)。 + + + +### Consumer Group(消费组) + +消息消费者组。标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。 + +注: RocketMQ要求同一个Consumer Group的消费者必须要拥有相同的注册信息,即必须要听一样的topic(并且tag也一样)。 + + + +### Offset + +在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。这个消费位置是非常重要的概念,我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。 + +![RocketMQ-Offset](images/Middleware/RocketMQ-Offset.png) + + + +## 核心设计 + +### 消息清理 + +Broker中的消息被消费后不会立即删除,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。默认48小时后会删除不再使用的CommitLog文件: + +- 检查这个文件最后访问时间 +- 判断是否大于过期时间 +- 指定时间删除,默认凌晨4点 + +```java +/** + * {@link org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isTimeToDelete()} + */ +private boolean isTimeToDelete() { + // when = "04"; + String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen(); + // 是04点,就返回true + if (UtilAll.isItTimeToDo(when)) { + return true; + } + // 不是04点,返回false + return false; +} + +/** + * {@link org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles()} + */ +private void deleteExpiredFiles() { + // isTimeToDelete()这个方法是判断是不是凌晨四点,是的话就执行删除逻辑。 + if (isTimeToDelete()) { + // 默认是72,但是broker配置文件默认改成了48,所以新版本都是48。 + long fileReservedTime = 48 * 60 * 60 * 1000; + deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(72 * 60 * 60 * 1000, xx, xx, xx); + } +} + +/** + * {@link org.apache.rocketmq.store.CommitLog#deleteExpiredFile()} + */ +public int deleteExpiredFile(xxx) { + // 这个方法的主逻辑就是遍历查找最后更改时间+过期时间,小于当前系统时间的话就删了(也就是小于48小时)。 + return this.mappedFileQueue.deleteExpiredFileByTime(72 * 60 * 60 * 1000, xx, xx, xx); +} +``` + + + +### push or pull + +RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是**长轮询机制**,即拉取方式。Broker端属性 `longPollingEnable` 标记是否开启长轮询,默认开启。源码如下: + +```java +// {@link org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage()} + +// 拉取消息,结果放到pullCallback里 +this.pullAPIWrapper.pullKernelImpl(pullCallback); +``` + +**为什么要主动拉取消息而不使用事件监听方式?** + +事件驱动方式是建立好长连接,由事件(发送数据)的方式来实时推送。如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈。所以采取了pull的方式。 + + + +### 负载均衡 + +RocketMQ通过Topic在多Broker中分布式存储实现。 + +#### Producer端 + +发送端指定message queue发送消息到相应的broker,来达到写入时的负载均衡 + +- 提升写入吞吐量,当多个producer同时向一个broker写入数据的时候,性能会下降 +- 消息分布在多broker中,为负载消费做准备 + + + +**默认策略是随机选择:** + +- producer维护一个index +- 每次取节点会自增 +- index向所有broker个数取余 +- 自带容错策略 + + + +**其他实现:** + +- SelectMessageQueueByHash + +- - hash的是传入的args + +- SelectMessageQueueByRandom + +- SelectMessageQueueByMachineRoom 没有实现 + +也可以自定义实现**MessageQueueSelector**接口中的select方法 + +```java +MessageQueue select(final List mqs, final Message msg, final Object arg); +``` + + + +#### Consumer端 + +采用的是平均分配算法来进行负载均衡。 + +**其他负载均衡算法** + +- 平均分配策略(默认)(AllocateMessageQueueAveragely) +- 环形分配策略(AllocateMessageQueueAveragelyByCircle) +- 手动配置分配策略(AllocateMessageQueueByConfig) +- 机房分配策略(AllocateMessageQueueByMachineRoom) +- 一致性哈希分配策略(AllocateMessageQueueConsistentHash) +- 靠近机房策略(AllocateMachineRoomNearby) + + + +**当消费负载均衡Consumer和Queue不对等的时候会发生什么?** + +Consumer和Queue会优先平均分配,如果Consumer少于Queue的个数,则会存在部分Consumer消费多个Queue的情况,如果Consumer等于Queue的个数,那就是一个Consumer消费一个Queue,如果Consumer个数大于Queue的个数,那么会有部分Consumer空余出来,白白的浪费了。 @@ -1033,148 +1280,210 @@ MQ与DB一致性原理(两方事务) -## 常见问题 +### 事务消息 -**消息可靠性怎么保证?** +MQ与DB一致性原理(两方事务) -消息丢失可能发生在生产者发送消息、MQ本身丢失消息、消费者丢失消息3个方面。 +![MQ与DB一致性原理](images/Middleware/MQ与DB一致性原理.png) -- **生产者丢失** +事务消息就是MQ提供的类似XA的分布式事务能力,通过事务消息可以达到分布式事务的最终一致性。半事务消息就是MQ收到了生产者的消息,但是没有收到二次确认,不能投递的消息。实现原理如下: - **产生原因**:可能因为程序发送失败抛异常而没有重试处理,或发送过程成功但过程中网络闪断MQ没收到 +- 生产者先发送一条半事务消息到MQ +- MQ收到消息后返回ack确认 +- 生产者开始执行本地事务 +- 如果事务执行成功发送commit到MQ,失败发送rollback +- 如果MQ长时间未收到生产者的二次确认commit或者rollback,MQ对生产者发起消息回查 +- 生产者查询事务执行最终状态 +- 根据查询事务状态再次提交二次确认 - **解决方案**: +如果MQ收到二次确认commit,就可以把消息投递给消费者,反之如果是rollback,消息会保存下来并且在3天后被删除。 - - 发送异常:本地消息表 - - 发送成功无回调:异步发送+回调通知+本地消息表 +![RocketMQ事务消息](images/Middleware/RocketMQ事务消息.jpg) -- **MQ丢失** - **产生原因**:如果生产者保证消息发送到MQ,而MQ收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失 - **解决方案**:RocketMQ分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘。可以修改配置为同步刷盘来提高消息可靠性,但会对性能有损耗,需权衡 +## 保证顺序 -- **消费者丢失** +RocketMQ的消息是存储到Topic的Queue里面的,Queue本身是FIFO(First Int First Out)先进先出队列。所以单个Queue是可以保证有序性的。 - **产生原因**:消费者丢失消息的场景:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失 +顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成: - **解决方案**:消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理 +- **顺序发布** +- **顺序消费** -**如果一直消费失败导致消息积压怎么处理?** +顺序消息包含两种类型: -因为考虑到时消费者消费一直出错的问题,那么我们可以从以下几个角度来考虑: +- **分区顺序**:一个Partition内所有的消息按照先进先出的顺序进行发布和消费 +- **全局顺序**:一个Topic内所有的消息按照先进先出的顺序进行发布和消费 -- 消费者出错,肯定是程序或者其他问题导致的,如果容易修复,先把问题修复,让consumer恢复正常消费 -- 如果时间来不及处理很麻烦,做转发处理,写一个临时的consumer消费方案,先把消息消费,然后再转发到一个新的topic和MQ资源,这个新的topic的机器资源单独申请,要能承载住当前积压的消息 -- 处理完积压数据后,修复consumer,去消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状 -![RocketMQ消费失败消息积压](images/Middleware/RocketMQ消费失败消息积压.jpg) +![img](images/Middleware/471426-20180519131211273-554395305.png) +对于两个订单的消息的原始数据:a1、b1、b2、a2、a3、b3(绝对时间下发生的顺序): -**RocketMQ实现原理?** +- 在发送时,a订单的消息需要保持a1、a2、a3的顺序,b订单的消息也相同,但是a、b订单之间的消息没有顺序关系,这意味着a、b订单的消息可以在不同的线程中被发送出去 +- 在存储时,需要分别保证a、b订单的消息的顺序,但是a、b订单之间的消息的顺序可以不保证 -RocketMQ由NameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干Broker(RocketMQ进程)组成,它的架构原理是这样的: -- Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送一次心跳 -- Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息 -- Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费 -![RocketMQ实现原理](images/Middleware/RocketMQ实现原理.jpg) +### 保持顺序发送 +消息被发送时保持顺序。 -**为什么 RocketMQ 不使用 Zookeeper 作为注册中心呢?** -- 根据CAP理论,同时最多只能满足两个点,而zookeeper满足的是CP,也就是说zookeeper并不能保证服务的可用性,zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计 -- 基于性能的考虑,NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而zookeeper的写是不可扩展的,而zookeeper要解决这个问题只能通过划分领域,划分多个zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的 -- 持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的 -- 消息发送应该弱依赖注册中心,而RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响 +### 保持顺序发送存储 +消息被存储时保持和发送的顺序一致。 -**Broker是怎么保存数据的呢?** -RocketMQ主要的存储文件包括commitlog文件、consumequeue文件、indexfile文件。 +### 保持顺序消费 -Broker在收到消息之后,会把消息保存到commitlog的文件当中,而同时在分布式的存储当中,每个broker都会保存一部分topic的数据,同时,每个topic对应的messagequeue下都会生成consumequeue文件用于保存commitlog的物理位置偏移量offset,indexfile中会保存key和offset的对应关系。 +消息被消费时保持和存储的顺序一致。 -![Broker数据结构](images/Middleware/Broker数据结构.jpg) +#### MQPullConsumer +MQPullConsumer由用户控制线程,主动从服务端获取消息,每次获取到的是一个MessageQueue中的消息。PullResult中的List msgFoundList自然和存储顺序一致,用户需要再拿到这批消息后自己保证消费的顺序。 -CommitLog文件保存于${Rocket_Home}/store/commitlog目录中,可以根据文件名很明显看出来偏移量,每个文件默认1G,写满后自动生成一个新的文件。 -由于同一个topic的消息并不是连续的存储在commitlog中,消费者如果直接从commitlog获取消息效率非常低,所以通过consumequeue保存commitlog中消息的偏移量的物理地址,这样消费者在消费的时候先从consumequeue中根据偏移量定位到具体的commitlog物理文件,然后根据一定的规则(offset和文件大小取模)在commitlog中快速定位。 +#### MQPushConsumer +对于PushConsumer,由用户注册MessageListener来消费消息,在客户端中需要保证调用MessageListener时消息的顺序性。 -**Master 和 Slave 之间是怎么同步数据的呢?** -而消息在master和slave之间的同步是根据raft协议来进行的: -- 在broker收到消息后,会被标记为uncommitted状态 -- 然后会把消息发送给所有的slave -- slave在收到消息之后返回ack响应给master -- master在收到超过半数的ack之后,把消息标记为committed -- 发送committed消息给所有slave,slave也修改状态为committed +## 消息不丢失 +一条消息从生产到被消费,将会经历三个阶段: +![Rocket消息丢失](images/Middleware/Rocket消息丢失.jpg) -**RocketMQ 为什么速度快吗?** +- 生产阶段:Producer 新建消息,然后通过网络将消息投递给 MQ Broker +- 存储阶段:消息将会存储在 Broker 端磁盘中 +- 消息阶段:Consumer 将会从 Broker 拉取消息 -是因为使用了顺序存储、Page Cache和异步刷盘。 +以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。 -- 我们在写入commitlog的时候是顺序写入的,这样比随机写入的性能就会提高很多 -- 写入commitlog的时候并不是直接写入磁盘,而是先写入操作系统的PageCache -- 最后由操作系统异步将缓存中的数据刷到磁盘 +### 生产阶段 -**什么是事务、半事务消息?怎么实现的?** +Producer通过网络将消息发送给Broker,这个发送可能会发生丢失,比如网络延迟不可达等。 -事务消息就是MQ提供的类似XA的分布式事务能力,通过事务消息可以达到分布式事务的最终一致性。半事务消息就是MQ收到了生产者的消息,但是没有收到二次确认,不能投递的消息。实现原理如下: +失败会自动重试,即使重试N次也不行后,那客户端也会知道消息没成功,这也可以自己补偿等,不会盲目影响到主业务逻辑。再比如即使Broker挂了,那还有其他Broker再提供服务了,高可用,不影响。 -- 生产者先发送一条半事务消息到MQ -- MQ收到消息后返回ack确认 -- 生产者开始执行本地事务 -- 如果事务执行成功发送commit到MQ,失败发送rollback -- 如果MQ长时间未收到生产者的二次确认commit或者rollback,MQ对生产者发起消息回查 -- 生产者查询事务执行最终状态 -- 根据查询事务状态再次提交二次确认 +总结:**同步发送+自动重试机制+多个Master节点** -如果MQ收到二次确认commit,就可以把消息投递给消费者,反之如果是rollback,消息会保存下来并且在3天后被删除。 -![RocketMQ事务消息](images/Middleware/RocketMQ事务消息.jpg) +#### 同步发送 +有三种send方法,同步发送、异步发送、单向发送,可以采取同步发送的方式进行发送消息。 -## 消息丢失 +- **同步发送**:发消息时会同步阻塞等待broker返回的结果,如果没成功,则不会收到SendResult,这种是最可靠的 +- **异步发送**:在回调方法里可以得知是否发送成功 +- **单向发送(OneWay)**:最不靠谱的一种发送方式,我们无法保证消息真正可达 -**消息发送流程** +```java +/** + * {@link org.apache.rocketmq.client.producer.DefaultMQProducer} + */ -一条消息从生产到被消费,将会经历三个阶段: +// 同步发送 +public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {} +// 异步发送,sendCallback作为回调 +public void send(Message msg,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {} +// 单向发送,不关心发送结果,最不靠谱 +public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {} +``` -![Rocket消息丢失](images/Middleware/Rocket消息丢失.jpg) -- 生产阶段:Producer 新建消息,然后通过网络将消息投递给 MQ Broker -- 存储阶段:消息将会存储在 Broker 端磁盘中 -- 消息阶段:Consumer 将会从 Broker 拉取消息 -以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。 +#### 失败重试 +发送消息如果失败或者超时了,则会自动重试。默认是重试3次,可以根据api进行更改,比如改为10次: +```java +producer.setRetryTimesWhenSendFailed(10); +``` -### 生产阶段 +底层源码逻辑如下: + +```java +/** + * {@link org.apache.rocketmq.client.producer.DefaultMQProducer#sendDefaultImpl(Message, CommunicationMode, SendCallback, long)} + */ + +// 自动重试次数,this.defaultMQProducer.getRetryTimesWhenSendFailed()默认为2,如果是同步发送,默认重试3次,否则重试1次 +int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; +int times = 0; +for (; times < timesTotal; times++) { + // 选择发送的消息queue + MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); + if (mqSelected != null) { + try { + // 真正的发送逻辑,sendKernelImpl。 + sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); + switch (communicationMode) { + case ASYNC: + return null; + case ONEWAY: + return null; + case SYNC: + // 如果发送失败了,则continue,意味着还会再次进入for,继续重试发送 + if (sendResult.getSendStatus() != SendStatus.SEND_OK) { + if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { + continue; + } + } + // 发送成功的话,将发送结果返回给调用者 + return sendResult; + default: + break; + } + } catch (RemotingException e) { + continue; + } catch (...) { + continue; + } + } +} +``` -生产者(Producer) 通过网络发送消息给 Broker,当 Broker 收到之后,将会返回确认响应信息给 Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。 +#### 故障切换 -### Broker 存储阶段 +假设Broker宕机了,但是生产环境一般都是多M多S的,所以还会有其他Master节点继续提供服务,这也不会影响到我们发送消息,我们消息依然可达。因为比如恰巧发送到broker的时候,broker宕机了,producer收到broker的响应发送失败了,这时候producer会自动重试,这时候宕机的broker就被踢下线了, 所以producer会换一台broker发送消息。 -默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息**存储磁盘成功**,才会返回响应。修改 Broker 端配置如下: + + +### Broker存储阶段 + +若想很严格的保证Broker存储消息阶段消息不丢失,则需要如下配置,但是性能肯定远差于默认配置: + +```properties +# master 节点配置 +flushDiskType = SYNC_FLUSH +brokerRole=SYNC_MASTER + +# slave 节点配置 +brokerRole=slave +flushDiskType = SYNC_FLUSH +``` + + + +#### 设置Broker同步刷盘策略 + +**设置Broker同步刷盘策略**。默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器断电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。 + +若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息**存储磁盘成功**,才会返回响应。修改 Broker 端配置如下: ```properties # 默认情况为 ASYNC_FLUSH @@ -1185,9 +1494,103 @@ flushDiskType = SYNC_FLUSH +#### 等待Master和Slave刷盘完 + +等待Master和Slave刷盘完。即使Broker设置了同步刷盘策略,但是Broker刷完盘后磁盘坏了,这会导致盘上的消息全丢了。但是如果即使是1主1从了,但是Master刷完盘后还没来得及同步给Slave就磁盘坏了,这会导致盘上的消息全丢了。所以我们还可以配置不仅是等Master刷完盘就通知Producer,而是等Master和Slave都刷完盘后才去通知Producer说消息ok了。 + +```properties +# 默认为 ASYNC_MASTER +brokerRole=SYNC_MASTER +``` + + + ### 消费阶段 -消费者从broker拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回`ConsumeConcurrentlyStatus. CONSUME_SUCCESS` 状态给Broker。如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。 +消费失败了其实也是消息丢失的一种变体。 + +只有当消费模式为 **MessageModel.CLUSTERING(集群模式)** 时,Broker 才会自动进行重试,对于广播消息是不会重试的。对于一直无法消费成功的消息,RocketMQ 会在达到最大重试次数之后,将该消息投递至死信队列。然后我们需要关注死信队列,并对该死信消息业务做人工的补偿操作。 + +#### 手动ACK确认 + +消费者会先把消息拉取到本地,然后进行业务逻辑,业务逻辑完成后手动进行ack确认,这时候才会真正的代表消费完成。而不是说pull到本地后消息就算消费完了。举个例子 + +```java + consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + try{ + for (MessageExt msg : msgs) { + String str = new String(msg.getBody()); + System.out.println(str); + } + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } catch(Throwable t){ + log.error("消费异常:{}", msgs, t); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + } + }); +``` + + + +#### 消费异常自动重试 + +- 业务消费方返回 ConsumeConcurrentlyStatus.RECONSUME_LATER +- 业务消费方返回 null +- 业务消费方主动/被动抛出异常 + +针对以上3种情况下,Broker一般会进行重试(默认最大重试16次),RocketMQ 采用了“时间衰减策略”进行消息的重复投递,即重试次数越多,消息消费成功的可能性越小。我们可以在 RocketMQ 的 `broker.conf` 配置文件中配置 Consumer 侧重试次数及时间间隔(**距离第1次发送的时间间隔**), 配置如下: + +```properties + messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h +``` + +消费者客户端,首先判断消费端有没有显式设置最大重试次数 MaxReconsumeTimes, 如果没有,则设置默认重试次数为 16,否则以设置的最大重试次数为准。 + +```java +private int getMaxReconsumeTimes() { + // default reconsume times: 16 + if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) { + return 16; + } else { + return this.defaultMQPushConsumer.getMaxReconsumeTimes(); + } +} +``` + + + +#### 消费超时无线重试 + +如果是消费超时情况,MQ会无限制的发送给消费端。这种情况就是Consumer端没有返回`ConsumeConcurrentlyStatus. CONSUME_SUCCESS`,也没有返回`ConsumeConcurrentlyStatus.RECONSUME_LATER`。 + + + +#### 死信队列 + +死信的处理逻辑: + +- 首先判断消息当前重试次数是否大于等于 16,或者消息延迟级别是否小于 0 +- 只要满足上述的任意一个条件,设置新的 topic(死信 topic)为:**%DLQ%+consumerGroup** +- 进行前置属性的添加 +- 将死信消息投递到上述步骤 2 建立的死信 topic 对应的死信队列中并落盘,使消息持久化 + +最后单独启动一个死信队列的消费者进行消费,然后进行人工干预处理失败的消息。 + + + +## 消息幂等 + +在所有消息系统中消费消息有三种模式:`at-most-once`(最多一次)、`at-least-once`(最少一次)和 `exactly-only-once`(精确仅一次),分布式消息系统都是在三者间取平衡,前两者是可行的并且被广泛使用。 + +- `at-most-once`:消息投递后不论消息是否被消费成功,不会再重复投递,有可能会导致消息未被消费,RocketMQ 未使用该方式 +- `at-lease-once`:消息投递后,消费完成后,向服务器返回 ACK,没有消费则一定不会返回 ACK 消息。由于网络异常、客户端重启等原因,服务器未能收到客户端返回的 ACK,服务器则会再次投递,这就会导致可能重复消费,RocketMQ 通过 ACK 来确保消息至少被消费一次 +- `exactly-only-once`:在分布式系统环境下,如果要实现该模式,巨大的开销不可避免。RocketMQ 没有保证此特性,无法避免消息重复,由业务上进行幂等性处理。必须下面两个条件都满足,才能认为消息是"Exactly Only Once": + - 发送消息阶段,不允许发送重复消息 + - 消费消息阶段,不允许消费重复的消息 diff --git a/Solution.md b/Solution.md index a25b3ce..f6e3cb9 100644 --- a/Solution.md +++ b/Solution.md @@ -4309,7 +4309,7 @@ BASE理论是对CAP中的一致性和可用性进行一个权衡的结果,理 -## 分一致性算法 +## 弱一致性算法 ### Gossip协议 diff --git a/images/Middleware/1090617-20190626233829426-1023022108.png b/images/Middleware/1090617-20190626233829426-1023022108.png new file mode 100644 index 0000000..60dd6f3 Binary files /dev/null and b/images/Middleware/1090617-20190626233829426-1023022108.png differ diff --git a/images/Middleware/2ee181849237d310d0dc47fc1c684192 b/images/Middleware/2ee181849237d310d0dc47fc1c684192 new file mode 100644 index 0000000..e5bf396 Binary files /dev/null and b/images/Middleware/2ee181849237d310d0dc47fc1c684192 differ diff --git a/images/Middleware/471426-20180519131211273-554395305.png b/images/Middleware/471426-20180519131211273-554395305.png new file mode 100644 index 0000000..301b2e3 Binary files /dev/null and b/images/Middleware/471426-20180519131211273-554395305.png differ diff --git a/images/Middleware/RocketMQ-Offset.png b/images/Middleware/RocketMQ-Offset.png new file mode 100644 index 0000000..e5bf396 Binary files /dev/null and b/images/Middleware/RocketMQ-Offset.png differ