diff --git a/docs/rocketmq/rocketmq-send-message.md b/docs/rocketmq/rocketmq-send-message.md index 2f5a461..21a55f9 100644 --- a/docs/rocketmq/rocketmq-send-message.md +++ b/docs/rocketmq/rocketmq-send-message.md @@ -1,16 +1,18 @@ +# RocketMQ 消息发送流程 + 这里以同步发送为示例讲解: 入口: org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message) -消息发送 默认超时时间3秒 +消息发送 默认超时时间 3 秒 第一步:验证 -主题的长度不能大于127,消息的大小不能大于4M +主题的长度不能大于 127,消息的大小不能大于 4M -``` +```java public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); @@ -37,9 +39,9 @@ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer 第二步:查找路由信息 -如果缓存中存在路由信息,并且队列信息不为空直接返回路由信息,如果缓存不存在,根据当前主题从NameServer中获取 路由信息,如果路由信息没有找到,根据默认主题查询路由信息,如果没有找到抛出异常 +如果缓存中存在路由信息,并且队列信息不为空直接返回路由信息,如果缓存不存在,根据当前主题从 NameServer 中获取 路由信息,如果路由信息没有找到,根据默认主题查询路由信息,如果没有找到抛出异常 -``` +```java private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { @@ -59,15 +61,13 @@ private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { ``` -从NameServer查询路由信息方法: +从 NameServer 查询路由信息方法: org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer) 1、如果是默认的主题查询路由信息,返回成功,更新读队列和写队列的个数为默认的队列个数 - - -``` +```java if (isDefault && defaultMQProducer != null) { topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), clientConfig.getMqClientApiTimeout()); @@ -81,9 +81,9 @@ if (isDefault && defaultMQProducer != null) { } ``` -2、返回路由信息之后,与本地缓存的路由信息比对,判断路由信息是否发生变化,如果发生变化更新broker地址缓存,更新`topicPublishInfoTable`,更新topic路由信息缓存`topicRouteTable` +2、返回路由信息之后,与本地缓存的路由信息比对,判断路由信息是否发生变化,如果发生变化更新 broker 地址缓存,更新`topicPublishInfoTable`,更新 topic 路由信息缓存`topicRouteTable` -``` +```java if (changed) { TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); @@ -118,9 +118,9 @@ if (changed) { `MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);` -首先判断是否启用故障延迟机制 ,默认不启用,第一次查询lastBrokerName为空,`sendWhichQueue`自增然后对队列个数取模获取队列,如果消息发送失败,下一次`sendWhichQueue`仍然自增然后对队列个数取模,可以规避掉上次失败的broker +首先判断是否启用故障延迟机制 ,默认不启用,第一次查询 lastBrokerName 为空,`sendWhichQueue`自增然后对队列个数取模获取队列,如果消息发送失败,下一次`sendWhichQueue`仍然自增然后对队列个数取模,可以规避掉上次失败的 broker -``` +```java public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); @@ -144,7 +144,7 @@ public MessageQueue selectOneMessageQueue(final String lastBrokerName) { 轮询获取队列 ,如果可用直接返回 -``` +```java for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) @@ -155,9 +155,9 @@ for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { } ``` -判断是否可用逻辑:先从要规避的broker集合`faultItemTable`中获取该broker是否存在,如果存在判断是否可用,可用的标准是当前时间的时间戳大于上次该broker失败的时间 + 规避的时间,如果该broker在规避的broker集合中不存在,直接返回可用 +判断是否可用逻辑:先从要规避的 broker 集合`faultItemTable`中获取该 broker 是否存在,如果存在判断是否可用,可用的标准是当前时间的时间戳大于上次该 broker 失败的时间 + 规避的时间,如果该 broker 在规避的 broker 集合中不存在,直接返回可用 -``` +```java public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { @@ -167,9 +167,9 @@ public boolean isAvailable(final String name) { } ``` -如果没有可用的broker,尝试从 规避的broker集合中选择一个可用的broker,如果选择的broker没有写队列,则从规避的 broker列表中移除该broker +如果没有可用的 broker,尝试从 规避的 broker 集合中选择一个可用的 broker,如果选择的 broker 没有写队列,则从规避的 broker 列表中移除该 broker -``` +```java final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { @@ -186,9 +186,9 @@ if (writeQueueNums > 0) { P.S. : -要规避的broker集合在同步发送的时候不会 更新,在异步发送的时候会更新 +要规避的 broker 集合在同步发送的时候不会 更新,在异步发送的时候会更新 -``` +```java public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); @@ -199,7 +199,7 @@ public void updateFaultItem(final String brokerName, final long currentLatency, 主要更新消息发送故障的延迟时间`currentLatency`和故障规避的 开始时间`startTimestamp` -``` +```java public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { @@ -221,23 +221,23 @@ public void updateFaultItem(final String name, final long currentLatency, final 总结: -不管开不开启故障延迟机制,都可以规避故障的broker,只是开启故障延迟机制,会在一段时间内都不会访问到该broker,而不开启只是下一次不会访问到该broker +不管开不开启故障延迟机制,都可以规避故障的 broker,只是开启故障延迟机制,会在一段时间内都不会访问到该 broker,而不开启只是下一次不会访问到该 broker 第四步:消息发送 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl -1、为消息分配全局唯一id +1、为消息分配全局唯一 id -``` +```java if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } ``` -2、消息体大于4k启用压缩 +2、消息体大于 4k 启用压缩 -``` +```java boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; @@ -247,7 +247,7 @@ if (this.tryToCompressMessage(msg)) { 3、如果是事务消息,设置消息类型为事务消息 -``` +```java final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; @@ -256,7 +256,7 @@ if (Boolean.parseBoolean(tranMsg)) { 4、校验是否超时 -``` +```java long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); @@ -265,7 +265,7 @@ if (timeout < costTimeSync) { 5、组装请求头 -``` +```java SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); @@ -296,7 +296,7 @@ if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 6、发送请求 -``` +```java caseSYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { @@ -318,7 +318,7 @@ caseSYNC: 1、处理状态码 -``` +```java switch (response.getCode()) { case ResponseCode.FLUSH_DISK_TIMEOUT: { sendStatus = SendStatus.FLUSH_DISK_TIMEOUT; @@ -342,9 +342,9 @@ switch (response.getCode()) { } ``` -2、构造SendResult +2、构造 SendResult -``` +```java SendResult sendResult = new SendResult(sendStatus, uniqMsgId, responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); @@ -360,4 +360,4 @@ if (traceOn != null && traceOn.equals("false")) { sendResult.setTraceOn(true); } sendResult.setRegionId(regionId); -``` \ No newline at end of file +```