Update rocketmq-send-message.md

pull/110/head
Yang Libin 3 years ago committed by GitHub
parent 7983270e86
commit 6dcec54029
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,3 +1,5 @@
# RocketMQ 消息发送流程
这里以同步发送为示例讲解: 这里以同步发送为示例讲解:
入口: 入口:
@ -10,7 +12,7 @@ org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.c
主题的长度不能大于 127消息的大小不能大于 4M 主题的长度不能大于 127消息的大小不能大于 4M
``` ```java
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
if (null == msg) { if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
@ -39,7 +41,7 @@ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer
如果缓存中存在路由信息,并且队列信息不为空直接返回路由信息,如果缓存不存在,根据当前主题从 NameServer 中获取 路由信息,如果路由信息没有找到,根据默认主题查询路由信息,如果没有找到抛出异常 如果缓存中存在路由信息,并且队列信息不为空直接返回路由信息,如果缓存不存在,根据当前主题从 NameServer 中获取 路由信息,如果路由信息没有找到,根据默认主题查询路由信息,如果没有找到抛出异常
``` ```java
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) { if (null == topicPublishInfo || !topicPublishInfo.ok()) {
@ -65,9 +67,7 @@ org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFro
1、如果是默认的主题查询路由信息返回成功更新读队列和写队列的个数为默认的队列个数 1、如果是默认的主题查询路由信息返回成功更新读队列和写队列的个数为默认的队列个数
```java
```
if (isDefault && defaultMQProducer != null) { if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
clientConfig.getMqClientApiTimeout()); clientConfig.getMqClientApiTimeout());
@ -83,7 +83,7 @@ if (isDefault && defaultMQProducer != null) {
2、返回路由信息之后与本地缓存的路由信息比对判断路由信息是否发生变化如果发生变化更新 broker 地址缓存,更新`topicPublishInfoTable`,更新 topic 路由信息缓存`topicRouteTable` 2、返回路由信息之后与本地缓存的路由信息比对判断路由信息是否发生变化如果发生变化更新 broker 地址缓存,更新`topicPublishInfoTable`,更新 topic 路由信息缓存`topicRouteTable`
``` ```java
if (changed) { if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
@ -120,7 +120,7 @@ if (changed) {
首先判断是否启用故障延迟机制 ,默认不启用,第一次查询 lastBrokerName 为空,`sendWhichQueue`自增然后对队列个数取模获取队列,如果消息发送失败,下一次`sendWhichQueue`仍然自增然后对队列个数取模,可以规避掉上次失败的 broker 首先判断是否启用故障延迟机制 ,默认不启用,第一次查询 lastBrokerName 为空,`sendWhichQueue`自增然后对队列个数取模获取队列,如果消息发送失败,下一次`sendWhichQueue`仍然自增然后对队列个数取模,可以规避掉上次失败的 broker
``` ```java
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) { if (lastBrokerName == null) {
return selectOneMessageQueue(); return selectOneMessageQueue();
@ -144,7 +144,7 @@ public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
轮询获取队列 ,如果可用直接返回 轮询获取队列 ,如果可用直接返回
``` ```java
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0) if (pos < 0)
@ -157,7 +157,7 @@ for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
判断是否可用逻辑:先从要规避的 broker 集合`faultItemTable`中获取该 broker 是否存在,如果存在判断是否可用,可用的标准是当前时间的时间戳大于上次该 broker 失败的时间 + 规避的时间,如果该 broker 在规避的 broker 集合中不存在,直接返回可用 判断是否可用逻辑:先从要规避的 broker 集合`faultItemTable`中获取该 broker 是否存在,如果存在判断是否可用,可用的标准是当前时间的时间戳大于上次该 broker 失败的时间 + 规避的时间,如果该 broker 在规避的 broker 集合中不存在,直接返回可用
``` ```java
public boolean isAvailable(final String name) { public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name); final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) { if (faultItem != null) {
@ -169,7 +169,7 @@ public boolean isAvailable(final String name) {
如果没有可用的 broker尝试从 规避的 broker 集合中选择一个可用的 broker如果选择的 broker 没有写队列,则从规避的 broker 列表中移除该 broker 如果没有可用的 broker尝试从 规避的 broker 集合中选择一个可用的 broker如果选择的 broker 没有写队列,则从规避的 broker 列表中移除该 broker
``` ```java
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) { if (writeQueueNums > 0) {
@ -188,7 +188,7 @@ P.S. :
要规避的 broker 集合在同步发送的时候不会 更新,在异步发送的时候会更新 要规避的 broker 集合在同步发送的时候不会 更新,在异步发送的时候会更新
``` ```java
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) { if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
@ -199,7 +199,7 @@ public void updateFaultItem(final String brokerName, final long currentLatency,
主要更新消息发送故障的延迟时间`currentLatency`和故障规避的 开始时间`startTimestamp` 主要更新消息发送故障的延迟时间`currentLatency`和故障规避的 开始时间`startTimestamp`
``` ```java
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name); FaultItem old = this.faultItemTable.get(name);
if (null == old) { if (null == old) {
@ -229,7 +229,7 @@ org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
1、为消息分配全局唯一 id 1、为消息分配全局唯一 id
``` ```java
if (!(msg instanceof MessageBatch)) { if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg); MessageClientIDSetter.setUniqID(msg);
} }
@ -237,7 +237,7 @@ if (!(msg instanceof MessageBatch)) {
2、消息体大于 4k 启用压缩 2、消息体大于 4k 启用压缩
``` ```java
boolean msgBodyCompressed = false; boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) { if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG; sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
@ -247,7 +247,7 @@ if (this.tryToCompressMessage(msg)) {
3、如果是事务消息设置消息类型为事务消息 3、如果是事务消息设置消息类型为事务消息
``` ```java
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) { if (Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
@ -256,7 +256,7 @@ if (Boolean.parseBoolean(tranMsg)) {
4、校验是否超时 4、校验是否超时
``` ```java
long costTimeSync = System.currentTimeMillis() - beginStartTime; long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) { if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
@ -265,7 +265,7 @@ if (timeout < costTimeSync) {
5、组装请求头 5、组装请求头
``` ```java
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic()); requestHeader.setTopic(msg.getTopic());
@ -296,7 +296,7 @@ if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
6、发送请求 6、发送请求
``` ```java
caseSYNC: caseSYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime; long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) { if (timeout < costTimeSync) {
@ -318,7 +318,7 @@ caseSYNC:
1、处理状态码 1、处理状态码
``` ```java
switch (response.getCode()) { switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT: { case ResponseCode.FLUSH_DISK_TIMEOUT: {
sendStatus = SendStatus.FLUSH_DISK_TIMEOUT; sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
@ -344,7 +344,7 @@ switch (response.getCode()) {
2、构造 SendResult 2、构造 SendResult
``` ```java
SendResult sendResult = new SendResult(sendStatus, SendResult sendResult = new SendResult(sendStatus,
uniqMsgId, uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());

Loading…
Cancel
Save