From 3981ce0b4870c1b54c2477b47e7aeb3f627d9583 Mon Sep 17 00:00:00 2001 From: Oliver Date: Fri, 20 May 2022 22:38:02 +0800 Subject: [PATCH] feat: Pull message process (#121) * feat: add pull message process --- README.md | 1 + .../rocketmq-pullmessage-processor.md | 116 ++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 docs/rocketmq/rocketmq-pullmessage-processor.md diff --git a/README.md b/README.md index 189ba61..2bf8391 100644 --- a/README.md +++ b/README.md @@ -283,6 +283,7 @@ - [RocketMQ IndexFile详解](docs/rocketmq/rocketmq-indexfile.md) - [RocketMQ 消费者启动流程](docs/rocketmq/rocketmq-consumer-start.md) - [RocketMQ 消息拉取流程](docs/rocketmq/rocketmq-pullmessage.md) +- [RocketMQ Broker 处理拉取消息请求流程](docs/rocketmq/rocketmq-pullmessage-processor.md) ## 番外篇(JDK 1.8) diff --git a/docs/rocketmq/rocketmq-pullmessage-processor.md b/docs/rocketmq/rocketmq-pullmessage-processor.md new file mode 100644 index 0000000..f6f2e00 --- /dev/null +++ b/docs/rocketmq/rocketmq-pullmessage-processor.md @@ -0,0 +1,116 @@ +该文所涉及的 RocketMQ 源码版本为 4.9.3。 + +# RocketMQ broker 处理拉取消息请求流程 + +org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand) + +第 1 步、`校验broker是否可读` + +```java +if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1())); + return response; +} +``` + +第 2 步、`根据消费组获取订阅信息` + +```java +SubscriptionGroupConfig subscriptionGroupConfig = + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); +``` + +第 3 步、`校验是否允许消费` + +```java +if (!subscriptionGroupConfig.isConsumeEnable()) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup()); + return response; +} +``` + +第 4 步、`根据主题获取对应的配置信息` + +```java +TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); +if (null == topicConfig) { + log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel)); + response.setCode(ResponseCode.TOPIC_NOT_EXIST); + response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL))); + return response; +} +``` + +第 5 步、`校验主题对应的队列` + +```java +if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) { + String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", + requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); + log.warn(errorInfo); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(errorInfo); + return response; +} +``` + +第 6 步、`如果配置了消息过滤表达式,根据表达式进行构建consumerFilterData,如果没有,则根据主题构建` + +```java +consumerFilterData = ConsumerFilterManager.build( + requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(), + requestHeader.getExpressionType(), requestHeader.getSubVersion() + +consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(), + requestHeader.getConsumerGroup()); +``` + +第 7 步、`校验如果不是Tag过滤,是否开启了自定义属性过滤,如果没有开启,不允许操作 只有使用push推送模式的消费者才能用使用SQL92标准的sql语句,pull拉取模式的消费者是不支持这个功能的。` + +```java +if (!ExpressionType.isTagType(subscriptionData.getExpressionType()) + && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType()); + return response; +} +``` + +第 8 步、`根据是否支持重试过滤创建不同的MessageFilter` + +```java +if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) { + messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, + this.brokerController.getConsumerFilterManager()); +} else { + messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, + this.brokerController.getConsumerFilterManager()); +} +``` + +第 9 步、`根据消费组、主题、队列、偏移量、最大拉取消息数量、消息过滤器查找信息` + +```java +final GetMessageResult getMessageResult = + this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), + requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter); + +``` + +第 10 步、`消息为空 设置code为系统错误 返回response` + +```java +response.setCode(ResponseCode.SYSTEM_ERROR); +response.setRemark("store getMessage return null"); +``` + +第 11 步、`提交偏移量` + +```java +if (storeOffsetEnable) { + this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); +} +```