Update rocketmq-nameserver-broker.md

pull/108/head
Yang Libin 3 years ago committed by GitHub
parent 6aa5b147d0
commit 445055ca02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,12 +1,12 @@
nameserver如何与broker进行通信的? # RockerMQ Nameserver 如何与 Broker 进行通信的?
nameserver每隔10s扫描一次Broker移除处于未激活状态的Broker nameserver 每隔 10s 扫描一次 Broker移除处于未激活状态的 Broker
核心代码: 核心代码:
`this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.*SECONDS*);` `this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.*SECONDS*);`
``` ```java
public int scanNotActiveBroker() { public int scanNotActiveBroker() {
int removeCount = 0; int removeCount = 0;
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
@ -27,11 +27,11 @@ public int scanNotActiveBroker() {
} }
``` ```
broker每隔30秒会向集群中所有的NameServer发送心跳包 broker 每隔 30 秒会向集群中所有的 NameServer 发送心跳包
核心代码: 核心代码:
``` ```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override @Override
@ -45,7 +45,7 @@ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
``` ```
``` ```java
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
@ -71,19 +71,19 @@ public synchronized void registerBrokerAll(final boolean checkOrderConfig, boole
} }
``` ```
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 是网络处理器解析请求类型,如果请求类型为`*RequestCode.REGISTER_BROKER`则请求最终转发到org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker* org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 是网络处理器解析请求类型,如果请求类型为`*RequestCode.REGISTER_BROKER`,则请求最终转发到 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker\*
代码太多,文字来描述一下: 代码太多,文字来描述一下:
第一步路由注册需要加写锁防止并发修改RouteInfoManager中的路由表。首先判断Broker所属集群是否存在如果不存在则创建集群然后将broker名加入集群。 第一步:路由注册需要加写锁,防止并发修改 RouteInfoManager 中的路由表。首先判断 Broker 所属集群是否存在,如果不存在,则创建集群,然后将 broker 名加入集群。
第二步维护BrokerData信息首先从brokerAddrTable中根据 broker名尝试获取Broker信息如果不存在则新建BrokerData放入brokerAddrTableregisterFirst设置为true如果存在直接替换原先的Broker信息registerFirst设置为false表示非第一次注册 第二步:维护 BrokerData 信息,首先从 brokerAddrTable 中根据 broker 名尝试获取 Broker 信息,如果不存在,则新建 BrokerData 放入 brokerAddrTableregisterFirst 设置为 true如果存在直接替换原先的 Broker 信息registerFirst 设置为 false表示非第一次注册
第三步如果Broker为主节点并且Broker的topic配置信息发生变化或者是初次注册则需要创建或者更新topic的路由元数据并填充topicQueueTable 第三步:如果 Broker 为主节点,并且 Broker topic 配置信息发生变化或者是初次注册,则需要创建或者更新 topic 的路由元数据,并填充 topicQueueTable
根据topicConfig创建QueueData数据结构然后更新topicQueueTable 根据 topicConfig 创建 QueueData 数据结构然后更新 topicQueueTable
``` ```java
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData(); QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName); queueData.setBrokerName(brokerName);
@ -108,9 +108,9 @@ private void createAndUpdateQueueData(final String brokerName, final TopicConfig
} }
``` ```
第四步更新BrokerLiveInfo,存储状态正常的Broker信息表BrokerLiveInfo是执行路由删除操作的重要依据。 第四步:更新 BrokerLiveInfo,存储状态正常的 Broker 信息表BrokerLiveInfo 是执行路由删除操作的重要依据。
``` ```java
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo( new BrokerLiveInfo(
System.currentTimeMillis(), System.currentTimeMillis(),
@ -119,9 +119,9 @@ BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
haServerAddr)); haServerAddr));
``` ```
第五步注册Broker的过滤器Server地址列表一个Broker上会关联多个 FilterServer消息过滤服务器。如果此Broker为从节点则需要查找该Broker的主节点信息并更新对应的masterAddr属性 第五步:注册 Broker 的过滤器 Server 地址列表,一个 Broker 上会关联多个 FilterServer 消息过滤服务器。如果此 Broker 为从节点,则需要查找该 Broker 的主节点信息,并更新对应的 masterAddr 属性
``` ```java
if (MixAll.MASTER_ID!= brokerId) { if (MixAll.MASTER_ID!= brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) { if (masterAddr != null) {
@ -136,28 +136,28 @@ if (MixAll.MASTER_ID!= brokerId) {
总结: 总结:
NameServer与Broker保持长连接Broker的状态信息存储在BrokerLiveTable中NameServer每收到一个心跳包将更新brokerLiveTable中关于broker的状态信息以及路由表topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。更新上述路由表使用了锁粒度较少的读写锁允许多个消息发送者并发读操作保证消息发送时的高并发同一时刻NameServer只处理一个Broker心跳包多个心跳包请求串行执行。 NameServer Broker 保持长连接Broker 的状态信息存储在 BrokerLiveTable NameServer 每收到一个心跳包,将更新 brokerLiveTable 中关于 broker 的状态信息以及路由表topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。更新上述路由表使用了锁粒度较少的读写锁允许多个消息发送者并发读操作保证消息发送时的高并发同一时刻 NameServer 只处理一个 Broker 心跳包,多个心跳包请求串行执行。
NameServer如何剔除失效的Broker NameServer 如何剔除失效的 Broker
1、NameServer每隔十秒注册一次brokerLiveTable状态表如果BrokerLive的lastUpdateTimestamp 1、NameServer 每隔十秒注册一次 brokerLiveTable 状态表,如果 BrokerLive lastUpdateTimestamp
时间戳距当前时间超过120秒则认为Broker失效移除该Broker关闭与broker的连接同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。 时间戳距当前时间超过 120 秒,则认为 Broker 失效,移除该 Broker关闭与 broker 的连接,同时更新 topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
2、如果broker在正常关闭的情况下会发送unRegisterBroker指令。 2、如果 broker 在正常关闭的情况下,会发送 unRegisterBroker 指令。
不管是哪一种方式触发的路由删除,处理逻辑是一样的 不管是哪一种方式触发的路由删除,处理逻辑是一样的
第一步申请写锁移除brokerLiveTable、filterServerTable中Broker相关的信息 第一步:申请写锁,移除 brokerLiveTable、filterServerTable Broker 相关的信息
``` ```java
this.lock.writeLock().lockInterruptibly(); this.lock.writeLock().lockInterruptibly();
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr); BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
log.info("unregisterBroker, remove from brokerLiveTable {}, {}",brokerLiveInfo != null ? "OK" : "Failed",brokerAddr); log.info("unregisterBroker, remove from brokerLiveTable {}, {}",brokerLiveInfo != null ? "OK" : "Failed",brokerAddr);
this.filterServerTable.remove(brokerAddr); this.filterServerTable.remove(brokerAddr);
``` ```
第二步维护brokerAddrTable找到具体的broker将其从brokerData中移除如果移除之后不再包含其他broker则在brokerAddrtable移除该brokerName对应的数据 第二步:维护 brokerAddrTable找到具体的 broker将其从 brokerData 中移除,如果移除之后不再包含其他 broker则在 brokerAddrtable 移除该 brokerName 对应的数据
``` ```
BrokerData brokerData = this.brokerAddrTable.get(brokerName); BrokerData brokerData = this.brokerAddrTable.get(brokerName);
@ -173,7 +173,7 @@ if (null != brokerData) {
} }
``` ```
第三步根据brokerName从clusterAddrTable中找到Broker并将其中集群中移除如果移除后集群中不包含任何Broker则将该集群从clusterAddrTable中移除 第三步:根据 brokerName clusterAddrTable 中找到 Broker 并将其中集群中移除,如果移除后集群中不包含任何 Broker则将该集群从 clusterAddrTable 中移除
``` ```
if (removeBrokerName) { if (removeBrokerName) {
@ -191,7 +191,7 @@ if (removeBrokerName) {
} }
``` ```
第四步: 根据brokerName遍历所有主题的队列如果队列中包含当前broker的队列则移除如果topic中包含待移除的Broker的队列从路由表中删除该topic 第四步: 根据 brokerName遍历所有主题的队列如果队列中包含当前 broker 的队列,则移除,如果 topic 中包含待移除的 Broker 的队列,从路由表中删除该 topic
``` ```
this.topicQueueTable.forEach((topic, queueDataMap) -> { this.topicQueueTable.forEach((topic, queueDataMap) -> {

Loading…
Cancel
Save