From 711817799d5d310841942835b1fe37dbdd13abc7 Mon Sep 17 00:00:00 2001 From: Oliver Date: Tue, 10 May 2022 21:31:03 +0800 Subject: [PATCH] Add consumer start detail (#116) * feat: add Consumer start detail --- README.md | 1 + docs/rocketmq/rocketmq-consumer-start.md | 297 +++++++++++++++++++++++ 2 files changed, 298 insertions(+) create mode 100644 docs/rocketmq/rocketmq-consumer-start.md diff --git a/README.md b/README.md index 9609fd2..fb964ac 100644 --- a/README.md +++ b/README.md @@ -281,6 +281,7 @@ - [RocketMQ ConsumeQueue详解](docs/rocketmq/rocketmq-consumequeue.md) - [RocketMQ CommitLog详解](docs/rocketmq/rocketmq-commitlog.md) - [RocketMQ IndexFile详解](docs/rocketmq/rocketmq-indexfile.md) +- [RocketMQ 消费者启动流程](docs/rocketmq/rocketmq-consumer-start.md) ## 番外篇(JDK 1.8) diff --git a/docs/rocketmq/rocketmq-consumer-start.md b/docs/rocketmq/rocketmq-consumer-start.md new file mode 100644 index 0000000..749e26a --- /dev/null +++ b/docs/rocketmq/rocketmq-consumer-start.md @@ -0,0 +1,297 @@ +# RocketMQ 消费者启动流程 + +org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start + +`1、检查配置信息` + +org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#checkConfig + +校验消费组的长度不能大于 255 + +`public static final int CHARACTER_MAX_LENGTH = 255;` + +```java +if (group.length() >CHARACTER_MAX_LENGTH) { + throw new MQClientException("the specified group is longer than group max length 255.", null); +} +``` + +消费组名称只能包含数字、字母、%、-、_、| + +```java +// regex: ^[%|a-zA-Z0-9_-]+$ +// % +VALID_CHAR_BIT_MAP['%'] = true; +// - +VALID_CHAR_BIT_MAP['-'] = true; +// _ +VALID_CHAR_BIT_MAP['_'] = true; +// | +VALID_CHAR_BIT_MAP['|'] = true; +for (int i = 0; i = '0' && i <= '9') { + // 0-9 + VALID_CHAR_BIT_MAP[i] = true; + } else if (i >= 'A' && i <= 'Z') { + // A-Z + VALID_CHAR_BIT_MAP[i] = true; + } else if (i >= 'a' && i <= 'z') { + // a-z + VALID_CHAR_BIT_MAP[i] = true; + } +} +``` + +```java +public static boolean isTopicOrGroupIllegal(String str) { + int strLen = str.length(); + int len =VALID_CHAR_BIT_MAP.length; + boolean[] bitMap =VALID_CHAR_BIT_MAP; + for (int i = 0; i < strLen; i++) { + char ch = str.charAt(i); + if (ch >= len || !bitMap[ch]) { + return true; + } + } + return false; +} +``` + +消费组名称不能是`DEFAULT_CONSUMER` + +`public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";` + +```java +if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) { + throw new MQClientException("consumerGroup can not equal " + MixAll.DEFAULT_CONSUMER_GROUP ++ ", please specify another one." + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); +} +``` + +消费者最小线程数需要在 1-1000 之间 + +```java +if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1 + || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) { + throw new MQClientException("consumeThreadMin Out of range [1, 1000]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); +} +``` + +消费者最大线程数需要在 1-1000 之间 + +```java +if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) { + throw new MQClientException("consumeThreadMax Out of range [1, 1000]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); +} +``` + +`2、设置订阅信息` + +构造主题订阅消息`SubscriptionData`并将其加入`RebalanceImpl`,如果是消费模式是集群,订阅默认的重试主题并且构造`SubscriptionData`加入`RebalanceImpl` + +```java +private void copySubscription() throws MQClientException { + try { + Map sub = this.defaultMQPushConsumer.getSubscription(); + if (sub != null) { + for (final Map.Entry entry : sub.entrySet()) { + final String topic = entry.getKey(); + final String subString = entry.getValue(); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + } + } + + if (null == this.messageListenerInner) { + this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); + } + + switch (this.defaultMQPushConsumer.getMessageModel()) { + caseBROADCASTING: + break; + caseCLUSTERING: + final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL); + this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); + break; + default: + break; + } + } catch (Exception e) { + throw new MQClientException("subscription exception", e); + } +} +``` + +`3、初始化MqClientInstance、RebalanceImpl、PullApiWrapper` + +创建`MqClientInstance`, 无论在生产者端还是消费者端都是一个很重要的类, 封装了Topic信息、broker信息,当然还有生产者和消费者的信息。 + +```java +public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { + String clientId = clientConfig.buildMQClientId(); + MQClientInstance instance = this.factoryTable.get(clientId); + if (null == instance) { + instance = new MQClientInstance(clientConfig.cloneClientConfig(), + this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); + MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); + if (prev != null) { + instance = prev; + log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); + } else { + log.info("Created new MQClientInstance for clientId:[{}]", clientId); + } + } + + return instance; +} +``` + +构造`RebalanceImpl` 用来负载消费者与队列的消费关系 + +```java +this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); +this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); +this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); +this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); +``` + +构造`PullApiWrapper` 消费者拉取消息类 + +```java +this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); +this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); +``` + +`4、设置消息偏移量` + +如果是广播模式消费,消息消费进度存储在消费端,如果是集群模式消费,消息消费进度存储在 broker 端 + +```java +if (this.defaultMQPushConsumer.getOffsetStore() != null) { + this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); +} else { + switch (this.defaultMQPushConsumer.getMessageModel()) { + caseBROADCASTING: + this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); + break; + caseCLUSTERING: + this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); + break; + default: + break; + } + this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); +} +this.offsetStore.load(); +``` + +`5、是否是顺序消费` + +根据是否是顺序消费构造不同的`ConsumeMessageService` + +```java +if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { + this.consumeOrderly = true; + this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); +} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { + this.consumeOrderly = false; + this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); +} +``` + +区别在于启动的线程任务不同: + +顺序消费线程: + +```java +if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + ConsumeMessageOrderlyService.this.lockMQPeriodically(); + } catch (Throwable e) { + log.error("scheduleAtFixedRate lockMQPeriodically exception", e); + } + } + }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); +} +``` + +正常消费线程: + +```java +this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + cleanExpireMsg(); + } catch (Throwable e) { + log.error("scheduleAtFixedRate cleanExpireMsg exception", e); + } + } + +}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); +``` + +6`、启动MQClientInstance` + +消费者与生产者共用 MQClientInstance + +大部分流程已经在生产者启动流程中讲解,这里主要讲解与生产者不同的部分 + +启动保证消费者偏移量最终一致性的任务 + +```java +this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + MQClientInstance.this.persistAllConsumerOffset(); + } catch (Exception e) { + log.error("ScheduledTask persistAllConsumerOffset exception", e); + } + } +}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); +``` + +启动调整线程池大小任务: + +```java +this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + MQClientInstance.this.adjustThreadPool(); + } catch (Exception e) { + log.error("ScheduledTask adjustThreadPool exception", e); + } + } +}, 1, 1, TimeUnit.MINUTES); +``` + +启动重平衡服务: + +`this.rebalanceService.start();` + +7`、更新订阅主题信息` + +更新主题订阅信息: + +```java +private void updateTopicSubscribeInfoWhenSubscriptionChanged() { + Map subTable = this.getSubscriptionInner(); + if (subTable != null) { + for (final Map.Entry entry : subTable.entrySet()) { + final String topic = entry.getKey(); + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + } + } +} +```