diff --git a/.github/workflows/compress.yml b/.github/workflows/compress.yml index 9ce8527..42f73ad 100644 --- a/.github/workflows/compress.yml +++ b/.github/workflows/compress.yml @@ -18,16 +18,16 @@ jobs: uses: actions/checkout@v2 - name: Compress Images - uses: calibreapp/image-actions@master + uses: calibreapp/image-actions@main with: githubToken: ${{ secrets.GITHUB_TOKEN }} compressOnly: true - name: Commit Files run: | - git config --local user.email "action@github.com" - git config --local user.name "GitHub Action" - git commit -m "[Automated] Optimize images" -a + git config --local user.email "szuyanglb@outlook.com" + git config --local user.name "yanglbme" + git commit -m "chore: auto compress images" -a - name: Push Changes uses: ad-m/github-push-action@master diff --git a/README.md b/README.md index 779f82d..01835a5 100644 --- a/README.md +++ b/README.md @@ -13,12 +13,9 @@ 本项目主要用于记录框架及中间件源码的阅读经验、个人理解及解析,希望能够使阅读源码变成一件简单有趣,且有价值的事情,抽空更新中... (如果本项目对您有帮助,请 watch、star、fork 素质三连一波,鼓励一下作者,谢谢) -- Netlify: https://schunter.netlify.app -- ~~Gitee Pages: https://doocs.gitee.io/source-code-hunter~~ +- Gitee Pages: https://doocs.gitee.io/source-code-hunter - GitHub Pages: https://doocs.github.io/source-code-hunter -注:😶 Gitee Pages 站点遭 Gitee 官方误判为“包含违禁违规内容”,惨遭下线。 - ## Spring 系列 ### IoC 容器 @@ -60,6 +57,10 @@ - [面筋哥 IoC 容器的一天(上)]() +### Spring 整体脉络 + +- [16 张图解锁 Spring 的整体脉络](/docs/Spring/Spring整体脉络/16张图解锁Spring的整体脉络.md) + ### Spring 类解析 - [Spring 自定义标签解析](/docs/Spring/clazz/Spring-Custom-label-resolution.md) @@ -115,9 +116,10 @@ - [SpringBoot ConditionalOnBean](/docs/SpringBoot/SpringBoot-ConditionalOnBean.md) ### SpringSecurity -- [SpringSecurity请求全过程解析](/docs/SpringSecurity/SpringSecurity请求全过程解析.md) -- [SpringSecurity自定义用户认证](/docs/SpringSecurity/SpringSecurity自定义用户认证.md) -- + +- [SpringSecurity 请求全过程解析](/docs/SpringSecurity/SpringSecurity请求全过程解析.md) +- [SpringSecurity 自定义用户认证](/docs/SpringSecurity/SpringSecurity自定义用户认证.md) + ## MyBatis ### 基础支持层 @@ -269,6 +271,21 @@ 
- [Sentinel 底层 LongAdder 的计数实现](docs/Sentinel/Sentinel底层LongAdder的计数实现.md) - [Sentinel 限流算法的实现](docs/Sentinel/Sentinel限流算法的实现.md) +## RocketMQ + +- [RocketMQ NameServer 与 Broker 的通信](docs/rocketmq/rocketmq-nameserver-broker.md) +- [RocketMQ 生产者启动流程](docs/rocketmq/rocketmq-producer-start.md) +- [RocketMQ 消息发送流程](docs/rocketmq/rocketmq-send-message.md) +- [RocketMQ 消息发送存储流程](docs/rocketmq/rocketmq-send-store.md) +- [RocketMQ MappedFile内存映射文件详解](docs/rocketmq/rocketmq-mappedfile-detail.md) +- [RocketMQ ConsumeQueue详解](docs/rocketmq/rocketmq-consumequeue.md) +- [RocketMQ CommitLog详解](docs/rocketmq/rocketmq-commitlog.md) +- [RocketMQ IndexFile详解](docs/rocketmq/rocketmq-indexfile.md) +- [RocketMQ 消费者启动流程](docs/rocketmq/rocketmq-consumer-start.md) +- [RocketMQ 消息拉取流程](docs/rocketmq/rocketmq-pullmessage.md) +- [RocketMQ Broker 处理拉取消息请求流程](docs/rocketmq/rocketmq-pullmessage-processor.md) +- [RocketMQ 消息消费流程](docs/rocketmq/rocketmq-consume-message-process.md) + ## 番外篇(JDK 1.8) ### 基础类库 diff --git a/docs/Spring/SpringTransaction/Spring声明式事务处理.md b/docs/Spring/SpringTransaction/Spring声明式事务处理.md index f136c71..a507645 100644 --- a/docs/Spring/SpringTransaction/Spring声明式事务处理.md +++ b/docs/Spring/SpringTransaction/Spring声明式事务处理.md @@ -2,7 +2,7 @@ 在使用 Spring 声明式事务处理 的时候,一种常用的方法是结合 IoC 容器 和 Spring 已有的 TransactionProxyFactoryBean 对事务管理进行配置,比如,可以在这个 TransactionProxyFactoryBean 中为事务方法配置传播行为、并发事务隔离级别等事务处理属性,从而对声明式事务的处理提供指导。具体来说,在对声明式事务处理的原理分析中,声明式事务处理的实现大致可以分为以下几个部分: -- 读取和处理在 IoC 容器 中配置的事务处理属性,并转化为 Spring 事务处理 需要的内部数据结构,这里涉及的类是 TransactionAttributeSourceAdvisor,从名字可以看出,它是一个 AOP 通知器,Spring 使用这个通知器来完成对事务处理属性值的处理。处理的结果是,在 IoC 容器 中配置的事务处理属性信息,会被读入并转化成 TransactionAttribute 表示的数据对象,这个数据对象是 Spring 对事物处理属性值的数据抽象,对这些属性的处理是和 TransactionProxyFactoryBean 拦截下来的事务方法的处理结合起来的。 +- 读取和处理在 IoC 容器 中配置的事务处理属性,并转化为 Spring 事务处理 需要的内部数据结构,这里涉及的类是 TransactionAttributeSourceAdvisor,从名字可以看出,它是一个 AOP 通知器,Spring 使用这个通知器来完成对事务处理属性值的处理。处理的结果是,在 IoC 容器 中配置的事务处理属性信息,会被读入并转化成 TransactionAttribute 表示的数据对象,这个数据对象是 
Spring 对事务处理属性值的数据抽象,对这些属性的处理是和 TransactionProxyFactoryBean 拦截下来的事务方法的处理结合起来的。 - Spring 事务处理模块 实现统一的事务处理过程。这个通用的事务处理过程包含处理事务配置属性,以及与线程绑定完成事务处理的过程,Spring 通过 TransactionInfo 和 TransactionStatus 这两个数据对象,在事务处理过程中记录和传递相关执行场景。 - 底层的事务处理实现。对于底层的事务操作,Spring 委托给具体的事务处理器来完成,这些具体的事务处理器,就是在 IoC 容器 中配置声明式事务处理时,配置的 PlatformTransactionManager 的具体实现,比如 DataSourceTransactionManager 和 HibernateTransactionManager 等。 diff --git a/docs/Spring/Spring整体脉络/16张图解锁Spring的整体脉络.md b/docs/Spring/Spring整体脉络/16张图解锁Spring的整体脉络.md new file mode 100644 index 0000000..163d525 --- /dev/null +++ b/docs/Spring/Spring整体脉络/16张图解锁Spring的整体脉络.md @@ -0,0 +1,261 @@ +作者: [Java4ye](https://github.com/Java4ye) + +### 概览 + +本文将讲解 Spring 的原理,看看一个 Bean 是怎么被创建出来的,中间经历过那几道工序加工,它的生命周期是怎样的,以及有哪些扩展点,后置处理器可以使用,让你对 Spring 多一些了解! + +### 目录 + +本文会先大概介绍下这些知识点 👇 + +![image-20211213224509864](../../../images/spring/image-20211213224509864.png) + +### 印象中的 Spring + +脑海中有这么一条公式: + +👉 IOC = 工厂模式 + XML + 反射 + +👉 而 DI , AOP , **事务** 等也都在 XML 中很直观的表现出来 + +虽然我们现在大部分用这个注解来代替,但是原理还是基本一样的 🐖 + +注解使用起来很方便,但是学习的话,还是建议先通过这个 XML ,毕竟结构性的文档,有层次感,可以留下更深的印象~ 😄 + +### 小小 Spring + +把 Spring 浓缩一下,就有了这么一点小东西 🐖 + +![image-20211213224920994](../../../images/spring/image-20211213224920994.png) + +想了下,我们用 Spring ,其中最主要的一点,就是用它来帮我们管理,创建这个 Bean 。 + +那么先从源头看起 —— Bean 从哪来 (@\_@;) + +### Bean 解析流程 + +![image-20211213225044814](../../../images/spring/image-20211213225044814.png) + +如图所示,就是通过 **解析器**,对我们的 XML 文件或者注解进行解析,最后将这些信息封装在 BeanDefinition 类中,并通过 BeanDefinitionRegistry 接口将这些信息 **注册** 起来,放在 beanDefinitionMap 变量中, key : beanName , value :BeanDefinition 。 + +简单看看 BeanDefinition 中的属性叭 + +### BeanDefinition + +- beanClass : bean 的类型 ,实例化时用的 🐖 +- scope : 作用范围有 singleton,prototype + +- isLazy : **懒加载** ,true 的话 会在 getBean 时生成,而且 scope 的 prototype 无效,false 在 Spring 启动过程中直接生成 +- initMethodName : 初始化方法,当然是初始化时调用 🐖 +- primary : 主要的,有多个 Bean 时使用它 +- dependsOn : 依赖的 Bean,必须等依赖 Bean 创建好才可以创建 + +> PS: @Component ,@Bean , 都会被解析成 BeanDefinition + +### 反射 + 
+有了原料后呢,咋们再来看看这个 **工厂** BeanFactory + +先简单想一想这个工厂要怎么创建这个 Bean 呢? + +没错,肯定就是这个 **反射** 啦 😄 + +那么,结合我们从原料中获取的重要属性之一的 beanClass ,我们可以画出这么一张图 👇 + +![image-20211213225124831](../../../images/spring/image-20211213225124831.png) + +那么我们再来看看这个 BeanFactory 叭 😄 + +### BeanFactory + +先来看看 作为 IOC 容器的**根接口** 的 BeanFactory 提供了什么方法吧 👇 + +![image-20210904162844126](../../../images/spring/image-20210904162844126.png) + +主要是这个 getBean 方法,以及 **别名获取**,**类型获取** 方法和其他一些判断方法如 :**单例**,**多例**,**类型匹配**,**包含 bean** + +我们来简单看看它的子接口都有哪些叭~😄 + +这里分享个小技巧叭 🐖 + +看源码的时候,一般就直接看这个**默认**接口 如这里的 DefaultListableBeanFactory + +![image-20210904161436139](../../../images/spring/image-20210904161436139.png) + +基本上看个类名就知道大概作用了,那么先对号入座下 👇 + +**ListableBeanFactory** + +> 👉 遍历 bean + +**HierarchicalBeanFactory** + +> 👉 提供 父子关系,可以获取上一级的 BeanFactory + +**ConfigurableBeanFactory** + +> 👉 实现了 SingletonBeanRegistry ,主要是 单例 Bean 的注册,生成 + +**AutowireCapableBeanFactory** + +> 👉 和自动装配有关的 + +**AbstractBeanFactory** + +> 👉 单例缓存,以及 FactoryBean 相关的 + +**ConfigurableListableBeanFactory** + +> 👉 预实例化单例 Bean,分析,修改 BeanDefinition + +**AbstractAutowireCapableBeanFactory** + +> 👉 创建 Bean ,属性注入,实例化,调用初始化方法 等等 + +**DefaultListableBeanFactory** + +> 👉 支持单例 Bean ,Bean 别名 ,父子 BeanFactory,Bean 类型转化 ,Bean 后置处理,FactoryBean,自动装配等 + +是不是非常丰富 😄 + +### FactoryBean + +FactoryBean ,它本身就是个 Bean,算是小工厂 ,归 BeanFactory 这个大工厂管理的。 + +![image-20210904174616712](../../../images/spring/image-20210904174616712.png) + +可以看到它就只有三个方法 + +1. `getObject()` 获取对象 +2. `isSingleton()` 单例对象 +3. 
`getObjectType()` 返回的是 Bean 对象的类型 + +相比大工厂 BeanFactory 少了特别多东西,没有严格的 Bean 生命周期流程 😄 + +FactoryBean 对象本身也是一个 Bean,是一个小工厂,可以生产另外的 Bean + +BeanFactory 是 Spring 容器的根接口,是大工厂,生产各种各样的 Bean + +beanName 就是正常对象 + +“&”+beanName , 获取的是实现了该接口的 FactoryBean 工厂对象 + +大致如下 👇 + +![image-20211213225330193](../../../images/spring/image-20211213225330193.png) + +### ApplicationContext + +我们再来看看这个 ApplicationContext + +![image-20210904161808341](../../../images/spring/image-20210904161808341.png) + +可以看到它扩展了很多功能,除了 BeanFactory ,它还可以**创建 , 获取 Bean**,以及处理**国际化**,**事件**,**获取资源**等 + +- EnvironmentCapable 获取 环境变量 的功能,可以获取到 **操作系统变量** 和 **JVM 环境变量** +- ListableBeanFactory 获取所有 BeanNames,判断某个 BeanName 是否存在 BeanDefinition 对象,统计 BeanDefinition 对象,获取某个类型对应的所有 beanNames 等功能 +- HierarchicalBeanFactory 获取父 BeanFactory ,判断某个 name 是否存在 bean 对象的功能 +- MessageSource **国际化功能**,获取某个国际化资源 +- ApplicationEventPublisher **事件发布功能**(重点) +- ResourcePatternResolver **加载,获取资源的功能**,这里的资源可能是文件,图片 等某个 URL 资源都可以 + +还有这三个重要的类 👇,就不一一介绍先啦 😄 + +1. ClassPathXmlApplicationContext +2. AnnotationConfigApplicationContext +3. FileSystemXmlApplicationContext + +赶紧来看看这个核心叭! + +### IOC 容器 + +当然,这时候出场的肯定是 IOC 啦。 + +我们都知道 IOC 是 **控制反转** ,但是别忘了 **容器** 这个词,比如 **容器的根接口 **BeanFactory ,**容器的实现** 👇 + +1. ClassPathXmlApplicationContext +2. AnnotationConfigApplicationContext +3. FileSystemXmlApplicationContext + +同时我们要注意这里无处不在的 **后置处理器** xxxPostProcessor 🐷 + +这个是 Spring 中扩展性强的原因了! 
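这种扩展性可以用一个最小的例子直观感受一下(示意代码,非 Spring 源码:其中 PostProcessorConfig、loggingBeanPostProcessor、myService 等名字均为本文虚构,需要 spring-context 依赖才能运行):

```java
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class PostProcessorConfig {

    // 一个普通的 Bean,用来被后置处理器“加工”
    @Bean
    public Object myService() {
        return new Object();
    }

    // BeanPostProcessor 建议用 static 方法注册,保证它先于普通 Bean 创建
    @Bean
    public static BeanPostProcessor loggingBeanPostProcessor() {
        return new BeanPostProcessor() {
            @Override
            public Object postProcessBeforeInitialization(Object bean, String beanName) {
                System.out.println("before init: " + beanName);
                return bean;
            }

            @Override
            public Object postProcessAfterInitialization(Object bean, String beanName) {
                System.out.println("after init: " + beanName);
                // 这里返回的对象会替换容器里的 Bean,AOP 正是在这一类扩展点返回代理对象
                return bean;
            }
        };
    }
}

public class PostProcessorDemo {
    public static void main(String[] args) {
        try (AnnotationConfigApplicationContext ctx =
                     new AnnotationConfigApplicationContext(PostProcessorConfig.class)) {
            // 容器启动时,每个 Bean 初始化前后都会经过上面的回调
            System.out.println(ctx.getBean("myService") != null);
        }
    }
}
```

注意 postProcessAfterInitialization 返回什么,容器里最终放的就是什么,这也是代理对象被“偷偷”换进去的入口。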
+
+我们可以在各个过程中合理应用这些 PostProcessor 来扩展,或者修改 Bean 定义信息等等
+
+![image-20211213225748030](../../../images/spring/image-20211213225748030.png)
+
+可以看到在这个容器中,完成了 Bean 的初始化,而这个过程还有很多细节 ,请往下看看 👇
+
+DI 到时写 **属性填充** 时再介绍 🐷
+
+### BeanFactory 后置处理器
+
+作为 IOC 容器根接口的 BeanFactory ,有着非常高的扩展性,比如最开始获取原料 BeanDefinition 时,就出现了两个针对 BeanFactory 工厂的后置处理器 👇
+
+BeanDefinitionRegistryPostProcessor
+
+> 通过该接口,我们可以自己掌控我们的 **原料**,通过 BeanDefinitionRegistry 接口去 **新增**,**删除**,**获取**我们这个 BeanDefinition
+
+BeanFactoryPostProcessor
+
+> 通过该接口,可以在 **实例化对象前**,对 BeanDefinition 进行修改 ,**冻结** ,**预实例化单例 Bean** 等
+
+经过上面层层工序后,我们最终会来到目标方法 getBean ,将原料投入生产,最终获取一个个 Bean 对象出来
+
+那么随之而来的就是这个 Bean 的生命周期啦 😄
+
+### Bean 生命周期
+
+Bean 的创建和管理有**标准化的流程**!
+
+这里在我们的工厂 BeanFactory 中写得很清楚 👇
+
+![image-20210902072224002](../../../images/spring/image-20210902072224002.png)
+
+总共 **14** 个步骤,是不是一下子就清晰多了 😄
+
+![image-20211213225831583](../../../images/spring/image-20211213225831583.png)
+
+在看这部分的源码时,要多注意两个英文单词 😝
+
+1. **实例化** 👉 **Instantiation**
+2. **初始化** 👉 **Initialization**
+
+ps: 别看太快搞错了 哈哈 😝
+
+仔细阅读上面这 14 个步骤,会发现前面 **8** 个都是 Aware 接口,而它们的作用也很简单,就是获取 xxAware 这个单词的前缀 xx 😄
+
+比如事件发布器 ApplicationEventPublisher ,只要你实现了 ApplicationEventPublisherAware 接口,就可以**获取** 事件发布器 ApplicationEventPublisher !
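拿 ApplicationEventPublisherAware 写个小例子(示意代码,PublisherHolder 等类名为本文虚构,需要 spring-context 依赖):容器会在生命周期的 Aware 回调阶段把事件发布器注入进来:

```java
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// 实现 xxAware 接口,就能在 Bean 生命周期的 Aware 阶段拿到对应的 xx
class PublisherHolder implements ApplicationEventPublisherAware {

    private ApplicationEventPublisher publisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
        // 容器在调用初始化方法之前回调这里,把事件发布器交给我们
        this.publisher = publisher;
    }

    public void fire(Object event) {
        // 拿到发布器之后就可以随时发事件了
        publisher.publishEvent(event);
    }
}

@Configuration
class AwareConfig {
    @Bean
    public PublisherHolder publisherHolder() {
        return new PublisherHolder();
    }
}

public class AwareDemo {
    public static void main(String[] args) {
        try (AnnotationConfigApplicationContext ctx =
                     new AnnotationConfigApplicationContext(AwareConfig.class)) {
            ctx.getBean(PublisherHolder.class).fire("hello");
        }
    }
}
```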
+ +### Bean 后置处理器 + +在实例化 和 初始化流程中,把这个 Bean 的后置处理器 BeanPostProcessor 安排上,就得到下图啦 👇 + +![image-20211213225953964](../../../images/spring/image-20211213225953964.png) + +这里留意下 **实例化** 有扩展点 InstantiationAwareBeanPostProcessor , **初始化** 扩展点 BeanPostProcessor 就非常多啦,我们主要来关注下这个 AOP + +### AOP + +那么 AOP 是在哪个步骤代理对象的呢?👇 + +![image-20211213230042502](../../../images/spring/image-20211213230042502.png) + +可以在 AbstractAutoProxyCreator 类中看到 👇 + +![image-20210903080803199](../../../images/spring/image-20210903080803199.png) + +### 总结 + +本文就先介绍到这里啦 🐖 + +主要介绍了 Spring 里面的这些脉络,方便小伙伴们对它有个整体的印象先~ + +再介绍其中的一些扩展点,比如从源材料开始的 BeanFactoryPostprocessor ,到产物 Bean 的 BeanPostprocessor 。 + +实例化,初始化的顺序,Bean 的生命周期,以及 BeanFactory 及子类扩展的功能,再到 ApplicationContext 的功能。 + +还有这个核心机制: **工厂+XML+反射**,以及 AOP **发生的地方**。😋 + +![image-20211213230212297](../../../images/spring/image-20211213230212297.png) diff --git a/docs/nacos/nacos-discovery.md b/docs/nacos/nacos-discovery.md index 6e385d8..c05f686 100644 --- a/docs/nacos/nacos-discovery.md +++ b/docs/nacos/nacos-discovery.md @@ -10,7 +10,7 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.boot.nacos.discovery.autoconfigure.NacosDiscoveryAutoConfiguration ``` -找到注解`NacosDiscoveryAutoConfiguration` +找到类 `NacosDiscoveryAutoConfiguration` ```java @ConditionalOnProperty(name = NacosDiscoveryConstants.ENABLED, matchIfMissing = true) @@ -301,18 +301,18 @@ class BeatTask implements Runnable { try { // 与nacos进行一次rest请求交互 JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); - long interval = result.getIntValue("clientBeatInterval"); + long interval = result.get(CLIENT_BEAT_INTERVAL_FIELD).asLong(); boolean lightBeatEnabled = false; - if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) { - lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED); + if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { + lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } 
BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; - if (result.containsKey(CommonParams.CODE)) { - code = result.getIntValue(CommonParams.CODE); + if (result.has(CommonParams.CODE)) { + code = result.get(CommonParams.CODE).asInt(); } // 如果nacos找不到当前实例, if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { @@ -336,8 +336,12 @@ class BeatTask implements Runnable { NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg()); + } catch (Exception unknownEx) { + NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}", + JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx); + } finally { + executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); } - executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); } } ``` @@ -359,47 +363,41 @@ class BeatTask implements Runnable { NacosException exception = new NacosException(); - if (servers != null && !servers.isEmpty()) { - - Random random = new Random(System.currentTimeMillis()); - int index = random.nextInt(servers.size()); - - for (int i = 0; i < servers.size(); i++) { - // 获取nacos所在的ip+port地址 - String server = servers.get(index); - try { - // 进行请求 - return callServer(api, params, body, server, method); - } catch (NacosException e) { - exception = e; - if (NAMING_LOGGER.isDebugEnabled()) { - NAMING_LOGGER.debug("request {} failed.", server, e); - } - } - index = (index + 1) % servers.size(); - } - } - - if (StringUtils.isNotBlank(nacosDomain)) { - for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) { - try { - return callServer(api, params, body, nacosDomain, method); - } catch (NacosException e) { - exception = e; - if (NAMING_LOGGER.isDebugEnabled()) { - NAMING_LOGGER.debug("request {} failed.", nacosDomain, e); - } - } - } - } - - 
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", - api, servers, exception.getErrCode(), exception.getErrMsg()); - - throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " - + exception.getMessage()); - - } + if (serverListManager.isDomain()) { + String nacosDomain = serverListManager.getNacosDomain(); + for (int i = 0; i < maxRetry; i++) { + try { + return callServer(api, params, body, nacosDomain, method); + } catch (NacosException e) { + exception = e; + if (NAMING_LOGGER.isDebugEnabled()) { + NAMING_LOGGER.debug("request {} failed.", nacosDomain, e); + } + } + } + } else { + Random random = new Random(System.currentTimeMillis()); + int index = random.nextInt(servers.size()); + + for (int i = 0; i < servers.size(); i++) { + String server = servers.get(index); + try { + return callServer(api, params, body, server, method); + } catch (NacosException e) { + exception = e; + if (NAMING_LOGGER.isDebugEnabled()) { + NAMING_LOGGER.debug("request {} failed.", server, e); + } + } + index = (index + 1) % servers.size(); + } + } + + NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), + exception.getErrMsg()); + + throw new NacosException(exception.getErrCode(), + "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage()); ``` **学习点** @@ -437,19 +435,24 @@ public void registerService(String serviceName, String groupName, Instance insta NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); + String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); + if (instance.isEphemeral()) { + BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); + beatReactor.addBeatInfo(groupedServiceName, beatInfo); + } - final Map params = new HashMap(9); + final Map params = new 
HashMap(32); params.put(CommonParams.NAMESPACE_ID, namespaceId); - params.put(CommonParams.SERVICE_NAME, serviceName); + params.put(CommonParams.SERVICE_NAME, groupedServiceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); - params.put("ip", instance.getIp()); - params.put("port", String.valueOf(instance.getPort())); - params.put("weight", String.valueOf(instance.getWeight())); - params.put("enable", String.valueOf(instance.isEnabled())); - params.put("healthy", String.valueOf(instance.isHealthy())); - params.put("ephemeral", String.valueOf(instance.isEphemeral())); - params.put("metadata", JSON.toJSONString(instance.getMetadata())); + params.put(IP_PARAM, instance.getIp()); + params.put(PORT_PARAM, String.valueOf(instance.getPort())); + params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight())); + params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled())); + params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy())); + params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral())); + params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata())); reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST); @@ -645,76 +648,41 @@ public Instance getInstance(String namespaceId, String serviceName, String clust ```java @CanDistro @PutMapping("/beat") -@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) -public JSONObject beat(HttpServletRequest request) throws Exception { - - JSONObject result = new JSONObject(); - - result.put("clientBeatInterval", switchDomain.getClientBeatInterval()); - String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); - String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, - Constants.DEFAULT_NAMESPACE_ID); - String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, - UtilsAndCommons.DEFAULT_CLUSTER_NAME); - String ip = WebUtils.optional(request, "ip", 
StringUtils.EMPTY); - int port = Integer.parseInt(WebUtils.optional(request, "port", "0")); - String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); - - RsInfo clientBeat = null; - if (StringUtils.isNotBlank(beat)) { - clientBeat = JSON.parseObject(beat, RsInfo.class); - } - - if (clientBeat != null) { - if (StringUtils.isNotBlank(clientBeat.getCluster())) { - clusterName = clientBeat.getCluster(); - } +@Secured(action = ActionTypes.WRITE) +public ObjectNode beat(@RequestParam(defaultValue = Constants.DEFAULT_NAMESPACE_ID) String namespaceId, + @RequestParam String serviceName, + @RequestParam(defaultValue = StringUtils.EMPTY) String ip, + @RequestParam(defaultValue = UtilsAndCommons.DEFAULT_CLUSTER_NAME) String clusterName, + @RequestParam(defaultValue = "0") Integer port, + @RequestParam(defaultValue = StringUtils.EMPTY) String beat)throws Exception { + + ObjectNode result = JacksonUtils.createEmptyJsonNode(); + result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval()); + RsInfo clientBeat = null; + if (StringUtils.isNotBlank(beat)) { + clientBeat = JacksonUtils.toObj(beat, RsInfo.class); + } + if (clientBeat != null) { + if (StringUtils.isNotBlank(clientBeat.getCluster())) { + clusterName = clientBeat.getCluster(); + } else { + // fix #2533 + clientBeat.setCluster(clusterName); + } ip = clientBeat.getIp(); port = clientBeat.getPort(); - } - - if (Loggers.SRV_LOG.isDebugEnabled()) { - Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); - } - // 获取实例 - Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); - - if (instance == null) { - if (clientBeat == null) { - result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); - return result; } - instance = new Instance(); - instance.setPort(clientBeat.getPort()); - instance.setIp(clientBeat.getIp()); - instance.setWeight(clientBeat.getWeight()); - 
instance.setMetadata(clientBeat.getMetadata()); - instance.setClusterName(clusterName); - instance.setServiceName(serviceName); - instance.setInstanceId(instance.getInstanceId()); - instance.setEphemeral(clientBeat.isEphemeral()); - - serviceManager.registerInstance(namespaceId, serviceName, instance); - } - - Service service = serviceManager.getService(namespaceId, serviceName); - - if (service == null) { - throw new NacosException(NacosException.SERVER_ERROR, - "service not found: " + serviceName + "@" + namespaceId); - } - if (clientBeat == null) { - clientBeat = new RsInfo(); - clientBeat.setIp(ip); - clientBeat.setPort(port); - clientBeat.setCluster(clusterName); - } - // 处理心跳方法 - service.processClientBeat(clientBeat); - result.put(CommonParams.CODE, NamingResponseCode.OK); - result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval()); - result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); - return result; + NamingUtils.checkServiceNameFormat(serviceName); + Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat, + serviceName, namespaceId); + BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder(); + int resultCode = instanceServiceV2 + .handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder); + result.put(CommonParams.CODE, resultCode); + result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, + instanceServiceV2.getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName)); + result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); + return result; } ``` diff --git a/docs/rocketmq/rocketmq-commitlog.md b/docs/rocketmq/rocketmq-commitlog.md new file mode 100644 index 0000000..316f0b5 --- /dev/null +++ b/docs/rocketmq/rocketmq-commitlog.md @@ -0,0 +1,272 @@ +该文所涉及的 RocketMQ 源码版本为 4.9.3。 + +# RocketMQ CommitLog 详解 + +commitlog 目录主要存储消息,为了保证性能,顺序写入,每一条消息的长度都不相同,每条消息的前面四个字节存储该条消息的总长度,每个文件大小默认为 1G,文件的命名是以 
commitLog 起始偏移量命名的,可以通过修改 broker 配置文件中 mappedFileSizeCommitLog 属性改变文件大小 + +1、获取最小偏移量 + +org.apache.rocketmq.store.CommitLog#getMinOffset + +```java +public long getMinOffset() { + MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile(); + if (mappedFile != null) { + if (mappedFile.isAvailable()) { + return mappedFile.getFileFromOffset(); + } else { + return this.rollNextFile(mappedFile.getFileFromOffset()); + } + } + + return -1; +} +``` + +获取目录下的第一个文件 + +```java +public MappedFile getFirstMappedFile() { + MappedFile mappedFileFirst = null; + + if (!this.mappedFiles.isEmpty()) { + try { + mappedFileFirst = this.mappedFiles.get(0); + } catch (IndexOutOfBoundsException e) { + //ignore + } catch (Exception e) { + log.error("getFirstMappedFile has exception.", e); + } + } + + return mappedFileFirst; +} +``` + +如果该文件可用返回文件的起始偏移量,否则返回下一个文件的 起始偏移量 + +```java +public long rollNextFile(final long offset) { + int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); + return offset + mappedFileSize - offset % mappedFileSize; +} +``` + +2、根据偏移量和消息长度查找消息 + +org.apache.rocketmq.store.CommitLog#getMessage + +```java +public SelectMappedBufferResult getMessage(final long offset, final int size) { + int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); + MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); + if (mappedFile != null) { + int pos = (int) (offset % mappedFileSize); + return mappedFile.selectMappedBuffer(pos, size); + } + return null; +} +``` + +首先获取 commitLog 文件大小,默认 1G + +`private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;` + +获取偏移量所在的 MappedFile + +org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean) + +获取第一个 MappedFile 和最后一个 MappedFile,校验偏移量是否在这两个 MappedFile 之间,计算当前偏移量所在 MappedFiles 索引值为当前偏移量的索引减去第一个文件的索引值 + +```java +if (firstMappedFile != null && lastMappedFile != null) { + if (offset < 
firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { + LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}", + offset, + firstMappedFile.getFileFromOffset(), + lastMappedFile.getFileFromOffset() + this.mappedFileSize, + this.mappedFileSize, + this.mappedFiles.size()); + } else { + int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); + MappedFile targetFile = null; + try { + targetFile = this.mappedFiles.get(index); + } catch (Exception ignored) { + } + + if (targetFile != null && offset >= targetFile.getFileFromOffset() + && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { + return targetFile; + } + + for (MappedFile tmpMappedFile : this.mappedFiles) { + if (offset >= tmpMappedFile.getFileFromOffset() + && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { + return tmpMappedFile; + } + } + } + + if (returnFirstOnNotFound) { + return firstMappedFile; + } +} +``` + +根据在文件内的偏移量和消息长度获取消息内容 + +```java +public SelectMappedBufferResult selectMappedBuffer(int pos, int size) { + int readPosition = getReadPosition(); + if ((pos + size) <= readPosition) { + if (this.hold()) { + ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); + byteBuffer.position(pos); + ByteBuffer byteBufferNew = byteBuffer.slice(); + byteBufferNew.limit(size); + return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); + } else { + log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: " + + this.fileFromOffset); + } + } else { + log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size + + ", fileFromOffset: " + this.fileFromOffset); + } + + return null; +} +``` + +3、Broker 正常停止文件恢复 + +org.apache.rocketmq.store.CommitLog#recoverNormally + +首先查询消息是否验证 CRC + +`boolean checkCRCOnRecover = 
this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();` + +从倒数第 3 个文件开始恢复,如果不足 3 个文件,则从第一个文件开始恢复 + +```java +int index = mappedFiles.size() - 3; +if (index < 0) + index = 0; +``` + +循环遍历 CommitLog 文件,每次取出一条消息 + +`DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);` + +如果查找结果为 true 并且消息的长度大于 0,表示消息正确,mappedFileOffset 指针向前移动本条消息的长度; + +```java +if (dispatchRequest.isSuccess() && size > 0) { + mappedFileOffset += size; +} +``` + +如果查找结果为 true 并且结果等于 0,表示已到文件 的末尾,如果还有下一个文件,则重置 processOffset、mappedOffset 并重复上述步骤,否则跳出循环; + +```java +else if (dispatchRequest.isSuccess() && size == 0) { + index++; + if (index >= mappedFiles.size()) { + // Current branch can not happen + log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName()); + break; + } else { + mappedFile = mappedFiles.get(index); + byteBuffer = mappedFile.sliceByteBuffer(); + processOffset = mappedFile.getFileFromOffset(); + mappedFileOffset = 0; + log.info("recover next physics file, " + mappedFile.getFileName()); + } +} +``` + +如果查找结果为 false,则表示消息没有填满该文件,跳出循环,结束遍历 + +```java +else if (!dispatchRequest.isSuccess()) { + log.info("recover physics file end, " + mappedFile.getFileName()); + break; +} +``` + +更新 committedPosition 和 flushedWhere 指针 + +```java +this.mappedFileQueue.setFlushedWhere(processOffset); +this.mappedFileQueue.setCommittedWhere(processOffset); +``` + +删除 offset 之后的所有文件。遍历目录下面的所有文件,如果文件尾部偏移量小于 offset 则跳过该文件,如果尾部的偏移量大于 offset,则进一步比较 offset 与文件的开始偏移量,如果 offset 大于文件的开始偏移量,说明当前文件包含了有效偏移量,设置 MappedFile 的 flushPosition 和 commitedPosition。 + +如果 offset 小于文件的开始偏移量,说明该文件是有效文件后面创建的,调用 MappedFile#destroy()方法释放资源 + +```java +if (fileTailOffset > offset) { + if (offset >= file.getFileFromOffset()) { + file.setWrotePosition((int) (offset % this.mappedFileSize)); + file.setCommittedPosition((int) (offset % this.mappedFileSize)); + file.setFlushedPosition((int) (offset % this.mappedFileSize)); + } else { + 
file.destroy(1000); + willRemoveFiles.add(file); + } +} +``` + +释放资源需要关闭 MappedFile 和文件通道 fileChannel + +```java +public boolean destroy(final long intervalForcibly) { + this.shutdown(intervalForcibly); + + if (this.isCleanupOver()) { + try { + this.fileChannel.close(); + log.info("close file channel " + this.fileName + " OK"); + + long beginTime = System.currentTimeMillis(); + boolean result = this.file.delete(); + log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName + + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" + + this.getFlushedPosition() + ", " + + UtilAll.computeElapsedTimeMilliseconds(beginTime)); + } catch (Exception e) { + log.warn("close file channel " + this.fileName + " Failed. ", e); + } + + return true; + } else { + log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName + + " Failed. cleanupOver: " + this.cleanupOver); + } + + return false; +} +``` + +判断`maxPhyOffsetOfConsumeQueue`是否大于 processOffset,如果大于,需要删除 ConsumeQueue 中 processOffset 之后的数据 + +```java +if (maxPhyOffsetOfConsumeQueue >= processOffset) { + log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset); + this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); +} +``` + +```java +public void truncateDirtyLogicFiles(long phyOffset) { + ConcurrentMap> tables = DefaultMessageStore.this.consumeQueueTable; + + for (ConcurrentMap maps : tables.values()) { + for (ConsumeQueue logic : maps.values()) { + logic.truncateDirtyLogicFiles(phyOffset); + } + } +} +``` diff --git a/docs/rocketmq/rocketmq-consume-message-process.md b/docs/rocketmq/rocketmq-consume-message-process.md new file mode 100644 index 0000000..50cf4cd --- /dev/null +++ b/docs/rocketmq/rocketmq-consume-message-process.md @@ -0,0 +1,220 @@ +该文所涉及的 RocketMQ 源码版本为 4.9.3。 + +# RocketMQ 消息消费流程 + +拉取消息 成功之后 会调用 
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest assembles the consume request.

It first reads consumeMessageBatchMaxSize, the number of messages a single ConsumeRequest carries, which defaults to 1.

The msgs parameter holds the messages pulled in one batch, at most 32 by default.

If msgs contains no more than consumeMessageBatchMaxSize messages, a single `ConsumeRequest` task is created and submitted to the thread pool. On a `RejectedExecutionException` the task is resubmitted later, but the pool's work queue

`this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();`

is unbounded (its capacity is `Integer.MAX_VALUE`), so in theory the exception never occurs.

```java
if (msgs.size() <= consumeBatchSize) {
    ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
    try {
        this.consumeExecutor.submit(consumeRequest);
    } catch (RejectedExecutionException e) {
        this.submitConsumeRequestLater(consumeRequest);
    }
}
```

If msgs contains more than consumeMessageBatchMaxSize messages, they are split into batches, i.e. several `ConsumeRequest` tasks are created:

```java
for (int total = 0; total < msgs.size(); ) {
    List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize);
    for (int i = 0; i < consumeBatchSize; i++, total++) {
        if (total < msgs.size()) {
            msgThis.add(msgs.get(total));
        } else {
            break;
        }
    }

    ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
    try {
        this.consumeExecutor.submit(consumeRequest);
    } catch (RejectedExecutionException e) {
        for (; total < msgs.size(); total++) {
            msgThis.add(msgs.get(total));
        }

        this.submitConsumeRequestLater(consumeRequest);
    }
}
```

`class ConsumeRequest implements Runnable`

The detailed consume logic is in org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run.

Step 1: check the queue's dropped flag. During a rebalance the queue may be reassigned to another consumer; in that case dropped is set to true.

```java
if (this.processQueue.isDropped()) {
    log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
    return;
}
```

Step 2: if the message is a retry message, restore its original topic.

```java
public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
    final String groupTopic = MixAll.getRetryTopic(consumerGroup);
    for (MessageExt msg : msgs) {
        String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
            msg.setTopic(retryTopic);
        }

        if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
        }
    }
}
```

Step 3: execute the before-consume hook functions, if any are registered.

```java
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
    consumeMessageContext = new ConsumeMessageContext();
    consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
    consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
    consumeMessageContext.setProps(new HashMap<String, String>());
    consumeMessageContext.setMq(messageQueue);
    consumeMessageContext.setMsgList(msgs);
    consumeMessageContext.setSuccess(false);
    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
```

Step 4: invoke the message listener's `consumeMessage` to run the actual consume logic; it returns a `ConsumeConcurrentlyStatus`.

```java
try {
    if (msgs != null && !msgs.isEmpty()) {
        for (MessageExt msg : msgs) {
            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
        }
    }
    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
    log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
        RemotingHelper.exceptionSimpleDesc(e),
        ConsumeMessageConcurrentlyService.this.consumerGroup,
        msgs,
        messageQueue), e);
    hasException = true;
}
```

```java
public enum ConsumeConcurrentlyStatus {
    /**
     * Success consumption
     */
    CONSUME_SUCCESS,
    /**
     * Failure consumption, later try to consume
     */
    RECONSUME_LATER;
}
```

Step 5: execute the after-consume hook functions, if any.

```java
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
    consumeMessageContext.setStatus(status.toString());
    consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
```

Step 6: check the queue's dropped flag again; the result is processed only if it is still false.

```java
if (!processQueue.isDropped()) {
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
```

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult

Step 7: compute ackIndex: `consumeRequest.getMsgs().size() - 1` for `CONSUME_SUCCESS`, and -1 for `RECONSUME_LATER`.

```java
switch (status) {
    case CONSUME_SUCCESS:
        if (ackIndex >= consumeRequest.getMsgs().size()) {
            ackIndex = consumeRequest.getMsgs().size() - 1;
        }
        int ok = ackIndex + 1;
        int failed = consumeRequest.getMsgs().size() - ok;
        this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
        break;
    case RECONSUME_LATER:
        ackIndex = -1;
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
            consumeRequest.getMsgs().size());
        break;
    default:
        break;
}
```

Step 8: in broadcasting mode a failed message is only logged; in clustering mode failed messages are sent back to the broker, and any message that cannot be sent back is wrapped into a new ConsumeRequest and consumed again later.

```java
switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }

        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);

            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
}
```

Step 9: update the consume offset.

```java
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
```

diff --git a/docs/rocketmq/rocketmq-consumequeue.md b/docs/rocketmq/rocketmq-consumequeue.md
new file mode 100644
index 0000000..40c6c7d
--- /dev/null
+++ b/docs/rocketmq/rocketmq-consumequeue.md
@@ -0,0 +1,199 @@
The RocketMQ source discussed in this article is version 4.9.3.

# RocketMQ ConsumeQueue Explained

RocketMQ consumes messages by topic subscription: a consumer cares about every message under a topic, but the messages of one topic are stored non-contiguously in the CommitLog. If consumers had to scan the message store files for a topic's messages, lookups would be very inefficient. The ConsumeQueue file was therefore designed as a per-topic directory (index) into the CommitLog.
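Because every ConsumeQueue entry is a fixed-size 20-byte record (an 8-byte CommitLog offset, a 4-byte message length and an 8-byte tag hash code), locating the n-th entry is plain arithmetic. A minimal, self-contained sketch for illustration only (the buffer and values below are made up, not RocketMQ code):

```java
import java.nio.ByteBuffer;

// Illustration only: locate and parse a fixed 20-byte ConsumeQueue entry
// (8-byte CommitLog offset + 4-byte message length + 8-byte tag hash code).
public class ConsumeQueueEntryDemo {
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Byte position of the n-th logical entry inside a ConsumeQueue file.
    static long locate(int index) {
        return (long) index * CQ_STORE_UNIT_SIZE;
    }

    public static void main(String[] args) {
        // Fake file content: three consecutive entries.
        ByteBuffer buf = ByteBuffer.allocate(3 * CQ_STORE_UNIT_SIZE);
        for (int i = 0; i < 3; i++) {
            buf.putLong(1000L * i); // CommitLog physical offset
            buf.putInt(128);        // message length
            buf.putLong(42L);       // tag hash code
        }
        // Read back the entry with logical index 2.
        buf.position((int) locate(2));
        System.out.println(buf.getLong() + " " + buf.getInt() + " " + buf.getLong());
        // prints "2000 128 42"
    }
}
```

The fixed entry size is what makes the binary search described in this article possible: any logical offset maps to a byte position without scanning.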
The first-level directory of ConsumeQueue is the topic name; the second level is the topic's queue id.

To speed up lookups and save disk space, ConsumeQueue does not store full messages, only a few key fields: an 8-byte CommitLog offset, a 4-byte message length and an 8-byte tag hash code.

1. Finding the physical offset by message store time:

org.apache.rocketmq.store.ConsumeQueue#getOffsetInQueueByTime

Step 1: locate the physical file by timestamp.

```java
public MappedFile getMappedFileByTime(final long timestamp) {
    Object[] mfs = this.copyMappedFiles(0);

    if (null == mfs)
        return null;

    for (int i = 0; i < mfs.length; i++) {
        MappedFile mappedFile = (MappedFile) mfs[i];
        if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
            return mappedFile;
        }
    }

    return (MappedFile) mfs[mfs.length - 1];
}
```

Starting from the first file, return the first file whose last-modified time is no earlier than the given timestamp.

Step 2: use binary search to speed up the lookup within the file.

Compute the lower bound: if minLogicOffset is greater than the file's base offset, the lower bound is minLogicOffset minus the base offset; otherwise it is 0.

`int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;`

Compute the middle offset, where `CQ_STORE_UNIT_SIZE` = 8-byte CommitLog offset + 4-byte message length + 8-byte tag hash code:

`midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;`

If the physical offset read at midOffset is smaller than the current minimum physical offset, the target message lies above midOffset, so low is moved past midOffset and the search continues:

```java
byteBuffer.position(midOffset);
long phyOffset = byteBuffer.getLong();
int size = byteBuffer.getInt();
if (phyOffset < minPhysicOffset) {
    low = midOffset + CQ_STORE_UNIT_SIZE;
    leftOffset = midOffset;
    continue;
}
```

If the physical offset is not below the minimum physical offset, the entry is valid, and the message's store timestamp is read from the CommitLog using its physical offset and length:

```java
long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
```

If the store time is less than 0, the message is invalid and 0 is returned.

If the store timestamp equals the target timestamp, the message has been found: targetOffset is set and the loop ends.

If the store timestamp is greater than the target timestamp, the target message lies below midOffset: set high to midOffset, rightIndexValue to storeTime, and rightOffset to midOffset.

If the store timestamp is smaller than the target timestamp, the target message lies above midOffset: set low to midOffset, leftIndexValue to storeTime, and leftOffset to midOffset.

```java
if (storeTime < 0) {
    return 0;
} else if (storeTime == timestamp) {
    targetOffset = midOffset;
    break;
} else if (storeTime > timestamp) {
    high = midOffset - CQ_STORE_UNIT_SIZE;
    rightOffset = midOffset;
    rightIndexValue = storeTime;
} else {
    low = midOffset + CQ_STORE_UNIT_SIZE;
    leftOffset = midOffset;
    leftIndexValue = storeTime;
}
```

If targetOffset is not -1, an entry with exactly the target store timestamp was found.

If leftIndexValue is -1, return the offset whose timestamp is later than and closest to the target timestamp.

If rightIndexValue is -1, return the offset whose timestamp is earlier than and closest to the target timestamp.

```java
if (targetOffset != -1) {

    offset = targetOffset;
} else {
    if (leftIndexValue == -1) {
        offset = rightOffset;
    } else if (rightIndexValue == -1) {
        offset = leftOffset;
    } else {
        offset = Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp - rightIndexValue) ? rightOffset : leftOffset;
    }
}
```

2. Rolling from the current offset to the start of the next file:

org.apache.rocketmq.store.ConsumeQueue#rollNextFile

```java
public long rollNextFile(final long index) {
    int mappedFileSize = this.mappedFileSize;
    int totalUnitsInFile = mappedFileSize / CQ_STORE_UNIT_SIZE;
    return index + totalUnitsInFile - index % totalUnitsInFile;
}
```

3. Appending a message entry to ConsumeQueue:

org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo

The message offset, message length and tag hash code are written into a ByteBuffer, and the content is appended to ConsumeQueue's memory-mapped file.

```java
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }

    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);

    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {

        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }

            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        this.maxPhysicOffset = offset + size;
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}
```

4. Deleting ConsumeQueue files:

org.apache.rocketmq.store.ConsumeQueue#destroy

Reset ConsumeQueue's maxPhysicOffset and minLogicOffset, then call MappedFileQueue's destroy() method to delete every file under the ConsumeQueue directory.

```java
public void destroy() {
    this.maxPhysicOffset = -1;
    this.minLogicOffset = 0;
    this.mappedFileQueue.destroy();
    if (isExtReadEnable()) {
        this.consumeQueueExt.destroy();
    }
}
```

diff --git a/docs/rocketmq/rocketmq-consumer-start.md b/docs/rocketmq/rocketmq-consumer-start.md
new file mode 100644
index 0000000..78da74a
--- /dev/null
+++ b/docs/rocketmq/rocketmq-consumer-start.md
@@ -0,0 +1,299 @@
The RocketMQ source discussed in this article is version 4.9.3.

# RocketMQ Consumer Startup Flow

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

`1. Check the configuration`
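Before walking through `checkConfig`, the naming rule it enforces can be tried out in isolation. A hypothetical simplification (the real check uses a precomputed character bitmap rather than a regex, as shown in the source below; the class and method names here are made up):

```java
// Illustration only: RocketMQ allows digits, letters, '%', '-', '_' and '|'
// in a consumer-group name, and the name must not exceed 255 characters.
public class GroupNameCheckDemo {
    static boolean isValidGroup(String group) {
        if (group == null || group.isEmpty() || group.length() > 255) {
            return false;
        }
        return group.matches("^[%|a-zA-Z0-9_-]+$");
    }

    public static void main(String[] args) {
        System.out.println(isValidGroup("my_consumer-group%1")); // true
        System.out.println(isValidGroup("bad group!"));          // false
    }
}
```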
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#checkConfig

The consumer group name must not exceed 255 characters:

`public static final int CHARACTER_MAX_LENGTH = 255;`

```java
if (group.length() > CHARACTER_MAX_LENGTH) {
    throw new MQClientException("the specified group is longer than group max length 255.", null);
}
```

The group name may only contain digits, letters, `%`, `-`, `_` and `|`:

```java
// regex: ^[%|a-zA-Z0-9_-]+$
// %
VALID_CHAR_BIT_MAP['%'] = true;
// -
VALID_CHAR_BIT_MAP['-'] = true;
// _
VALID_CHAR_BIT_MAP['_'] = true;
// |
VALID_CHAR_BIT_MAP['|'] = true;
for (int i = 0; i < VALID_CHAR_BIT_MAP.length; i++) {
    if (i >= '0' && i <= '9') {
        // 0-9
        VALID_CHAR_BIT_MAP[i] = true;
    } else if (i >= 'A' && i <= 'Z') {
        // A-Z
        VALID_CHAR_BIT_MAP[i] = true;
    } else if (i >= 'a' && i <= 'z') {
        // a-z
        VALID_CHAR_BIT_MAP[i] = true;
    }
}
```

```java
public static boolean isTopicOrGroupIllegal(String str) {
    int strLen = str.length();
    int len = VALID_CHAR_BIT_MAP.length;
    boolean[] bitMap = VALID_CHAR_BIT_MAP;
    for (int i = 0; i < strLen; i++) {
        char ch = str.charAt(i);
        if (ch >= len || !bitMap[ch]) {
            return true;
        }
    }
    return false;
}
```

The group name must not be `DEFAULT_CONSUMER`:

`public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";`

```java
if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
    throw new MQClientException("consumerGroup can not equal " + MixAll.DEFAULT_CONSUMER_GROUP
        + ", please specify another one." + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
}
```

The minimum number of consume threads must be between 1 and 1000:

```java
if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
    || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
    throw new MQClientException("consumeThreadMin Out of range [1, 1000]"
        + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
}
```

The maximum number of consume threads must be between 1 and 1000:

```java
if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
    throw new MQClientException("consumeThreadMax Out of range [1, 1000]"
        + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
}
```

`2. Set up the subscription`

A `SubscriptionData` is built for each subscribed topic and added to `RebalanceImpl`. In clustering mode the group's default retry topic is also subscribed, and its `SubscriptionData` is added to `RebalanceImpl` as well:

```java
private void copySubscription() throws MQClientException {
    try {
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }

        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                break;
            case CLUSTERING:
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}
```

`3. Initialize MQClientInstance, RebalanceImpl and PullAPIWrapper`

`MQClientInstance` is created. It is a central class on both the producer and the consumer side, encapsulating topic information, broker information, and of course the producers and consumers themselves.

```java
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance = new MQClientInstance(clientConfig.cloneClientConfig(),
            this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}
```

`RebalanceImpl`, which balances the assignment of queues to consumers, is configured:

```java
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
```

`PullAPIWrapper`, the class consumers pull messages through, is constructed:

```java
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
```

`4. Set up the offset store`

In broadcasting mode the consume progress is stored on the consumer side; in clustering mode it is stored on the broker side:

```java
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
```

`5. Orderly or concurrent consumption`

A different `ConsumeMessageService` is constructed depending on whether consumption is orderly:

```java
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
```

The difference lies in the background task each service starts.

Orderly consumption:

```java
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
            }
        }
    }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
```

Concurrent consumption:

```java
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            cleanExpireMsg();
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
        }
    }

}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
```

`6. Start MQClientInstance`

Consumers and producers share the MQClientInstance.

Most of the startup flow has been covered in the producer startup article; here we look at the consumer-specific parts.

A task is started to keep the consumer offsets eventually consistent:

```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
```

A task is started to adjust the consume thread pool size:

```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.adjustThreadPool();
        } catch (Exception e) {
            log.error("ScheduledTask adjustThreadPool exception", e);
        }
    }
}, 1, 1, TimeUnit.MINUTES);
```

The rebalance service is started:

`this.rebalanceService.start();`

`7. Update the topic subscription info`

The subscription route info is refreshed:

```java
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        }
    }
}
```

diff --git a/docs/rocketmq/rocketmq-indexfile.md b/docs/rocketmq/rocketmq-indexfile.md
new file mode 100644
index 0000000..45c466b
--- /dev/null
+++ b/docs/rocketmq/rocketmq-indexfile.md
@@ -0,0 +1,168 @@
The RocketMQ source discussed in this article is version 4.9.3.

# RocketMQ IndexFile Explained

First, the layout of an IndexFile:

an Index header plus hash slots, with index entries chained off each slot — essentially a hash table on disk.

An Index file contains 5,000,000 hash slots by default, and each slot chains at most 4 indexes, so one IndexFile holds at most 20,000,000 indexes by default.

Index header:

40-byte Index header = 8-byte beginTimestamp (store time of the first message covered by this IndexFile) + 8-byte endTimestamp (store time of the last message) + 8-byte beginPhyoffset (CommitLog physical offset of the first message) + 8-byte endPhyoffset (CommitLog physical offset of the last message) + 4-byte hashSlotCount (number of slots in use) + 4-byte indexCount (number of indexes)

Hash slot:

Each hash slot occupies 4 bytes and stores the sequence number of the newest index chained off that slot.

Index:

20-byte index = 4-byte keyHash (hash code of the key) + 8-byte phyOffset (physical offset of the message in the CommitLog) + 4-byte timeDiff (difference between this message's store time and the store time of the first message in the file) + 4-byte preIndexNo (sequence number of the previous index in the chain)

1. Writing the key-to-offset mapping into an IndexFile

org.apache.rocketmq.store.index.IndexFile#putKey

If the number of indexes in use has reached the allowed maximum, false is returned: the current Index file is full.

If the file is not full, the key is hashed, and the hash modulo the slot count selects a hash slot.

The physical offset of a hash slot = IndexHeader size (default
40 bytes) + slot position * hash-slot size (4 bytes)

```java
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
```

Read the slot's current value; if it is less than or equal to 0 or greater than the current index count, it is an invalid index and slotValue is reset to 0 (invalidIndex):

```java
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
    slotValue = invalidIndex;
}
```

Compute the difference, in seconds, between this message's store timestamp and the store timestamp of the first message in the IndexFile:

```java
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

timeDiff = timeDiff / 1000;

if (this.indexHeader.getBeginTimestamp() <= 0) {
    timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
    timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
    timeDiff = 0;
}
```

Physical offset of the newly added index = IndexHeader size (40 bytes) + number of hash slots * slot size (4 bytes) + current index count * index size (20 bytes).

Write the key hash, the message's physical offset, the time difference, and the previous slot value into the mappedByteBuffer, then point the hash slot at the new index's sequence number:

```java
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
    + this.indexHeader.getIndexCount() * indexSize;

this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
```

Update the IndexHeader:

If the file holds at most one index so far, set beginPhyOffset and beginTimestamp.

After every append, update indexCount, endPhyOffset and endTimestamp:

```java
if (this.indexHeader.getIndexCount() <= 1) {
    this.indexHeader.setBeginPhyOffset(phyOffset);
    this.indexHeader.setBeginTimestamp(storeTimestamp);
}

if (invalidIndex == slotValue) {
    this.indexHeader.incHashSlotCount();
}
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
```

2. Finding messages by key

org.apache.rocketmq.store.index.IndexFile#selectPhyOffset

Parameters:

`List<Long> phyOffsets`: the physical offsets found

`String key`: the index key

`int maxNum`: the maximum number of messages to return

`long begin`: the start timestamp

`long end`: the end timestamp

Hash the key; the hash modulo the slot count gives the slot index.

Physical address of the slot = IndexHeader (40 bytes) + slot index * slot size (4 bytes):

```java
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
```

Read the slot value from the `mappedByteBuffer`. If it is less than or equal to 0, greater than the index count, or the index count is at most 1, there is no valid result.

If the number of collected offsets reaches the requested maximum, the loop ends:

```java
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
    || this.indexHeader.getIndexCount() <= 1) {
} else {
    for (int nextIndexToRead = slotValue; ; ) {
        if (phyOffsets.size() >= maxNum) {
            break;
        }
```

If the stored time difference is negative, stop. If the hash matches and the store time falls between the begin and end timestamps, add the offset to the results:

```java
if (timeDiff < 0) {
    break;
}

timeDiff *= 1000L;

long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

if (keyHash == keyHashRead && timeMatched) {
    phyOffsets.add(phyOffsetRead);
}
```

Then follow the chain to the previous index: if its sequence number is greater than 0 and not beyond indexCount, and its timestamp is no earlier than the begin timestamp, continue the search:

```java
if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount()
    || prevIndexRead == nextIndexToRead || timeRead < begin) {
    break;
}

nextIndexToRead = prevIndexRead;
```

diff --git a/docs/rocketmq/rocketmq-mappedfile-detail.md b/docs/rocketmq/rocketmq-mappedfile-detail.md
new file mode 100644
index 0000000..0354e96
--- /dev/null
+++ b/docs/rocketmq/rocketmq-mappedfile-detail.md
@@ -0,0 +1,254 @@
The RocketMQ source discussed in this article is version 4.9.3.

# RocketMQ MappedFile Memory-Mapped Files Explained

1. MappedFile initialization

```java
private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;

    ensureDirOK(this.file.getParent());

    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}
```

`fileFromOffset` is parsed from the file name: files under the commitlog directory are named after their starting offset, so the name converts directly to a long.

Make sure the parent directory exists, creating it if necessary:

```java
public static void ensureDirOK(final String dirName) {
    if (dirName != null) {
        if (dirName.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
            String[] dirs = dirName.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
            for (String dir : dirs) {
                createDirIfNotExist(dir);
            }
        } else {
            createDirIfNotExist(dirName);
        }
    }
}
```

The fileChannel is obtained through a `RandomAccessFile`:

`this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();`

NIO memory mapping maps the file into memory:

`this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);`

2. MappedFile commit

```java
public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0();
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // All dirty data has been committed to FileChannel.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}
```

If writeBuffer is null there is nothing to commit, so wrotePosition itself is returned as the committedPosition:

```java
if (writeBuffer == null) {
    //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
    return this.wrotePosition.get();
}
```

Deciding whether to commit:

If the file is full, return true:

```java
if (this.isFull()) {
    return true;
}
```

```java
public boolean isFull() {
    return this.fileSize == this.wrotePosition.get();
}
```

commitLeastPages is the minimum number of pages for one commit. If it is greater than 0, the number of dirty pages — the current write position (`wrotePosition`) minus the last committed position (`committedPosition`), divided by the page size `OS_PAGE_SIZE` — must reach commitLeastPages for the commit to proceed. If commitLeastPages is not positive, any dirty data at all triggers a commit:

```java
if (commitLeastPages > 0) {
    return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
}

return write > flush;
```

The actual commit in MappedFile: take a slice of `writeBuffer`, set its position to the last committed position (`committedPosition`) and its limit to the current write position (`wrotePosition`), write the bytes in between to the FileChannel, and finally advance committedPosition to wrotePosition:

```java
protected void commit0() {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - lastCommittedPosition > 0) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}
```

3. MappedFile flush

Deciding whether to flush:

Is the file full?

```java
if (this.isFull()) {
    return true;
}
```

```java
public boolean isFull() {
    return this.fileSize == this.wrotePosition.get();
}
```
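Commit and flush share the same dirty-page arithmetic. A standalone sketch of the check (RocketMQ's `OS_PAGE_SIZE` is 4 KB; the class and method names here are illustrative, not RocketMQ code):

```java
public class DirtyPageDemo {
    static final int OS_PAGE_SIZE = 1024 * 4;

    // Mirrors the condition in the source: the number of whole pages between
    // the last flushed position and the current write position must reach
    // flushLeastPages; with flushLeastPages <= 0, any dirty byte is enough.
    static boolean isAbleToFlush(int write, int flush, int flushLeastPages) {
        if (flushLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
        }
        return write > flush;
    }

    public static void main(String[] args) {
        System.out.println(isAbleToFlush(4096 * 5, 4096 * 1, 4)); // true: 4 dirty pages
        System.out.println(isAbleToFlush(4096 * 5, 4096 * 2, 4)); // false: only 3
        System.out.println(isAbleToFlush(100, 0, 0));             // true: some dirty bytes
    }
}
```

Batching writes into whole pages like this trades a little latency for far fewer system calls and page-cache flushes.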
If `flushLeastPages` is greater than 0, the write position minus the last flushed position, divided by `OS_PAGE_SIZE`, must be at least `flushLeastPages`.

If `flushLeastPages` is less than or equal to 0, any unflushed data is enough:

```java
if (flushLeastPages > 0) {
    return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
}

return write > flush;
```

Get the maximum read position:

```java
public int getReadPosition() {
    return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
```

Flushing the data to disk:

If `writeBuffer` is not null or the channel's position is not 0, the data is flushed through the fileChannel; otherwise it is flushed through the MappedByteBuffer.

4. MappedFile destruction

```java
public boolean destroy(final long intervalForcibly) {
    this.shutdown(intervalForcibly);

    if (this.isCleanupOver()) {
        try {
            this.fileChannel.close();
            log.info("close file channel " + this.fileName + " OK");

            long beginTime = System.currentTimeMillis();
            boolean result = this.file.delete();
            log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                + this.getFlushedPosition() + ", "
                + UtilAll.computeElapsedTimeMilliseconds(beginTime));
        } catch (Exception e) {
            log.warn("close file channel " + this.fileName + " Failed. ", e);
        }

        return true;
    } else {
        log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
            + " Failed. cleanupOver: " + this.cleanupOver);
    }

    return false;
}
```

1> Shut down the MappedFile.

On the first call `available` is true: it is set to false, the first-shutdown timestamp is recorded, and release() is called. Resources are released only once the reference count drops below 1. If the reference count is still positive on a later call and the time since firstShutdownTimestamp exceeds the maximum grace period `intervalForcibly`, the reference count is reduced by 1000 at a time until it goes negative and the resources are released:

```java
public void shutdown(final long intervalForcibly) {
    if (this.available) {
        this.available = false;
        this.firstShutdownTimestamp = System.currentTimeMillis();
        this.release();
    } else if (this.getRefCount() > 0) {
        if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
            this.refCount.set(-1000 - this.getRefCount());
            this.release();
        }
    }
}
```

2> Check whether cleanup is finished.

Cleanup is considered finished when the reference count is at most 0 and the cleanupOver flag is true:

```java
public boolean isCleanupOver() {
    return this.refCount.get() <= 0 && this.cleanupOver;
}
```

3> Close the file channel fileChannel:

`this.fileChannel.close();`

diff --git a/docs/rocketmq/rocketmq-nameserver-broker.md b/docs/rocketmq/rocketmq-nameserver-broker.md
new file mode 100644
index 0000000..45993f0
--- /dev/null
+++ b/docs/rocketmq/rocketmq-nameserver-broker.md
@@ -0,0 +1,220 @@
The RocketMQ source discussed in this article is version 4.9.3.

# How Does the RocketMQ NameServer Communicate with Brokers?
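Before diving into the source, the liveness rule everything below relies on fits in a few lines. A sketch (the constant matches RouteInfoManager's `BROKER_CHANNEL_EXPIRED_TIME`; the class and helper method are illustrative, not RocketMQ code):

```java
public class BrokerExpiryDemo {
    // A broker is treated as inactive once its last heartbeat is more
    // than 120 seconds old.
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

    static boolean isExpired(long lastUpdateTimestamp, long now) {
        return lastUpdateTimestamp + BROKER_CHANNEL_EXPIRED_TIME < now;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(isExpired(now - 130_000, now)); // true: heartbeat 130s ago
        System.out.println(isExpired(now - 30_000, now));  // false: heartbeat 30s ago
    }
}
```

With a 30-second heartbeat interval, a broker gets several missed heartbeats of slack before eviction, which keeps transient network hiccups from churning the route tables.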
+ +nameserver 每隔 10s 扫描一次 Broker,移除处于未激活状态的 Broker + +核心代码: + +`this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.*SECONDS*);` + +```java +public int scanNotActiveBroker() { + int removeCount = 0; + Iterator> it = this.brokerLiveTable.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + long last = next.getValue().getLastUpdateTimestamp(); + if ((last +BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { + RemotingUtil.closeChannel(next.getValue().getChannel()); + it.remove(); + log.warn("The broker channel expired, {} {}ms", next.getKey(),BROKER_CHANNEL_EXPIRED_TIME); + this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); + + removeCount++; + } + } + + return removeCount; +} +``` + +broker 每隔 30 秒会向集群中所有的 NameServer 发送心跳包 + +核心代码: + +```java +this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); + } catch (Throwable e) { + log.error("registerBrokerAll Exception", e); + } + } +}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); +``` + +```java +public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { + TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); + + if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + ConcurrentHashMap topicConfigTable = new ConcurrentHashMap(); + for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { + TopicConfig tmp = + new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), + this.brokerConfig.getBrokerPermission()); 
+ topicConfigTable.put(topicConfig.getTopicName(), tmp); + } + topicConfigWrapper.setTopicConfigTable(topicConfigTable); + } + + if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId(), + this.brokerConfig.getRegisterBrokerTimeoutMills())) { + doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); + } +} +``` + +org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 是网络处理器解析请求类型,如果请求类型为`*RequestCode.REGISTER_BROKER`,则请求最终转发到 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker\* + +代码太多,文字来描述一下: + +第一步:路由注册需要加写锁,防止并发修改 RouteInfoManager 中的路由表。首先判断 Broker 所属集群是否存在,如果不存在,则创建集群,然后将 broker 名加入集群。 + +第二步:维护 BrokerData 信息,首先从 brokerAddrTable 中根据 broker 名尝试获取 Broker 信息,如果不存在,则新建 BrokerData 放入 brokerAddrTable,registerFirst 设置为 true;如果存在,直接替换原先的 Broker 信息,registerFirst 设置为 false,表示非第一次注册 + +第三步:如果 Broker 为主节点,并且 Broker 的 topic 配置信息发生变化或者是初次注册,则需要创建或者更新 topic 的路由元数据,并填充 topicQueueTable + +根据 topicConfig 创建 QueueData 数据结构然后更新 topicQueueTable + +```java +private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { + QueueData queueData = new QueueData(); + queueData.setBrokerName(brokerName); + queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); + queueData.setReadQueueNums(topicConfig.getReadQueueNums()); + queueData.setPerm(topicConfig.getPerm()); + queueData.setTopicSysFlag(topicConfig.getTopicSysFlag()); + + Map queueDataMap = this.topicQueueTable.get(topicConfig.getTopicName()); + if (null == queueDataMap) { + queueDataMap = new HashMap<>(); + queueDataMap.put(queueData.getBrokerName(), queueData); + this.topicQueueTable.put(topicConfig.getTopicName(), queueDataMap); + log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData); + } else { + QueueData old = queueDataMap.put(queueData.getBrokerName(), queueData); + if (old != null && !old.equals(queueData)) { + 
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), old,
+                queueData);
+        }
+    }
+}
+```
+
+第四步:更新 BrokerLiveInfo。brokerLiveTable 存储状态正常的 Broker 信息,BrokerLiveInfo 是执行路由删除操作的重要依据。
+
+```java
+BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
+    new BrokerLiveInfo(
+        System.currentTimeMillis(),
+        topicConfigWrapper.getDataVersion(),
+        channel,
+        haServerAddr));
+```
+
+第五步:注册 Broker 的过滤器 Server 地址列表,一个 Broker 上会关联多个 FilterServer 消息过滤服务器。如果此 Broker 为从节点,则需要查找该 Broker 主节点的信息,并更新对应的 masterAddr 属性。
+
+```java
+if (MixAll.MASTER_ID != brokerId) {
+    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+    if (masterAddr != null) {
+        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
+        if (brokerLiveInfo != null) {
+            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
+            result.setMasterAddr(masterAddr);
+        }
+    }
+}
+```
+
+总结:
+
+NameServer 与 Broker 保持长连接,Broker 的状态信息存储在 brokerLiveTable 中。NameServer 每收到一个心跳包,就会更新 brokerLiveTable 中该 Broker 的状态信息以及路由表(topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable)。更新上述路由表使用了锁粒度较小的读写锁,允许多个消息发送者并发读,保证消息发送时的高并发;同一时刻 NameServer 只处理一个 Broker 心跳包,多个心跳包请求串行执行。
+
+NameServer 如何剔除失效的 Broker?
+
+1、NameServer 每隔 10s 扫描一次 brokerLiveTable 状态表,如果 BrokerLiveInfo 的 lastUpdateTimestamp 时间戳距当前时间超过 120s,则认为 Broker 失效,移除该 Broker,关闭与 Broker 的连接,同时更新 topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
+
+2、Broker 正常关闭时,会发送 unregisterBroker 指令。
+
+不管是哪一种方式触发的路由删除,处理逻辑都是一样的。
+
+第一步:申请写锁,移除 brokerLiveTable、filterServerTable 中该 Broker 的相关信息。
+
+```java
+this.lock.writeLock().lockInterruptibly();
+BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
+log.info("unregisterBroker, remove from brokerLiveTable {}, {}", brokerLiveInfo != null ?
    "OK" : "Failed", brokerAddr);
+this.filterServerTable.remove(brokerAddr);
+```
+
+第二步:维护 brokerAddrTable。找到具体的 Broker,将其从 brokerData 中移除;如果移除之后 brokerData 不再包含其他 Broker,则从 brokerAddrTable 中移除该 brokerName 对应的数据。
+
+```java
+BrokerData brokerData = this.brokerAddrTable.get(brokerName);
+if (null != brokerData) {
+    String addr = brokerData.getBrokerAddrs().remove(brokerId);
+    log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}", addr != null ? "OK" : "Failed", brokerAddr);
+
+    if (brokerData.getBrokerAddrs().isEmpty()) {
+        this.brokerAddrTable.remove(brokerName);
+        log.info("unregisterBroker, remove name from brokerAddrTable OK, {}", brokerName);
+        removeBrokerName = true;
+    }
+}
+```
+
+第三步:根据 brokerName 从 clusterAddrTable 中找到对应集群,并将该 Broker 从集群中移除;如果移除后集群中不包含任何 Broker,则将该集群从 clusterAddrTable 中移除。
+
+```java
+if (removeBrokerName) {
+    Set<String> nameSet = this.clusterAddrTable.get(clusterName);
+    if (nameSet != null) {
+        boolean removed = nameSet.remove(brokerName);
+        log.info("unregisterBroker, remove name from clusterAddrTable {}, {}", removed ?
    "OK" : "Failed", brokerName);
+
+        if (nameSet.isEmpty()) {
+            this.clusterAddrTable.remove(clusterName);
+            log.info("unregisterBroker, remove cluster from clusterAddrTable {}", clusterName);
+        }
+    }
+
+}
+```
+
+第四步:根据 brokerName 遍历所有主题的队列,将属于当前 Broker 的队列移除;如果移除后该主题不再包含任何队列,则从路由表中删除该主题。
+
+```java
+this.topicQueueTable.forEach((topic, queueDataMap) -> {
+    QueueData old = queueDataMap.remove(brokerName);
+    if (old != null) {
+        log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, old);
+    }
+
+    if (queueDataMap.size() == 0) {
+        noBrokerRegisterTopic.add(topic);
+        log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);
+    }
+});
+```
+
+第五步:释放锁,完成路由删除。
+
+```java
+finally {
+    this.lock.writeLock().unlock();
+}
+```
diff --git a/docs/rocketmq/rocketmq-producer-start.md b/docs/rocketmq/rocketmq-producer-start.md
new file mode 100644
index 0000000..804f961
--- /dev/null
+++ b/docs/rocketmq/rocketmq-producer-start.md
@@ -0,0 +1,238 @@
+该文所涉及的 RocketMQ 源码版本为 4.9.3。
+
+# RocketMQ 生产者启动流程
+
+入口:
+
+org.apache.rocketmq.client.producer.DefaultMQProducer#start
+
+```java
+@Override
+public void start() throws MQClientException {
+    this.setProducerGroup(withNamespace(this.producerGroup));
+    this.defaultMQProducerImpl.start();
+    if (null != traceDispatcher) {
+        try {
+            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
+        } catch (MQClientException e) {
+            log.warn("trace dispatcher start failed ", e);
+        }
+    }
+}
+```
+
+第一步、检查 producerGroup
+
+```java
+private void checkConfig() throws MQClientException {
+    Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
+
+    if (null == this.defaultMQProducer.getProducerGroup()) {
+        throw new MQClientException("producerGroup is null", null);
+    }
+
+    if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
+        throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another
one.",null); + } + } +``` + +第二步、设置 instanceName + +```java +public void changeInstanceNameToPID() { + if (this.instanceName.equals("DEFAULT")) { + this.instanceName = UtilAll.getPid() + "#" + System.nanoTime(); + } +} +``` + +第三步、创建 mqClientInstance,它是与 nameserver 和 broker 通信的中介 + +```java +public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { + String clientId = clientConfig.buildMQClientId(); + MQClientInstance instance = this.factoryTable.get(clientId); + if (null == instance) { + instance = + new MQClientInstance(clientConfig.cloneClientConfig(), + this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); + MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); + if (prev != null) { + instance = prev; + log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); + } else { + log.info("Created new MQClientInstance for clientId:[{}]", clientId); + } + } + + return instance; + } +``` + +第四步、将生产者加入 mqClientInstance 管理 + +```java +public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { + if (null == group || null == producer) { + return false; + } + + MQProducerInner prev = this.producerTable.putIfAbsent(group, producer); + if (prev != null) { + log.warn("the producer group[{}] exist already.", group); + return false; + } + + return true; +} +``` + +第五步、启动 MQClientInstance(有一些关于消费者的任务 会在消费者启动流程中讲解) + +1. 
启动 netty 客户端 ,创建与 nameserver、broker 通信的 channel + +```java +public void start() { + this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( + nettyClientConfig.getClientWorkerThreads(), + new ThreadFactory() { + + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); + } + }); + + Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, false) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + if (nettyClientConfig.isUseTLS()) { + if (null != sslContext) { + pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); + log.info("Prepend SSL handler"); + } else { + log.warn("Connections are insecure as SSLContext is null!"); + } + } + pipeline.addLast( + defaultEventExecutorGroup, + new NettyEncoder(), + new NettyDecoder(), + new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), + new NettyConnectManageHandler(), + new NettyClientHandler()); + } + }); + if (nettyClientConfig.getClientSocketSndBufSize() > 0) { + log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize()); + handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()); + } + if (nettyClientConfig.getClientSocketRcvBufSize() > 0) { + log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize()); + handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()); + } + if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() 
> 0) { + log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()); + handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark( + nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark())); + } + + this.timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + NettyRemotingClient.this.scanResponseTable(); + } catch (Throwable e) { + log.error("scanResponseTable exception", e); + } + } + }, 1000 * 3, 1000); + + if (this.channelEventListener != null) { + this.nettyEventExecutor.start(); + } +} +``` + +2. 启动一些周期性的任务: + +更新 nameserver 地址的任务: + +```java +if (null == this.clientConfig.getNamesrvAddr()) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); + } catch (Exception e) { + log.error("ScheduledTask fetchNameServerAddr exception", e); + } + } + }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); +} +``` + +更新 topic 路由信息的任务: + +```java +this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + MQClientInstance.this.updateTopicRouteInfoFromNameServer(); + } catch (Exception e) { + log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); + } + } +}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); +``` + +更新 broker 的任务: + +```java +this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + MQClientInstance.this.cleanOfflineBroker(); + MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); + } catch (Exception e) { + log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); + } + } +}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); +``` + +启动拉取消息线程: + 
+`this.pullMessageService.start();` + +```java +public void run() { + log.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + PullRequest pullRequest = this.pullRequestQueue.take(); + this.pullMessage(pullRequest); + } catch (InterruptedException ignored) { + } catch (Exception e) { + log.error("Pull Message Service Run Method exception", e); + } + } + + log.info(this.getServiceName() + " service end"); +} +``` diff --git a/docs/rocketmq/rocketmq-pullmessage-processor.md b/docs/rocketmq/rocketmq-pullmessage-processor.md new file mode 100644 index 0000000..f6f2e00 --- /dev/null +++ b/docs/rocketmq/rocketmq-pullmessage-processor.md @@ -0,0 +1,116 @@ +该文所涉及的 RocketMQ 源码版本为 4.9.3。 + +# RocketMQ broker 处理拉取消息请求流程 + +org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand) + +第 1 步、`校验broker是否可读` + +```java +if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1())); + return response; +} +``` + +第 2 步、`根据消费组获取订阅信息` + +```java +SubscriptionGroupConfig subscriptionGroupConfig = + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); +``` + +第 3 步、`校验是否允许消费` + +```java +if (!subscriptionGroupConfig.isConsumeEnable()) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup()); + return response; +} +``` + +第 4 步、`根据主题获取对应的配置信息` + +```java +TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); +if (null == topicConfig) { + log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), 
RemotingHelper.parseChannelRemoteAddr(channel));
+    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+    response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
+    return response;
+}
+```
+
+第 5 步、`校验主题对应的队列`
+
+```java
+if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
+    String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
+        requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
+    log.warn(errorInfo);
+    response.setCode(ResponseCode.SYSTEM_ERROR);
+    response.setRemark(errorInfo);
+    return response;
+}
+```
+
+第 6 步、`如果配置了消息过滤表达式,根据表达式构建 consumerFilterData;如果没有,则根据主题和消费组获取`
+
+```java
+consumerFilterData = ConsumerFilterManager.build(
+    requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
+    requestHeader.getExpressionType(), requestHeader.getSubVersion());
+
+consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
+    requestHeader.getConsumerGroup());
+```
+
+第 7 步、`校验如果不是 Tag 过滤,是否开启了自定义属性过滤,如果没有开启,则不允许操作。只有使用 push 推送模式的消费者才能使用 SQL92 标准的 SQL 语句过滤消息,pull 拉取模式的消费者不支持该功能`
+
+```java
+if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
+    && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
+    response.setCode(ResponseCode.SYSTEM_ERROR);
+    response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
+    return response;
+}
+```
+
+第 8 步、`根据是否支持重试过滤创建不同的 MessageFilter`
+
+```java
+if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
+    messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
+        this.brokerController.getConsumerFilterManager());
+} else {
+    messageFilter = new
ExpressionMessageFilter(subscriptionData, consumerFilterData, + this.brokerController.getConsumerFilterManager()); +} +``` + +第 9 步、`根据消费组、主题、队列、偏移量、最大拉取消息数量、消息过滤器查找信息` + +```java +final GetMessageResult getMessageResult = + this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), + requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter); + +``` + +第 10 步、`消息为空 设置code为系统错误 返回response` + +```java +response.setCode(ResponseCode.SYSTEM_ERROR); +response.setRemark("store getMessage return null"); +``` + +第 11 步、`提交偏移量` + +```java +if (storeOffsetEnable) { + this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); +} +``` diff --git a/docs/rocketmq/rocketmq-pullmessage.md b/docs/rocketmq/rocketmq-pullmessage.md new file mode 100644 index 0000000..55f7f5b --- /dev/null +++ b/docs/rocketmq/rocketmq-pullmessage.md @@ -0,0 +1,327 @@ +该文所涉及的 RocketMQ 源码版本为 4.9.3。 + +# RocketMQ 消息拉取流程 + +之前在消费者启动流程中描述过 MQClientInstance 的启动流程,在启动过程中会启动 PullMessageService,它继承了`ServiceThread`,并且 ServiceThread 实现了 Runnable 接口,所以是单独启动了一个线程 + +`public class PullMessageService extends ServiceThread` + +`public abstract class ServiceThread implements Runnable` + +PullMessageService 的 run 方法如下: + +`protected volatile boolean stopped = false;` + +```java +public void run() { + log.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + PullRequest pullRequest = this.pullRequestQueue.take(); + this.pullMessage(pullRequest); + } catch (InterruptedException ignored) { + } catch (Exception e) { + log.error("Pull Message Service Run Method exception", e); + } + } + + log.info(this.getServiceName() + " service end"); +} +``` + +只要没有停止,线程一直会从 PullRequestQueue 中获取 PullRequest 消息拉取任务,如果队列为空,会一直阻塞,直到有 
PullRequest 被放入队列中;如果拿到了 PullRequest,就会调用 pullMessage 方法拉取消息。
+
+添加 PullRequest 有两个方法,一个是延迟添加,另一个是立即添加:
+
+```java
+public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
+    if (!isStopped()) {
+        this.scheduledExecutorService.schedule(new Runnable() {
+            @Override
+            public void run() {
+                PullMessageService.this.executePullRequestImmediately(pullRequest);
+            }
+        }, timeDelay, TimeUnit.MILLISECONDS);
+    } else {
+        log.warn("PullMessageServiceScheduledThread has shutdown");
+    }
+}
+
+public void executePullRequestImmediately(final PullRequest pullRequest) {
+    try {
+        this.pullRequestQueue.put(pullRequest);
+    } catch (InterruptedException e) {
+        log.error("executePullRequestImmediately pullRequestQueue.put", e);
+    }
+}
+```
+
+org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage
+
+拉取消息流程:
+
+根据消费组获取 `MQConsumerInner`,再根据是推模式还是拉模式,将其强转为 `DefaultMQPushConsumerImpl` 或 `DefaultLitePullConsumerImpl`。
+
+org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
+
+第 1 步:`获取处理队列,如果队列已被丢弃则结束`
+
+```java
+final ProcessQueue processQueue = pullRequest.getProcessQueue();
+
+if (processQueue.isDropped()) {
+    log.info("the pull request[{}] is dropped.", pullRequest.toString());
+    return;
+}
+```
+
+第 2 步:`设置最后一次拉取时间戳`
+
+`pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());`
+
+第 3 步:`确认消费者处于运行状态,如果不是,将 PullRequest 延迟 3s 放入队列`
+
+```java
+try {
+    this.makeSureStateOK();
+} catch (MQClientException e) {
+    log.warn("pullMessage exception, consumer state not ok", e);
+    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
+    return;
+}
+```
+
+第 4 步:`如果消费者被挂起(暂停),将 PullRequest 延迟 1s 放入队列`
+
+```java
+if (this.isPause()) {
+    log.warn("consumer was paused, execute pull request later.
instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); + this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); + return; +} +``` + +第 5 步:`缓存的消息数量大于1000,将PullRequest延迟50ms放入队列,每触发1000次流控输出警告信息` + +```java +if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { + this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + if ((queueFlowControlTimes++ % 1000) == 0) { + log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", + this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); + } + return; +} +``` + +第 6 步:`缓存的消息大小大于100M 将PullRequest延迟50ms放入队列,每触发1000次输出警告信息` + +```java +if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { + this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + if ((queueFlowControlTimes++ % 1000) == 0) { + log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", + this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); + } + return; +} +``` + +第 7 步:`ProcessQueue中消息的最大偏移量与最小偏移量的差值不能大于2000,如果大于2000,触发流控,输出警告信息` + +```java +if (!this.consumeOrderly) { + if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { + this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { + 
log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", + processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), + pullRequest, queueMaxSpanFlowControlTimes); + } + return; + } +} +``` + +第 8 步:`如果ProcessQueue被锁了,判断上一个PullRequest是否被锁,如果没有被锁通过RebalanceImpl计算拉取消息偏移量,如果计算异常,将请求延迟3s加入队列`,`如果下一次拉取消息 的偏移量大于计算出来的偏移量,说明要拉取的偏移量 大于消费偏移量,对 偏移量 进行修正,设置下一次拉取的偏移量为计算出来的偏移量` + +```java +if (processQueue.isLocked()) { + if (!pullRequest.isPreviouslyLocked()) { + long offset = -1L; + try { + offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); + } catch (Exception e) { + this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); + log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e); + return; + } + boolean brokerBusy = offset < pullRequest.getNextOffset(); + log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", + pullRequest, offset, brokerBusy); + if (brokerBusy) { + log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. 
pullRequest: {} NewOffset: {}",
+                pullRequest, offset);
+        }
+
+        pullRequest.setPreviouslyLocked(true);
+        pullRequest.setNextOffset(offset);
+    }
+} else {
+    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
+    log.info("pull message later because not locked in broker, {}", pullRequest);
+    return;
+}
+```
+
+第 9 步:`根据主题名称获取订阅信息,如果为空,将请求延迟 3s 放入队列`
+
+```java
+final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
+if (null == subscriptionData) {
+    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
+    log.warn("find the consumer's subscription failed, {}", pullRequest);
+    return;
+}
+```
+
+第 10 步:`创建 PullCallback,为后面调用拉取消息 API 做准备`
+
+```java
+PullCallback pullCallback = new PullCallback() {
+    @Override
+    public void onSuccess(PullResult pullResult) {
+        if (pullResult != null) {
+            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
+                subscriptionData);
+
+            switch (pullResult.getPullStatus()) {
+                case FOUND:
+                    long prevRequestOffset = pullRequest.getNextOffset();
+                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+                    long pullRT = System.currentTimeMillis() - beginTimestamp;
+                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
+                        pullRequest.getMessageQueue().getTopic(), pullRT);
+
+                    long firstMsgOffset = Long.MAX_VALUE;
+                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
+                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+                    } else {
+                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
+
+                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
+                            pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
+
+                        boolean dispatchToConsume =
processQueue.putMessage(pullResult.getMsgFoundList());
+                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
+                            pullResult.getMsgFoundList(),
+                            processQueue,
+                            pullRequest.getMessageQueue(),
+                            dispatchToConsume);
+
+                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
+                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
+                                DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
+                        } else {
+                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+                        }
+                    }
+
+                    if (pullResult.getNextBeginOffset() < prevRequestOffset
+                        || firstMsgOffset < prevRequestOffset) {
+                        log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
+                            pullResult.getNextBeginOffset(),
+                            firstMsgOffset,
+                            prevRequestOffset);
+                    }
+
+                    break;
+                case NO_NEW_MSG:
+                case NO_MATCHED_MSG:
+                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+
+                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
+
+                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+                    break;
+                case OFFSET_ILLEGAL:
+                    log.warn("the pull request offset illegal, {} {}",
+                        pullRequest.toString(), pullResult.toString());
+                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
+
+                    pullRequest.getProcessQueue().setDropped(true);
+                    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
+
+                        @Override
+                        public void run() {
+                            try {
+                                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
+                                    pullRequest.getNextOffset(), false);
+
+                                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
+
+                                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
+
+                                log.warn("fix the pull request offset, {}", pullRequest);
+                            } catch (Throwable e) {
+                                log.error("executeTaskLater Exception", e);
+                            }
+                        }
+                    }, 10000);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
@Override + public void onException(Throwable e) { + if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + log.warn("execute the pull request exception", e); + } + + DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); + } +}; +``` + +第 11 步:`设置系统标记` + +`FLAG_COMMIT_OFFSET: 消费进度 大于0` + +`FLAG_SUSPEND: 拉取消息时支持线程挂起` + +`FLAG_SUBSCRIPTION: 消息过滤机制表达式` + +`FLAG_CLASS_FILTER: 消息过滤机制是否为类过滤` + +```java +int sysFlag = PullSysFlag.buildSysFlag( + commitOffsetEnable, // commitOffset + true, // suspend + subExpression != null, // subscription + classFilter // class filter +); +``` + +第 12 步:`调用 broker 拉取消息` + +```java +// 每一个参数的含义如下 +this.pullAPIWrapper.pullKernelImpl( + pullRequest.getMessageQueue(), // 要拉取的消息队列 + subExpression, // 消息过滤表达式 + subscriptionData.getExpressionType(), // 过滤表达式类型 + subscriptionData.getSubVersion(), // 时间戳 + pullRequest.getNextOffset(), // 消息拉取的开始偏移量 + this.defaultMQPushConsumer.getPullBatchSize(), // 拉取消息的数量 默认32条 + sysFlag, // 系统标记 + commitOffsetValue, // 消费的偏移量 + BROKER_SUSPEND_MAX_TIME_MILLIS, // 允许broker挂起的时间 默认15s + CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 允许的超时时间 默认30s + CommunicationMode.ASYNC, // 默认为异步拉取 + pullCallback // 拉取消息之后的回调 +); +``` diff --git a/docs/rocketmq/rocketmq-send-message.md b/docs/rocketmq/rocketmq-send-message.md new file mode 100644 index 0000000..a97b890 --- /dev/null +++ b/docs/rocketmq/rocketmq-send-message.md @@ -0,0 +1,365 @@ +该文所涉及的 RocketMQ 源码版本为 4.9.3。 + +# RocketMQ 消息发送流程 + +这里以同步发送为示例讲解: + +入口: + +org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message) + +消息发送 默认超时时间 3 秒 + +第一步:验证 + +主题的长度不能大于 127,消息的大小不能大于 4M + +```java +public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { + if (null == msg) { + throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); + } + // topic + 
Validators.checkTopic(msg.getTopic()); + Validators.isNotAllowedSendTopic(msg.getTopic()); + + // body + if (null == msg.getBody()) { + throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); + } + + if (0 == msg.getBody().length) { + throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); + } + + if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { + throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, + "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); + } +} +``` + +第二步:查找路由信息 + +如果缓存中存在路由信息,并且队列信息不为空直接返回路由信息,如果缓存不存在,根据当前主题从 NameServer 中获取 路由信息,如果路由信息没有找到,根据默认主题查询路由信息,如果没有找到抛出异常 + +```java +private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { + TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); + if (null == topicPublishInfo || !topicPublishInfo.ok()) { + this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + topicPublishInfo = this.topicPublishInfoTable.get(topic); + } + + if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { + return topicPublishInfo; + } else { + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); + topicPublishInfo = this.topicPublishInfoTable.get(topic); + return topicPublishInfo; + } +} + +``` + +从 NameServer 查询路由信息方法: + +org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer) + +1、如果是默认的主题查询路由信息,返回成功,更新读队列和写队列的个数为默认的队列个数 + +```java +if (isDefault && defaultMQProducer != null) { + topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), + clientConfig.getMqClientApiTimeout()); + if (topicRouteData != null) { + for (QueueData data : 
topicRouteData.getQueueDatas()) {
+            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
+            data.setReadQueueNums(queueNums);
+            data.setWriteQueueNums(queueNums);
+        }
+    }
+}
+```
+
+2、返回路由信息之后,与本地缓存的路由信息比对,判断路由信息是否发生变化。如果发生变化,更新 broker 地址缓存,更新 `topicPublishInfoTable`,更新 topic 路由信息缓存 `topicRouteTable`。
+
+```java
+if (changed) {
+    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
+
+    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+        this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
+    }
+
+    // Update Pub info
+    if (!producerTable.isEmpty()) {
+        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
+        publishInfo.setHaveTopicRouterInfo(true);
+        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, MQProducerInner> entry = it.next();
+            MQProducerInner impl = entry.getValue();
+            if (impl != null) {
+                impl.updateTopicPublishInfo(topic, publishInfo);
+            }
+        }
+    }
+    log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
+    this.topicRouteTable.put(topic, cloneTopicRouteData);
+    return true;
+}
+```
+
+第三步:选择消息队列
+
+设置消息发送失败重试次数:
+
+`int timesTotal = communicationMode == CommunicationMode.SYNC ?
1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;` + +`MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);` + +首先判断是否启用故障延迟机制 ,默认不启用,第一次查询 lastBrokerName 为空,`sendWhichQueue`自增然后对队列个数取模获取队列,如果消息发送失败,下一次`sendWhichQueue`仍然自增然后对队列个数取模,可以规避掉上次失败的 broker + +```java +public MessageQueue selectOneMessageQueue(final String lastBrokerName) { + if (lastBrokerName == null) { + return selectOneMessageQueue(); + } else { + for (int i = 0; i < this.messageQueueList.size(); i++) { + int index = this.sendWhichQueue.incrementAndGet(); + int pos = Math.abs(index) % this.messageQueueList.size(); + if (pos < 0) + pos = 0; + MessageQueue mq = this.messageQueueList.get(pos); + if (!mq.getBrokerName().equals(lastBrokerName)) { + return mq; + } + } + return selectOneMessageQueue(); + } +} +``` + +如果启用故障延迟机制: + +轮询获取队列 ,如果可用直接返回 + +```java +for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { + int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); + if (pos < 0) + pos = 0; + MessageQueue mq = tpInfo.getMessageQueueList().get(pos); + if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) + return mq; +} +``` + +判断是否可用逻辑:先从要规避的 broker 集合`faultItemTable`中获取该 broker 是否存在,如果存在判断是否可用,可用的标准是当前时间的时间戳大于上次该 broker 失败的时间 + 规避的时间,如果该 broker 在规避的 broker 集合中不存在,直接返回可用 + +```java +public boolean isAvailable(final String name) { + final FaultItem faultItem = this.faultItemTable.get(name); + if (faultItem != null) { + return faultItem.isAvailable(); + } + return true; +} +``` + +如果没有可用的 broker,尝试从 规避的 broker 集合中选择一个可用的 broker,如果选择的 broker 没有写队列,则从规避的 broker 列表中移除该 broker + +```java +final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); +int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); +if (writeQueueNums > 0) { + final MessageQueue mq = tpInfo.selectOneMessageQueue(); + if (notBestBroker != null) { + mq.setBrokerName(notBestBroker); + mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % 
writeQueueNums);
    }
    return mq;
} else {
    latencyFaultTolerance.remove(notBestBroker);
}
```

P.S.: the fault table is not updated on synchronous sends; it is updated on asynchronous sends.

```java
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
```

This mainly records the latency of the failed send, `currentLatency`, and the moment the back-off starts counting from, `startTimestamp`:

```java
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}
```

Summary: a faulty broker is avoided whether or not latency fault tolerance is enabled. With it enabled, the broker is avoided for a period of time; with it disabled, the broker is only skipped on the very next attempt.

Step 4: send the message

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl

1. Assign a globally unique ID to the message:

```java
if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}
```

2. Enable compression when the message body exceeds 4 KB:

```java
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
}
```

3. For a transactional message, mark the message type as transactional:

```java
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
```

4. Check for timeout:

```java
long 
costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
    throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
```

5. Build the request header:

```java
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }

    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}
```

6. Send the request:

```java
case SYNC:
    long costTimeSync = System.currentTimeMillis() - beginStartTime;
    if (timeout < costTimeSync) {
        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
    }
    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
        brokerAddr,
        mq.getBrokerName(),
        msg,
        requestHeader,
        timeout - costTimeSync,
        communicationMode,
        context,
        this);
    break;
```

Step 5: handle the response
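Before walking through the actual source of this step, its overall shape can be sketched in isolation. This is a hedged, minimal sketch: the class, method, and numeric codes below are illustrative stand-ins, not RocketMQ's real `ResponseCode`/`SendStatus` API.

```java
// Minimal sketch of step 5: translate the broker's numeric response code
// into a client-side send status; any unrecognized code becomes an error
// (RocketMQ throws MQBrokerException there; we use IllegalStateException).
class ResponseMappingSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    // Illustrative code values only -- not the actual RocketMQ wire values.
    static final int SUCCESS = 0;
    static final int FLUSH_DISK_TIMEOUT = 10;
    static final int FLUSH_SLAVE_TIMEOUT = 11;
    static final int SLAVE_NOT_AVAILABLE = 12;

    static SendStatus map(int responseCode) {
        switch (responseCode) {
            case SUCCESS:             return SendStatus.SEND_OK;
            case FLUSH_DISK_TIMEOUT:  return SendStatus.FLUSH_DISK_TIMEOUT;
            case FLUSH_SLAVE_TIMEOUT: return SendStatus.FLUSH_SLAVE_TIMEOUT;
            case SLAVE_NOT_AVAILABLE: return SendStatus.SLAVE_NOT_AVAILABLE;
            default:
                throw new IllegalStateException("unexpected broker response code: " + responseCode);
        }
    }
}
```

Note the design point this mirrors: only `SUCCESS` maps to `SEND_OK`; the three timeout/slave codes are still surfaced as non-fatal statuses on the result object, while everything else is raised as an exception.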
1. Map the response status code:

```java
switch (response.getCode()) {
    case ResponseCode.FLUSH_DISK_TIMEOUT: {
        sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
        break;
    }
    case ResponseCode.FLUSH_SLAVE_TIMEOUT: {
        sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
        break;
    }
    case ResponseCode.SLAVE_NOT_AVAILABLE: {
        sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
        break;
    }
    case ResponseCode.SUCCESS: {
        sendStatus = SendStatus.SEND_OK;
        break;
    }
    default: {
        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }
}
```

2. Build the SendResult:

```java
SendResult sendResult = new SendResult(sendStatus,
    uniqMsgId,
    responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
if (regionId == null || regionId.isEmpty()) {
    regionId = MixAll.DEFAULT_TRACE_REGION_ID;
}
if (traceOn != null && traceOn.equals("false")) {
    sendResult.setTraceOn(false);
} else {
    sendResult.setTraceOn(true);
}
sendResult.setRegionId(regionId);
```
diff --git a/docs/rocketmq/rocketmq-send-store.md b/docs/rocketmq/rocketmq-send-store.md
new file mode 100644
index 0000000..f8f870e
--- /dev/null
+++ b/docs/rocketmq/rocketmq-send-store.md
@@ -0,0 +1,193 @@

The RocketMQ source code covered in this article is version 4.9.3.

# RocketMQ message send-and-store flow

Step 1: check the message store status

org.apache.rocketmq.store.DefaultMessageStore#checkStoreStatus

1. Check whether the broker is available:

```java
if (this.shutdown) {
    log.warn("message store has shutdown, so putMessage is forbidden");
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
```

2. Check the broker role:

```java
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    long value = this.printTimes.getAndIncrement();
    if ((value % 50000) == 0) {
        log.warn("broke role is slave, so putMessage is forbidden");
    }
    return 
PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
```

3. Check whether the message store is writable:

```java
if (!this.runningFlags.isWriteable()) {
    long value = this.printTimes.getAndIncrement();
    if ((value % 50000) == 0) {
        log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
            "the broker's disk is full, write to logic queue error, write to index file error, etc");
    }
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} else {
    this.printTimes.set(0);
}
```

4. Check the OS page cache:

```java
if (this.isOSPageCacheBusy()) {
    return PutMessageStatus.OS_PAGECACHE_BUSY;
}
```

Step 2: validate the message

org.apache.rocketmq.store.DefaultMessageStore#checkMessage

1. The topic length must not exceed 127:

```java
if (msg.getTopic().length() > Byte.MAX_VALUE) {
    log.warn("putMessage message topic length too long " + msg.getTopic().length());
    return PutMessageStatus.MESSAGE_ILLEGAL;
}
```

2. The properties length must not exceed 32767:

```java
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
    log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
    return PutMessageStatus.MESSAGE_ILLEGAL;
}
```

Step 3: get the CommitLog file currently available for writing

CommitLog files are stored under `${ROCKET_HOME}/store/commitlog`; a `MappedFileQueue` corresponds to this directory, and each `MappedFile` corresponds to one file inside it.

```java
msg.setStoreTimestamp(beginLockTimestamp);

if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
    log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
```

If this is the first write, or the file holding the latest offset is full, a new file is created:

```java
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1;
    MappedFile mappedFileLast = getLastMappedFile();

    if (mappedFileLast == 
null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }

    if (mappedFileLast != null && mappedFileLast.isFull()) {
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }

    if (createOffset != -1 && needCreate) {
        return tryCreateMappedFile(createOffset);
    }

    return mappedFileLast;
}
```

Step 4: write the message into the MappedFile

```java
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    assert messageExt != null;
    assert cb != null;

    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                (MessageExtBrokerInner) messageExt, putMessageContext);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                (MessageExtBatch) messageExt, putMessageContext);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
```

org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner, org.apache.rocketmq.store.CommitLog.PutMessageContext)

Compute the physical offset to write at:

`long wroteOffset = fileFromOffset + byteBuffer.position();`

Transactional messages get special handling:

```java
final int tranType = 
MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
    // Prepared and Rollback message is not consumed, will not enter the
    // consumer queue
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        queueOffset = 0L;
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    default:
        break;
}
```

Build the AppendMessageResult:

```java
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
    msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
```

Transaction-specific post-processing:

```java
switch (tranType) {
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        // The next update ConsumeQueue information
        CommitLog.this.topicQueueTable.put(key, ++queueOffset);
        CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
        break;
    default:
        break;
}
```
diff --git a/images/spring/image-20210902072224002.png b/images/spring/image-20210902072224002.png
new file mode 100644
index 0000000..6192f69
Binary files /dev/null and b/images/spring/image-20210902072224002.png differ
diff --git a/images/spring/image-20210903080803199.png b/images/spring/image-20210903080803199.png
new file mode 100644
index 0000000..145a4cf
Binary files /dev/null and b/images/spring/image-20210903080803199.png differ
diff --git a/images/spring/image-20210904161436139.png b/images/spring/image-20210904161436139.png
new file mode 100644
index 0000000..3e286c1
Binary files /dev/null and b/images/spring/image-20210904161436139.png differ
diff --git a/images/spring/image-20210904161808341.png b/images/spring/image-20210904161808341.png
new file mode 100644
index 0000000..a0e9640
Binary files /dev/null and 
b/images/spring/image-20210904161808341.png differ
diff --git a/images/spring/image-20210904162844126.png b/images/spring/image-20210904162844126.png
new file mode 100644
index 0000000..a7b0331
Binary files /dev/null and b/images/spring/image-20210904162844126.png differ
diff --git a/images/spring/image-20210904174616712.png b/images/spring/image-20210904174616712.png
new file mode 100644
index 0000000..849e910
Binary files /dev/null and b/images/spring/image-20210904174616712.png differ
diff --git a/images/spring/image-20211213224509864.png b/images/spring/image-20211213224509864.png
new file mode 100644
index 0000000..bdaa990
Binary files /dev/null and b/images/spring/image-20211213224509864.png differ
diff --git a/images/spring/image-20211213224920994.png b/images/spring/image-20211213224920994.png
new file mode 100644
index 0000000..131d787
Binary files /dev/null and b/images/spring/image-20211213224920994.png differ
diff --git a/images/spring/image-20211213225044814.png b/images/spring/image-20211213225044814.png
new file mode 100644
index 0000000..e88bbe8
Binary files /dev/null and b/images/spring/image-20211213225044814.png differ
diff --git a/images/spring/image-20211213225124831.png b/images/spring/image-20211213225124831.png
new file mode 100644
index 0000000..9dc9fda
Binary files /dev/null and b/images/spring/image-20211213225124831.png differ
diff --git a/images/spring/image-20211213225330193.png b/images/spring/image-20211213225330193.png
new file mode 100644
index 0000000..a18d61c
Binary files /dev/null and b/images/spring/image-20211213225330193.png differ
diff --git a/images/spring/image-20211213225748030.png b/images/spring/image-20211213225748030.png
new file mode 100644
index 0000000..dc67c77
Binary files /dev/null and b/images/spring/image-20211213225748030.png differ
diff --git a/images/spring/image-20211213225831583.png b/images/spring/image-20211213225831583.png
new file mode 100644
index 0000000..a54fade
Binary files /dev/null and b/images/spring/image-20211213225831583.png differ
diff --git a/images/spring/image-20211213225953964.png b/images/spring/image-20211213225953964.png
new file mode 100644
index 0000000..c78eb72
Binary files /dev/null and b/images/spring/image-20211213225953964.png differ
diff --git a/images/spring/image-20211213230042502.png b/images/spring/image-20211213230042502.png
new file mode 100644
index 0000000..d712bd9
Binary files /dev/null and b/images/spring/image-20211213230042502.png differ
diff --git a/images/spring/image-20211213230212297.png b/images/spring/image-20211213230212297.png
new file mode 100644
index 0000000..098916c
Binary files /dev/null and b/images/spring/image-20211213230212297.png differ
diff --git a/index.html b/index.html
index 5842c04..0940a71 100644
--- a/index.html
+++ b/index.html
@@ -1,150 +1,163 @@
[index.html hunk: whitespace/indentation-only rewrite of the page markup; the visible text — the page title "读尽天下源码,心中自然无码" and the banner "本系列知识由 Doocs 开源社区总结发布" ("This series is compiled and published by the Doocs open-source community") — is unchanged between the two versions.]
\ No newline at end of file
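As a closing aside to the RocketMQ send-flow notes above: the broker back-off window used by latency fault tolerance reduces to a single timestamp comparison. The class below is a hypothetical minimal sketch of that rule, not RocketMQ's actual `FaultItem` API.

```java
// Sketch of the fault-avoidance window: after a send failure,
// startTimestamp = failureTime + notAvailableDuration, and the broker is
// considered available again only once "now" reaches startTimestamp.
class FaultWindowSketch {
    private final long startTimestamp; // absolute time when the broker becomes usable again

    FaultWindowSketch(long failedAtMillis, long notAvailableDurationMillis) {
        this.startTimestamp = failedAtMillis + notAvailableDurationMillis;
    }

    boolean isAvailable(long nowMillis) {
        return (nowMillis - startTimestamp) >= 0;
    }
}
```

In the real implementation the duration is chosen by `computeNotAvailableDuration`, which buckets the observed send latency into progressively longer back-off periods, so slower failures keep a broker out of rotation for longer.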