Merge pull request #1 from doocs/main

update
pull/124/head
yang520-bye 3 years ago committed by GitHub
commit 527142924a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -18,16 +18,16 @@ jobs:
uses: actions/checkout@v2
- name: Compress Images
uses: calibreapp/image-actions@master
uses: calibreapp/image-actions@main
with:
githubToken: ${{ secrets.GITHUB_TOKEN }}
compressOnly: true
- name: Commit Files
run: |
git config --local user.email "action@github.com"
git config --local user.name "GitHub Action"
git commit -m "[Automated] Optimize images" -a
git config --local user.email "szuyanglb@outlook.com"
git config --local user.name "yanglbme"
git commit -m "chore: auto compress images" -a
- name: Push Changes
uses: ad-m/github-push-action@master

@ -13,12 +13,9 @@
本项目主要用于记录框架及中间件源码的阅读经验、个人理解及解析,希望能够使阅读源码变成一件简单有趣,且有价值的事情,抽空更新中... (如果本项目对您有帮助,请 watch、star、fork 素质三连一波,鼓励一下作者,谢谢)
- Netlify: https://schunter.netlify.app
- ~~Gitee Pages: https://doocs.gitee.io/source-code-hunter~~
- Gitee Pages: https://doocs.gitee.io/source-code-hunter
- GitHub Pages: https://doocs.github.io/source-code-hunter
注:😶 Gitee Pages 站点遭 Gitee 官方误判为“包含违禁违规内容”,惨遭下线。
## Spring 系列
### IoC 容器
@ -60,6 +57,10 @@
- [面筋哥 IoC 容器的一天(上)](</docs/Spring/Spring源码故事瞎编版/面筋哥IoC容器的一天(上).md>)
### Spring 整体脉络
- [16 张图解锁 Spring 的整体脉络](/docs/Spring/Spring整体脉络/16张图解锁Spring的整体脉络.md)
### Spring 类解析
- [Spring 自定义标签解析](/docs/Spring/clazz/Spring-Custom-label-resolution.md)
@ -115,9 +116,10 @@
- [SpringBoot ConditionalOnBean](/docs/SpringBoot/SpringBoot-ConditionalOnBean.md)
### SpringSecurity
- [SpringSecurity请求全过程解析](/docs/SpringSecurity/SpringSecurity请求全过程解析.md)
- [SpringSecurity自定义用户认证](/docs/SpringSecurity/SpringSecurity自定义用户认证.md)
-
- [SpringSecurity 请求全过程解析](/docs/SpringSecurity/SpringSecurity请求全过程解析.md)
- [SpringSecurity 自定义用户认证](/docs/SpringSecurity/SpringSecurity自定义用户认证.md)
## MyBatis
### 基础支持层
@ -269,6 +271,21 @@
- [Sentinel 底层 LongAdder 的计数实现](docs/Sentinel/Sentinel底层LongAdder的计数实现.md)
- [Sentinel 限流算法的实现](docs/Sentinel/Sentinel限流算法的实现.md)
## RocketMQ
- [RocketMQ NameServer 与 Broker 的通信](docs/rocketmq/rocketmq-nameserver-broker.md)
- [RocketMQ 生产者启动流程](docs/rocketmq/rocketmq-producer-start.md)
- [RocketMQ 消息发送流程](docs/rocketmq/rocketmq-send-message.md)
- [RocketMQ 消息发送存储流程](docs/rocketmq/rocketmq-send-store.md)
- [RocketMQ MappedFile内存映射文件详解](docs/rocketmq/rocketmq-mappedfile-detail.md)
- [RocketMQ ConsumeQueue详解](docs/rocketmq/rocketmq-consumequeue.md)
- [RocketMQ CommitLog详解](docs/rocketmq/rocketmq-commitlog.md)
- [RocketMQ IndexFile详解](docs/rocketmq/rocketmq-indexfile.md)
- [RocketMQ 消费者启动流程](docs/rocketmq/rocketmq-consumer-start.md)
- [RocketMQ 消息拉取流程](docs/rocketmq/rocketmq-pullmessage.md)
- [RocketMQ Broker 处理拉取消息请求流程](docs/rocketmq/rocketmq-pullmessage-processor.md)
- [RocketMQ 消息消费流程](docs/rocketmq/rocketmq-consume-message-process.md)
## 番外篇JDK 1.8
### 基础类库

@ -2,7 +2,7 @@
在使用 Spring 声明式事务处理 的时候,一种常用的方法是结合 IoC 容器 和 Spring 已有的 TransactionProxyFactoryBean 对事务管理进行配置,比如,可以在这个 TransactionProxyFactoryBean 中为事务方法配置传播行为、并发事务隔离级别等事务处理属性,从而对声明式事务的处理提供指导。具体来说,在对声明式事务处理的原理分析中,声明式事务处理的实现大致可以分为以下几个部分:
- 读取和处理在 IoC 容器 中配置的事务处理属性,并转化为 Spring 事务处理 需要的内部数据结构,这里涉及的类是 TransactionAttributeSourceAdvisor从名字可以看出它是一个 AOP 通知器Spring 使用这个通知器来完成对事务处理属性值的处理。处理的结果是,在 IoC 容器 中配置的事务处理属性信息,会被读入并转化成 TransactionAttribute 表示的数据对象,这个数据对象是 Spring 对事处理属性值的数据抽象,对这些属性的处理是和 TransactionProxyFactoryBean 拦截下来的事务方法的处理结合起来的。
- 读取和处理在 IoC 容器 中配置的事务处理属性,并转化为 Spring 事务处理 需要的内部数据结构,这里涉及的类是 TransactionAttributeSourceAdvisor从名字可以看出它是一个 AOP 通知器Spring 使用这个通知器来完成对事务处理属性值的处理。处理的结果是,在 IoC 容器 中配置的事务处理属性信息,会被读入并转化成 TransactionAttribute 表示的数据对象,这个数据对象是 Spring 对事处理属性值的数据抽象,对这些属性的处理是和 TransactionProxyFactoryBean 拦截下来的事务方法的处理结合起来的。
- Spring 事务处理模块 实现统一的事务处理过程。这个通用的事务处理过程包含处理事务配置属性以及与线程绑定完成事务处理的过程Spring 通过 TransactionInfo 和 TransactionStatus 这两个数据对象,在事务处理过程中记录和传递相关执行场景。
- 底层的事务处理实现。对于底层的事务操作Spring 委托给具体的事务处理器来完成,这些具体的事务处理器,就是在 IoC 容器 中配置声明式事务处理时,配置的 PlatformTransactionManager 的具体实现,比如 DataSourceTransactionManager 和 HibernateTransactionManager 等。

@ -0,0 +1,261 @@
作者: [Java4ye](https://github.com/Java4ye)
### 概览
本文将讲解 Spring 的原理,看看一个 Bean 是怎么被创建出来的,中间经历过那几道工序加工,它的生命周期是怎样的,以及有哪些扩展点,后置处理器可以使用,让你对 Spring 多一些了解!
### 目录
本文会先大概介绍下这些知识点 👇
![image-20211213224509864](../../../images/spring/image-20211213224509864.png)
### 印象中的 Spring
脑海中有这么一条公式:
👉 IOC = 工厂模式 + XML + 反射
👉 而 DI , AOP **事务** 等也都在 XML 中很直观的表现出来
虽然我们现在大部分用这个注解来代替,但是原理还是基本一样的 🐖
注解使用起来很方便,但是学习的话,还是建议先通过这个 XML ,毕竟结构性的文档,有层次感,可以留下更深的印象~ 😄
### 小小 Spring
把 Spring 浓缩一下,就有了这么一点小东西 🐖
![image-20211213224920994](../../../images/spring/image-20211213224920994.png)
想了下,我们用 Spring ,其中最主要的一点,就是用它来帮我们管理,创建这个 Bean 。
那么先从源头看起 —— Bean 从哪来 (\_;)
### Bean 解析流程
![image-20211213225044814](../../../images/spring/image-20211213225044814.png)
如图所示,就是通过 **解析器**,对我们的 XML 文件或者注解进行解析,最后将这些信息封装在 BeanDefinition 类中,并通过 BeanDefinitionRegistry 接口将这些信息 **注册** 起来,放在 beanDefinitionMap 变量中, key : beanName , value BeanDefinition 。
简单看看 BeanDefinition 中的属性叭
### BeanDefinition
- beanClass : bean 的类型 ,实例化时用的 🐖
- scope : 作用范围有 singletonprototype
- isLazy : **懒加载** true 的话 会在 getBean 时生成,而且 scope 的 prototype 无效false 在 Spring 启动过程中直接生成
- initMethodName : 初始化方法,当然是初始化时调用 🐖
- primary : 主要的,有多个 Bean 时使用它
- dependsOn : 依赖的 Bean必须等依赖 Bean 创建好才可以创建
> PS: @Component @Bean , <bean/> 都会被解析成 BeanDefinition
### 反射
有了原料后呢,咋们再来看看这个 **工厂** BeanFactory
先简单想一想这个工厂要怎么创建这个 Bean 呢?
没错,肯定就是这个 **反射** 啦 😄
那么,结合我们从原料中获取的重要属性之一的 beanClass ,我们可以画出这么一张图 👇
![image-20211213225124831](../../../images/spring/image-20211213225124831.png)
那么我们再来看看这个 BeanFactory 叭 😄
### BeanFactory
先来看看 作为 IOC 容器的**根接口** 的 BeanFactory 提供了什么方法吧 👇
![image-20210904162844126](../../../images/spring/image-20210904162844126.png)
主要是这个 getBean 方法,以及 **别名获取****类型获取** 方法和其他一些判断方法如 **单例****多例****类型匹配****包含 bean**
我们来简单看看它的子接口都有哪些叭~😄
这里分享个小技巧叭 🐖
看源码的时候,一般就直接看这个**默认**接口 如这里的 DefaultListableBeanFactory
![image-20210904161436139](../../../images/spring/image-20210904161436139.png)
基本上看个类名就知道大概作用了,那么先对号入座下 👇
**ListableBeanFactory**
> 👉 遍历 bean
**HierarchicalBeanFactory**
> 👉 提供 父子关系,可以获取上一级的 BeanFactory
**ConfigurableBeanFactory**
> 👉 实现了 SingletonBeanRegistry ,主要是 单例 Bean 的注册,生成
**AutowireCapableBeanFactory**
> 👉 和自动装配有关的
**AbstractBeanFactory**
> 👉 单例缓存,以及 FactoryBean 相关的
**ConfigurableListableBeanFactory**
> 👉 预实例化单例 Bean分析修改 BeanDefinition
**AbstractAutowireCapableBeanFactory**
> 👉 创建 Bean ,属性注入,实例化,调用初始化方法 等等
**DefaultListableBeanFactory**
> 👉 支持单例 Bean Bean 别名 ,父子 BeanFactoryBean 类型转化 Bean 后置处理FactoryBean自动装配等
是不是非常丰富 😄
### FactoryBean
FactoryBean ,它本身就是个 Bean算是小工厂 ,归 BeanFactory 这个大工厂管理的。
![image-20210904174616712](../../../images/spring/image-20210904174616712.png)
可以看到它就只有三个方法
1. `getObject()` 获取对象
2. `isSingleton()` 单例对象
3. `getObjectType()` 返回的是 Bean 对象的类型
相比大工厂 BeanFactory 少了特别多东西,没有严格的 Bean 生命周期流程 😄
FactoryBean 对象本身也是一个 Bean是一个小工厂可以生产另外的 Bean
BeanFactory 是 Spring 容器的根接口,是大工厂,生产各种各样的 Bean
beanName 就是正常对象
&”+beanName 获取的是实现了该接口的 FactoryBean 工厂对象
大致如下 👇
![image-20211213225330193](../../../images/spring/image-20211213225330193.png)
### ApplicationContext
我们再来看看这个 ApplicationContext
![image-20210904161808341](../../../images/spring/image-20210904161808341.png)
可以看到它扩展了很多功能,除了 BeanFactory ,它还可以**创建 , 获取 Bean**,以及处理**国际化****事件****获取资源**等
- EnvironmentCapable 获取 环境变量 的功能,可以获取到 **操作系统变量** 和 **JVM 环境变量**
- ListableBeanFactory 获取所有 BeanNames判断某个 BeanName 是否存在 BeanDefinition 对象,统计 BeanDefinition 对象,获取某个类型对应的所有 beanNames 等功能
- HierarchicalBeanFactory 获取父 BeanFactory ,判断某个 name 是否存在 bean 对象的功能
- MessageSource **国际化功能**,获取某个国际化资源
- ApplicationEventPublisher **事件发布功能**(重点)
- ResourcePatternResolver **加载,获取资源的功能**,这里的资源可能是文件,图片 等某个 URL 资源都可以
还有这三个重要的类 👇,就不一一介绍先啦 😄
1. ClassPathXmlApplicationContext
2. AnnotationConfigApplicationContext
3. FileSystemXmlApplicationContext
赶紧来看看这个核心叭!
### IOC 容器
当然,这时候出场的肯定是 IOC 啦。
我们都知道 IOC 是 **控制反转** ,但是别忘了 **容器** 这个词,比如 **容器的根接口 **BeanFactory **容器的实现** 👇
1. ClassPathXmlApplicationContext
2. AnnotationConfigApplicationContext
3. FileSystemXmlApplicationContext
同时我们要注意这里无处不在的 **后置处理器** xxxPostProcessor 🐷
这个是 Spring 中扩展性强的原因了!
我们可以在各个过程中合理应用这些 PostProcessor 来扩展,或者修改 Bean 定义信息等等
![image-20211213225748030](../../../images/spring/image-20211213225748030.png)
可以看到在这个容器中,完成了 Bean 的初始化,而这个过程还有很多细节 ,请往下看看 👇
DI 到时写 **属性填充** 时再介绍 🐷
### BeanFactory 后置处理器
作为 IOC 容器根接口的 BeanFactory ,有着非常高的扩展性,比如最开始获取原料 BeanDefinition 时,就出现了两个针对 BeanFactory 工厂的后置处理器 👇
BeanDefinitionRegistryPostProcessor
> 通过该接口,我们可以自己掌控我们的 **原料**,通过 BeanDefinitionRegistry 接口去 **新增****删除****获取**我们这个 BeanDefinition
BeanFactoryPostProcessor
> 通过该接口,可以在 **实例化对象前**,对 BeanDefinition 进行修改 **冻结** **预实例化单例 Bean** 等
经过上面层层阻碍后,我们最终会来到目标方法 getBean ,将原料投入生产,最终获取一个个 Bean 对象出来
那么随之而来的就是这个 Bean 的生命周期啦 😄
### Bean 生命周期
Bean 的创建和管理有**标准化的流程**
这里在我们的工厂 BeanFactory 中写得很清楚 👇
![image-20210902072224002](../../../images/spring/image-20210902072224002.png)
总共 **14** 个步骤,是不是一下子就清晰多了 😄
![image-20211213225831583](../../../images/spring/image-20211213225831583.png)
在看这部分的源码时,要多注意两个英文单词 😝
1. **实例化** 👉 **Instantiation**
2. **初始化** 👉 **Initialization**
ps: 别看快搞错了 哈哈 😝
仔细阅读上面这 14 个步骤,会发现前面 **8** 个都是 Aware 接口,而他们的作用也很简单,就是获取 xxAware 这个单词的前缀 xx 😄
比如事件发布器 ApplicationEventPublisher ,只要你实现了 ApplicationEventPublisherAware 接口,就可以**获取** 事件发布器 ApplicationEventPublisher
### Bean 后置处理器
在实例化 和 初始化流程中,把这个 Bean 的后置处理器 BeanPostProcessor 安排上,就得到下图啦 👇
![image-20211213225953964](../../../images/spring/image-20211213225953964.png)
这里留意下 **实例化** 有扩展点 InstantiationAwareBeanPostProcessor **初始化** 扩展点 BeanPostProcessor 就非常多啦,我们主要来关注下这个 AOP
### AOP
那么 AOP 是在哪个步骤代理对象的呢?👇
![image-20211213230042502](../../../images/spring/image-20211213230042502.png)
可以在 AbstractAutoProxyCreator 类中看到 👇
![image-20210903080803199](../../../images/spring/image-20210903080803199.png)
### 总结
本文就先介绍到这里啦 🐖
主要介绍了 Spring 里面的这些脉络,方便小伙伴们对它有个整体的印象先~
再介绍其中的一些扩展点,比如从源材料开始的 BeanFactoryPostprocessor ,到产物 Bean 的 BeanPostprocessor 。
实例化初始化的顺序Bean 的生命周期,以及 BeanFactory 及子类扩展的功能,再到 ApplicationContext 的功能。
还有这个核心机制: **工厂+XML+反射**,以及 AOP **发生的地方**。😋
![image-20211213230212297](../../../images/spring/image-20211213230212297.png)

@ -10,7 +10,7 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.boot.nacos.discovery.autoconfigure.NacosDiscoveryAutoConfiguration
```
找到注解`NacosDiscoveryAutoConfiguration`
找到`NacosDiscoveryAutoConfiguration`
```java
@ConditionalOnProperty(name = NacosDiscoveryConstants.ENABLED, matchIfMissing = true)
@ -301,18 +301,18 @@ class BeatTask implements Runnable {
try {
// 与nacos进行一次rest请求交互
JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.getIntValue("clientBeatInterval");
long interval = result.get(CLIENT_BEAT_INTERVAL_FIELD).asLong();
boolean lightBeatEnabled = false;
if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED);
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.containsKey(CommonParams.CODE)) {
code = result.getIntValue(CommonParams.CODE);
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
// 如果nacos找不到当前实例,
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
@ -336,8 +336,12 @@ class BeatTask implements Runnable {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg());
} catch (Exception unknownEx) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
} finally {
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
```
@ -359,47 +363,41 @@ class BeatTask implements Runnable {
NacosException exception = new NacosException();
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
// 获取nacos所在的ip+port地址
String server = servers.get(index);
try {
// 进行请求
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index + 1) % servers.size();
}
}
if (StringUtils.isNotBlank(nacosDomain)) {
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
}
if (serverListManager.isDomain()) {
String nacosDomain = serverListManager.getNacosDomain();
for (int i = 0; i < maxRetry; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}",
api, servers, exception.getErrCode(), exception.getErrMsg());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index + 1) % servers.size();
}
}
throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
+ exception.getMessage());
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
exception.getErrMsg());
}
throw new NacosException(exception.getErrCode(),
"failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
```
**学习点**
@ -437,19 +435,24 @@ public void registerService(String serviceName, String groupName, Instance insta
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
final Map<String, String> params = new HashMap<String, String>(9);
final Map<String, String> params = new HashMap<String, String>(32);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.SERVICE_NAME, groupedServiceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
params.put(IP_PARAM, instance.getIp());
params.put(PORT_PARAM, String.valueOf(instance.getPort()));
params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled()));
params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
@ -645,76 +648,41 @@ public Instance getInstance(String namespaceId, String serviceName, String clust
```java
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public JSONObject beat(HttpServletRequest request) throws Exception {
JSONObject result = new JSONObject();
result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
Constants.DEFAULT_NAMESPACE_ID);
String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME,
UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {
clientBeat = JSON.parseObject(beat, RsInfo.class);
}
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
}
@Secured(action = ActionTypes.WRITE)
public ObjectNode beat(@RequestParam(defaultValue = Constants.DEFAULT_NAMESPACE_ID) String namespaceId,
@RequestParam String serviceName,
@RequestParam(defaultValue = StringUtils.EMPTY) String ip,
@RequestParam(defaultValue = UtilsAndCommons.DEFAULT_CLUSTER_NAME) String clusterName,
@RequestParam(defaultValue = "0") Integer port,
@RequestParam(defaultValue = StringUtils.EMPTY) String beat)throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
}
// 获取实例
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
if (instance == null) {
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
// 处理心跳方法
service.processClientBeat(clientBeat);
result.put(CommonParams.CODE, NamingResponseCode.OK);
result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
serviceName, namespaceId);
BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
int resultCode = instanceServiceV2
.handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
result.put(CommonParams.CODE, resultCode);
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
instanceServiceV2.getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
```

@ -0,0 +1,272 @@
该文所涉及的 RocketMQ 源码版本为 4.9.3。
# RocketMQ CommitLog 详解
commitlog 目录主要存储消息,为了保证性能,顺序写入,每一条消息的长度都不相同,每条消息的前面四个字节存储该条消息的总长度,每个文件大小默认为 1G文件的命名是以 commitLog 起始偏移量命名的,可以通过修改 broker 配置文件中 mappedFileSizeCommitLog 属性改变文件大小
1、获取最小偏移量
org.apache.rocketmq.store.CommitLog#getMinOffset
```java
public long getMinOffset() {
MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
if (mappedFile != null) {
if (mappedFile.isAvailable()) {
return mappedFile.getFileFromOffset();
} else {
return this.rollNextFile(mappedFile.getFileFromOffset());
}
}
return -1;
}
```
获取目录下的第一个文件
```java
public MappedFile getFirstMappedFile() {
MappedFile mappedFileFirst = null;
if (!this.mappedFiles.isEmpty()) {
try {
mappedFileFirst = this.mappedFiles.get(0);
} catch (IndexOutOfBoundsException e) {
//ignore
} catch (Exception e) {
log.error("getFirstMappedFile has exception.", e);
}
}
return mappedFileFirst;
}
```
如果该文件可用返回文件的起始偏移量,否则返回下一个文件的 起始偏移量
```java
public long rollNextFile(final long offset) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
return offset + mappedFileSize - offset % mappedFileSize;
}
```
2、根据偏移量和消息长度查找消息
org.apache.rocketmq.store.CommitLog#getMessage
```java
public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
return mappedFile.selectMappedBuffer(pos, size);
}
return null;
}
```
首先获取 commitLog 文件大小,默认 1G
`private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;`
获取偏移量所在的 MappedFile
org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)
获取第一个 MappedFile 和最后一个 MappedFile校验偏移量是否在这两个 MappedFile 之间,计算当前偏移量所在 MappedFiles 索引值为当前偏移量的索引减去第一个文件的索引值
```java
if (firstMappedFile != null && lastMappedFile != null) {
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
offset,
firstMappedFile.getFileFromOffset(),
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
this.mappedFileSize,
this.mappedFiles.size());
} else {
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
MappedFile targetFile = null;
try {
targetFile = this.mappedFiles.get(index);
} catch (Exception ignored) {
}
if (targetFile != null && offset >= targetFile.getFileFromOffset()
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
return targetFile;
}
for (MappedFile tmpMappedFile : this.mappedFiles) {
if (offset >= tmpMappedFile.getFileFromOffset()
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
return tmpMappedFile;
}
}
}
if (returnFirstOnNotFound) {
return firstMappedFile;
}
}
```
根据在文件内的偏移量和消息长度获取消息内容
```java
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
int readPosition = getReadPosition();
if ((pos + size) <= readPosition) {
if (this.hold()) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
} else {
log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
+ this.fileFromOffset);
}
} else {
log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
+ ", fileFromOffset: " + this.fileFromOffset);
}
return null;
}
```
3、Broker 正常停止文件恢复
org.apache.rocketmq.store.CommitLog#recoverNormally
首先查询消息是否验证 CRC
`boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();`
从倒数第 3 个文件开始恢复,如果不足 3 个文件,则从第一个文件开始恢复
```java
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
```
循环遍历 CommitLog 文件,每次取出一条消息
`DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);`
如果查找结果为 true 并且消息的长度大于 0表示消息正确mappedFileOffset 指针向前移动本条消息的长度;
```java
if (dispatchRequest.isSuccess() && size > 0) {
mappedFileOffset += size;
}
```
如果查找结果为 true 并且结果等于 0表示已到文件 的末尾,如果还有下一个文件,则重置 processOffset、mappedOffset 并重复上述步骤,否则跳出循环;
```java
else if (dispatchRequest.isSuccess() && size == 0) {
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
```
如果查找结果为 false则表示消息没有填满该文件跳出循环结束遍历
```java
else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
```
更新 committedPosition 和 flushedWhere 指针
```java
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
```
删除 offset 之后的所有文件。遍历目录下面的所有文件,如果文件尾部偏移量小于 offset 则跳过该文件,如果尾部的偏移量大于 offset则进一步比较 offset 与文件的开始偏移量,如果 offset 大于文件的开始偏移量,说明当前文件包含了有效偏移量,设置 MappedFile 的 flushPosition 和 commitedPosition。
如果 offset 小于文件的开始偏移量,说明该文件是有效文件后面创建的,调用 MappedFile#destroy()方法释放资源
```java
if (fileTailOffset > offset) {
if (offset >= file.getFileFromOffset()) {
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
file.destroy(1000);
willRemoveFiles.add(file);
}
}
```
释放资源需要关闭 MappedFile 和文件通道 fileChannel
```java
public boolean destroy(final long intervalForcibly) {
this.shutdown(intervalForcibly);
if (this.isCleanupOver()) {
try {
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeElapsedTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
```
判断`maxPhyOffsetOfConsumeQueue`是否大于 processOffset如果大于需要删除 ConsumeQueue 中 processOffset 之后的数据
```java
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
```
```java
public void truncateDirtyLogicFiles(long phyOffset) {
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue logic : maps.values()) {
logic.truncateDirtyLogicFiles(phyOffset);
}
}
}
```

@ -0,0 +1,220 @@
该文所涉及的 RocketMQ 源码版本为 4.9.3。
# RocketMQ 消息消费流程
拉取消息 成功之后 会调用 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest 组装 消费消息 请求
获取 consumeMessageBatchMaxSize,表示一个 ConsumeRequest 包含的消息 数量,默认为 1
入参 msgs 为拉取消息的最大值,默认为 32
如果 msgs 小于等于 consumeMessageBatchMaxSize直接创建`ConsumeRequest`任务并提交到 线程池,当出现`RejectedExecutionException`异常时会重新提交任务,但是查看线程池的队列
`this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();`
为无界队列,最大值为`Integer.MAX_VALUE`,理论上不会出现该异常
```java
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
}
```
如果 msgs 大于 consumeMessageBatchMaxSize消息分批处理即创建多个`ConsumeRequest`任务
```java
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
```
`class ConsumeRequest implements Runnable`
详细的消费逻辑查看 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
第 1 步:首先会校验队列的 dropped 是否为 true当队列重平衡的时候该队列可能会被分配给其他消费者如果该队列被分配给其他消费者会设置 dropped 为 true
```java
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
```
第 2 步:如果是重试消息重新设置主题
```java
public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
msg.setTopic(retryTopic);
}
if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
}
```
第 3 步:如果有钩子函数则执行
```java
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
```
第 4 步:调用消息监听器的`consumeMessage执行具体的消费逻辑` ,返回值为`ConsumeConcurrentlyStatus`
```java
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
}
```
```java
public enum ConsumeConcurrentlyStatus {
/**
* Success consumption
*/
CONSUME_SUCCESS,
/**
* Failure consumption,later try to consume
*/
RECONSUME_LATER;
}
```
第 5 步:如果有 钩子 函数执行钩子
```java
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS== status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
```
第 6 步:再次校验队列 的 dropped 状态 ,如果为 false 才会对结果进行处理
```java
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
```
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
第 7 步:计算 ackIndex如果为`CONSUME_SUCCESS`等于`consumeRequest.getMsgs().size() - 1;`
如果为`RECONSUME_LATER`等于-1
```java
switch (status) {
caseCONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
caseRECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
```
第 8 步:如果是广播模式并且是消费失败,打印警告 信息,如果是集群模式并且消费失败会将消息发送到 broker如果发送失败将消息封装到 consumerRequest 中延迟消费
```java
switch (this.defaultMQPushConsumer.getMessageModel()) {
caseBROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
caseCLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
```
第 9 步:更新消息消费偏移量
```java
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
```

@ -0,0 +1,199 @@
该文所涉及的 RocketMQ 源码版本为 4.9.3。
# RocketMQ ConsumeQueue 详解
RocketMQ 基于主题订阅模式实现消息消费,消费者关注每一个主题下的所有消息,但是同一主题下的消息是不连续地存储在 CommitLog 文件中的,如果消费者直接从消息存储文件中遍历查找主题下的消息,效率会特别低。所以为了在查找消息的时候效率更高一些,设计了 ConsumeQueue 文件,可以看作 CommitLog 消费的目录文件.
ConsumeQueue 的第一级目录为消息主题名称,第二级目录为主题的队列 id
为了加速 ConsumeQueue 消息的查询速度并节省磁盘空间,不会存储消息的全量信息,只会 存储一些 关键信息,如 8 字节的 CommmitLog 偏移量、4 字节的文件大小、8 字节的 tag 哈希码
1、根据消息存储时间查找物理偏移量
org.apache.rocketmq.store.ConsumeQueue#getOffsetInQueueByTime
第一步:根据时间戳定位物理文件
```java
public MappedFile getMappedFileByTime(final long timestamp) {
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return null;
for (int i = 0; i < mfs.length; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
return mappedFile;
}
}
return (MappedFile) mfs[mfs.length - 1];
}
```
从第一个文件 开始,找到第一个更新时间大于该时间戳的文件
第二步:利用二分查找法来加速检索
计算最低查找偏移量,如果消息队列偏移量大于文件的偏移量,则最低偏移量等于消息队列偏移量减去文件的偏移量,反之为 0
`int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;`
计算中间偏移量,其中*`CQ_STORE_UNIT_SIZE` =* 8 字节的 CommmitLog 偏移量 + 4 字节的文件大小+8 字节的 tag 哈希码
`midOffset = (low + high) / (2 * *CQ_STORE_UNIT_SIZE*) * *CQ_STORE_UNIT_SIZE*;`
如果得到的物理偏移量小于当前最小物理偏移量,则待查找消息的物理偏移量大于 midOffset将 low 设置为 midOffset继续查询
```java
byteBuffer.position(midOffset);
long phyOffset = byteBuffer.getLong();
int size = byteBuffer.getInt();
if (phyOffset < minPhysicOffset) {
low = midOffset +CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
continue;
}
```
如果得到的物理偏移量大于最小物理偏移量,说明该消息为有效信息,则根据消息物理偏移量和消息长度获取消息存储的时间戳
```java
long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
```
如果存储时间小于 0则为无效消息返回 0
如果存储时间戳等于待查找时间戳,说明查找到了目标消息,设置 targetOffset跳出循环
如果存储时间戳大于待查找时间戳,说明待查找消息的物理偏移量小于 midOffset设置 high 为 midOffset设置 rightIndexValue 等于 storeTime设置 rightOffset 为 midOffset
如果存储时间戳小于待查找时间戳,说明待查找消息的物理偏移量大于 midOffset设置 low 为 midOffset设置 leftIndexValue 等于 storeTime设置 leftOffset 为 midOffset
```java
if (storeTime < 0) {
return 0;
} else if (storeTime == timestamp) {
targetOffset = midOffset;
break;
} else if (storeTime > timestamp) {
high = midOffset -CQ_STORE_UNIT_SIZE;
rightOffset = midOffset;
rightIndexValue = storeTime;
} else {
low = midOffset +CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
leftIndexValue = storeTime;
}
```
如果 targetOffset 不等于-1表示找到了存储时间戳等于待查找时间戳的消息
如果 leftIndexValue 等于-1返回大于并且最接近待查找消息的时间戳的偏移量
如果 rightIndexValue 等于-1返回小于并且最接近待查找消息的时间戳的偏移量
```java
if (targetOffset != -1) {
offset = targetOffset;
} else {
if (leftIndexValue == -1) {
offset = rightOffset;
} else if (rightIndexValue == -1) {
offset = leftOffset;
} else {
offset = Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp - rightIndexValue) ? rightOffset : leftOffset;
}
}
```
2、根据当前偏移量获取下一个文件的偏移量
org.apache.rocketmq.store.ConsumeQueue#rollNextFile
```java
public long rollNextFile(final long index) {
int mappedFileSize = this.mappedFileSize;
int totalUnitsInFile = mappedFileSize /CQ_STORE_UNIT_SIZE;
return index + totalUnitsInFile - index % totalUnitsInFile;
}
```
3、ConsumeQueue 添加消息
org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo
将消息偏移量、消息长度、tag 哈希码写入 ByteBuffer将内容追加到 ConsumeQueue 的内存映射文件中。
```java
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
return true;
}
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
final long expectLogicOffset = cqOffset *CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
this.maxPhysicOffset = offset + size;
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
```
4、ConsumeQueue 文件删除
org.apache.rocketmq.store.ConsumeQueue#destroy
重置 ConsumeQueue 的 maxPhysicOffset 与 minLogicOffset调用 MappedFileQueue 的 destroy()方法将 ConsumeQueue 目录下的文件全部删除
```java
public void destroy() {
this.maxPhysicOffset = -1;
this.minLogicOffset = 0;
this.mappedFileQueue.destroy();
if (isExtReadEnable()) {
this.consumeQueueExt.destroy();
}
}
```

@ -0,0 +1,299 @@
该文所涉及的 RocketMQ 源码版本为 4.9.3。
# RocketMQ 消费者启动流程
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
`1、检查配置信息`
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#checkConfig
校验消费组的长度不能大于 255
`public static final int CHARACTER_MAX_LENGTH = 255;`
```java
if (group.length() >CHARACTER_MAX_LENGTH) {
throw new MQClientException("the specified group is longer than group max length 255.", null);
}
```
消费组名称只能包含数字、字母、%、-、_、|
```java
// regex: ^[%|a-zA-Z0-9_-]+$
// %
VALID_CHAR_BIT_MAP['%'] = true;
// -
VALID_CHAR_BIT_MAP['-'] = true;
// _
VALID_CHAR_BIT_MAP['_'] = true;
// |
VALID_CHAR_BIT_MAP['|'] = true;
for (int i = 0; i <VALID_CHAR_BIT_MAP.length; i++) {
if (i >= '0' && i <= '9') {
// 0-9
VALID_CHAR_BIT_MAP[i] = true;
} else if (i >= 'A' && i <= 'Z') {
// A-Z
VALID_CHAR_BIT_MAP[i] = true;
} else if (i >= 'a' && i <= 'z') {
// a-z
VALID_CHAR_BIT_MAP[i] = true;
}
}
```
```java
public static boolean isTopicOrGroupIllegal(String str) {
int strLen = str.length();
int len =VALID_CHAR_BIT_MAP.length;
boolean[] bitMap =VALID_CHAR_BIT_MAP;
for (int i = 0; i < strLen; i++) {
char ch = str.charAt(i);
if (ch >= len || !bitMap[ch]) {
return true;
}
}
return false;
}
```
消费组名称不能是`DEFAULT_CONSUMER`
`public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";`
```java
if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
throw new MQClientException("consumerGroup can not equal " + MixAll.DEFAULT_CONSUMER_GROUP
+ ", please specify another one." + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
}
```
消费者最小线程数需要在 1-1000 之间
```java
if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
|| this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
throw new MQClientException("consumeThreadMin Out of range [1, 1000]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
}
```
消费者最大线程数需要在 1-1000 之间
```java
if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
throw new MQClientException("consumeThreadMax Out of range [1, 1000]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
}
```
`2、设置订阅信息`
构造主题订阅消息`SubscriptionData`并将其加入`RebalanceImpl`,如果是消费模式是集群,订阅默认的重试主题并且构造`SubscriptionData`加入`RebalanceImpl`
```java
private void copySubscription() throws MQClientException {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
caseBROADCASTING:
break;
caseCLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
```
`3、初始化MqClientInstance、RebalanceImpl、PullApiWrapper`
创建`MqClientInstance` 无论在生产者端还是消费者端都是一个很重要的类, 封装了Topic信息、broker信息当然还有生产者和消费者的信息。
```java
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance = new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
```
构造`RebalanceImpl` 用来负载消费者与队列的消费关系
```java
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
```
构造`PullApiWrapper` 消费者拉取消息类
```java
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
```
`4、设置消息偏移量`
如果是广播模式消费,消息消费进度存储在消费端,如果是集群模式消费,消息消费进度存储在 broker 端
```java
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
caseBROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
caseCLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
```
`5、是否是顺序消费`
根据是否是顺序消费构造不同的`ConsumeMessageService`
```java
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
```
区别在于启动的线程任务不同:
顺序消费线程:
```java
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
} catch (Throwable e) {
log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
}
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
```
正常消费线程:
```java
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
cleanExpireMsg();
} catch (Throwable e) {
log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
}
}
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
```
6`、启动MQClientInstance`
消费者与生产者共用 MQClientInstance
大部分流程已经在生产者启动流程中讲解,这里主要讲解与生产者不同的部分
启动保证消费者偏移量最终一致性的任务
```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
```
启动调整线程池大小任务:
```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
```
启动重平衡服务:
`this.rebalanceService.start();`
7`、更新订阅主题信息`
更新主题订阅信息:
```java
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
}
}
}
```

@ -0,0 +1,168 @@
该文所涉及的 RocketMQ 源码版本为 4.9.3。
# RocketMQ IndexFile 详解
首先明确一下 IndexFile 的文件结构
Index header + 哈希槽,每个槽下面挂载 index 索引,类似哈希表的结构
一个 Index 文件默认包含 500 万个哈希槽,一个哈希槽最多存储 4 个 index也就是一个 IndexFile 默认最多包含 2000 万个 index
Index header
40byte Index header = 8byte 的 beginTimestampIndexFile 对应第一条消息的存储时间) + 8byte 的 endTimestamp (IndexFile 对应最后一条消息的存储时间) + 8byte 的 beginPhyoffsetIndexFile 对应第一条消息在 CommitLog 的物理偏移量) + 8byte 的 endPhyoffsetIndexFile 对应最后一条消息在 CommitLog 的物理偏移量)+ 4byte 的 hashSlotCount已有 index 的槽个数)+ 4byte 的 indexCount索引个数
哈希槽:
每个哈希槽占用 4 字节,存储当前槽下面最新的 index 的序号
Index
20byte 的 index = 4byte 的 keyHashkey 的哈希码) + 8byte 的 phyOffset消息在文件中的物理偏移量+ 4byte 的 timeDiff该索引对应消息的存储时间与当前索引文件第一条消息的存储时间的差值+ 4byte 的 preIndexNo该条目的前一个 Index 的索引值)
1、将消息索引键与消息偏移量的映射关系写入 indexFile
org.apache.rocketmq.store.index.IndexFile#putKey
当前已使用的 Index 大于等于允许的最大个数时,返回 false表示当前 Index 文件已满。
如果当前 Index 文件未满,则根据 key 计算出哈希码,然后对槽数量取余定位到某一个哈希槽位置,
哈希槽的物理偏移量 = IndexHeader 的大小(默认 40Byte + 哈希槽位置 * 每个哈希槽的大小4 字节)
```java
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE+ slotPos * hashSlotSize;
```
读取哈希槽中的数据,如果哈希槽中的数据小于 0 或者大于 index 的个数,则为无效索引,将 slotValue 置为 0
```java
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <=invalidIndex|| slotValue > this.indexHeader.getIndexCount()) {
slotValue =invalidIndex;
}
```
计算本次存储消息的时间戳与 indexFile 第一条消息存储时间戳的差值并转换为秒
```java
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
```
新添加的消息 index 的物理偏移量 = IndexHeader 大小40Byte + Index 文件哈希槽的数量 * 哈希槽的大小4Byte + Index 文件索引数量 * 索引大小20Byte
将消息哈希码、消息物理偏移量、消息存储时间戳与 Index 文件第一条消息的时间戳的差值、当前哈希槽的值、当前 Indexfile 的索引个数存入 mappedByteBuffer
```java
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE+ this.hashSlotNum *hashSlotSize
+ this.indexHeader.getIndexCount() *indexSize;
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
```
更新 IndexHeader 信息:
如果该 IndexFile 哈希槽中消息的数量小于等于 1更新 IndexHeader 的 beginPhyOffset 和 beginTimesttamp
每次添加消息之后更新 IndexCount、endPhyOffset、endTimestamp
```java
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
if (invalidIndex== slotValue) {
this.indexHeader.incHashSlotCount();
}
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
```
2、根据 key 查找消息
org.apache.rocketmq.store.index.IndexFile#selectPhyOffset
参数如下:
`List<Long> phyOffsets` 查询到的物理偏移量
`String key: 索引key`
`int maxNum`:本次查找的最大消息条数
`long begin`:开始时间戳
long end: 结束时间戳
根据 key 计算哈希码,哈希码与哈希槽的数量取余得到哈希槽的索引
哈希槽的物理地址 = IndexHeader40byte + 哈希槽索引 * 每个哈希槽的大小4byte
```java
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE+ slotPos * hashSlotSize;
```
`从mappedByteBuffer`获取哈希槽的值,如果值小于等于 0 或者值大于 IndexCount
或者 IndexCount 的 值小于等于 1 则表示没有有效的结果数据
如果查询返回的结果数量大于等于要查询的最大消息条数,终止循环
```java
if (slotValue <=invalidIndex|| slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
```
如果存储的时间戳小于 0结束查找如果哈希码匹配并且存储时间在要查找的开始时间戳和结束时间戳之间将结果偏移量加入返回结果中
```java
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
```
校验该 index 的上一个 index如果上一个 index 的索引大于 0 并且小于等于 indexCount时间戳大于等于要查找的开始时间戳则继续查找
```java
if (prevIndexRead <=invalidIndex || prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
nextIndexToRead = prevIndexRead;
```

@ -0,0 +1,254 @@
该文所涉及的 RocketMQ 源码版本为 4.9.3。
# RocketMQ MappedFile 内存映射文件详解
1、MappedFile 初始化
```java
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
```
初始化`fileFromOffset`,因为 commitLog 文件夹下的文件都是以偏移量为命名的,所以转成了 long 类型
确认文件目录是否存在,不存在则创建
```java
public static void ensureDirOK(final String dirName) {
if (dirName != null) {
if (dirName.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
String[] dirs = dirName.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String dir : dirs) {
createDirIfNotExist(dir);
}
} else {
createDirIfNotExist(dirName);
}
}
}
```
通过`RandomAccessFile`设置 fileChannel
`this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();`
使用 NIO 内存映射将文件映射到内存中
`this.mappedByteBuffer = this.fileChannel.map(MapMode.*READ_WRITE*, 0, fileSize);`
2、MappedFile 提交
```java
public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
commit0();
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}
// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}
return this.committedPosition.get();
}
```
如果 wroteBuffer 为空,直接返回 wrotePosition
```java
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
```
判断是否执行 commit 操作:
如果文件已满,返回 true
```java
if (this.isFull()) {
return true;
}
```
```java
public boolean isFull() {
return this.fileSize == this.wrotePosition.get();
}
```
commitLeastPages 为本次提交的最小页数,如果 commitLeastPages 大于 0计算当前写指针`wrotePosition`)与上一次提交的指针`committedPosition`的差值 除以页*`OS_PAGE_SIZE`*的大小得到脏页数量,如果大于 commitLeastPages就可以提交。如果 commitLeastPages 小于 0则存在脏页就提交
```java
if (commitLeastPages > 0) {
return ((write /OS_PAGE_SIZE) - (flush /OS_PAGE_SIZE)) >= commitLeastPages;
}
return write > flush;
```
MapperFile 具体的提交过程,首先创建 `writeBuffer`的共享缓存区,设置 position 为上一次提交的位置`committedPosition` ,设置 limit 为`wrotePosition`当前写指针,接着将 committedPosition 到 wrotePosition 的数据写入到 FileChannel 中,最后更新 committedPosition 指针为 wrotePosition
```java
protected void commit0() {
int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get();
if (writePos - lastCommittedPosition > 0) {
try {
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
this.committedPosition.set(writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}
```
3、MappedFile 刷盘
判断是否要进行刷盘
文件是否已满
```java
if (this.isFull()) {
return true;
}
```
```java
public boolean isFull() {
return this.fileSize == this.wrotePosition.get();
}
```
如果`flushLeastPages`大于 0判断写数据指针位置-上次刷盘的指针位置, 然后除以*`OS_PAGE_SIZE 是否大于等于`*`flushLeastPages`
如果 flushLeastPages 小于等于 0判断是否有要刷盘的数据
```java
if (flushLeastPages > 0) {
return ((write /OS_PAGE_SIZE) - (flush /OS_PAGE_SIZE)) >= flushLeastPages;
}
return write > flush;
```
获取最大读指针
```java
public int getReadPosition() {
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
```
将数据刷出到磁盘
如果`writeBuffer`不为空或者通道的 position 不等于 0通过 fileChannel 将数据刷新到磁盘
否则通过 MappedByteBuffer 将数据刷新到磁盘
4、MappedFile 销毁
```java
public boolean destroy(final long intervalForcibly) {
this.shutdown(intervalForcibly);
if (this.isCleanupOver()) {
try {
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeElapsedTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
```
1> 关闭 MappedFile
第一次调用时 this.`available为true`,设置 available 为 false设置第一次关闭的时间戳为当前时间戳调用 release()释放资源,只有在引用次数小于 1 的时候才会释放资源,如果引用次数大于 0判断当前时间与 firstShutdownTimestamp 的差值是否大于最大拒绝存活期`intervalForcibly`,如果大于等于最大拒绝存活期,将引用数减少 1000直到引用数小于 0 释放资源
```java
public void shutdown(final long intervalForcibly) {
if (this.available) {
this.available = false;
this.firstShutdownTimestamp = System.currentTimeMillis();
this.release();
} else if (this.getRefCount() > 0) {
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}
```
2> 判断是否清理完成
是否清理完成的标准是引用次数小于等于 0 并且清理完成标记 cleanupOver 为 true
```java
public boolean isCleanupOver() {
return this.refCount.get() <= 0 && this.cleanupOver;
}
```
3> 关闭文件通道 fileChannel
`this.fileChannel.close();`

@ -0,0 +1,220 @@
该文所涉及的 RocketMQ 源码版本为 4.9.3。
# RockerMQ Nameserver 如何与 Broker 进行通信的?
nameserver 每隔 10s 扫描一次 Broker移除处于未激活状态的 Broker
核心代码:
`this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.*SECONDS*);`
```java
public int scanNotActiveBroker() {
int removeCount = 0;
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last +BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(),BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
removeCount++;
}
}
return removeCount;
}
```
broker 每隔 30 秒会向集群中所有的 NameServer 发送心跳包
核心代码:
```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
```
```java
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
```
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 是网络处理器解析请求类型,如果请求类型为`*RequestCode.REGISTER_BROKER`,则请求最终转发到 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker\*
代码太多,文字来描述一下:
第一步:路由注册需要加写锁,防止并发修改 RouteInfoManager 中的路由表。首先判断 Broker 所属集群是否存在,如果不存在,则创建集群,然后将 broker 名加入集群。
第二步:维护 BrokerData 信息,首先从 brokerAddrTable 中根据 broker 名尝试获取 Broker 信息,如果不存在,则新建 BrokerData 放入 brokerAddrTableregisterFirst 设置为 true如果存在直接替换原先的 Broker 信息registerFirst 设置为 false表示非第一次注册
第三步:如果 Broker 为主节点,并且 Broker 的 topic 配置信息发生变化或者是初次注册,则需要创建或者更新 topic 的路由元数据,并填充 topicQueueTable
根据 topicConfig 创建 QueueData 数据结构然后更新 topicQueueTable
```java
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());
Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataMap) {
queueDataMap = new HashMap<>();
queueDataMap.put(queueData.getBrokerName(), queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataMap);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
QueueData old = queueDataMap.put(queueData.getBrokerName(), queueData);
if (old != null && !old.equals(queueData)) {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), old,
queueData);
}
}
}
```
第四步:更新 BrokerLiveInfo,存储状态正常的 Broker 信息表BrokerLiveInfo 是执行路由删除操作的重要依据。
```java
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
```
第五步:注册 Broker 的过滤器 Server 地址列表,一个 Broker 上会关联多个 FilterServer 消息过滤服务器。如果此 Broker 为从节点,则需要查找该 Broker 的主节点信息,并更新对应的 masterAddr 属性
```java
if (MixAll.MASTER_ID!= brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
```
总结:
NameServer 与 Broker 保持长连接Broker 的状态信息存储在 BrokerLiveTable 中NameServer 每收到一个心跳包,将更新 brokerLiveTable 中关于 broker 的状态信息以及路由表topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。更新上述路由表使用了锁粒度较少的读写锁允许多个消息发送者并发读操作保证消息发送时的高并发同一时刻 NameServer 只处理一个 Broker 心跳包,多个心跳包请求串行执行。
NameServer 如何剔除失效的 Broker
1、NameServer 每隔十秒注册一次 brokerLiveTable 状态表,如果 BrokerLive 的 lastUpdateTimestamp
时间戳距当前时间超过 120 秒,则认为 Broker 失效,移除该 Broker关闭与 broker 的连接,同时更新 topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
2、如果 broker 在正常关闭的情况下,会发送 unRegisterBroker 指令。
不管是哪一种方式触发的路由删除,处理逻辑是一样的
第一步:申请写锁,移除 brokerLiveTable、filterServerTable 中 Broker 相关的信息
```java
this.lock.writeLock().lockInterruptibly();
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
log.info("unregisterBroker, remove from brokerLiveTable {}, {}",brokerLiveInfo != null ? "OK" : "Failed",brokerAddr);
this.filterServerTable.remove(brokerAddr);
```
第二步:维护 brokerAddrTable找到具体的 broker将其从 brokerData 中移除,如果移除之后不再包含其他 broker则在 brokerAddrtable 移除该 brokerName 对应的数据
```
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
String addr = brokerData.getBrokerAddrs().remove(brokerId);
log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",addr != null ? "OK" : "Failed",brokerAddr);
if (brokerData.getBrokerAddrs().isEmpty()) {
this.brokerAddrTable.remove(brokerName);
log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",brokerName);
removeBrokerName = true;
}
}
```
第三步:根据 brokerName 从 clusterAddrTable 中找到 Broker 并将其中集群中移除,如果移除后集群中不包含任何 Broker则将该集群从 clusterAddrTable 中移除
```
if (removeBrokerName) {
Set<String> nameSet = this.clusterAddrTable.get(clusterName);
if (nameSet != null) {
boolean removed = nameSet.remove(brokerName);
log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",removed ? "OK" : "Failed",brokerName);
if (nameSet.isEmpty()) {
this.clusterAddrTable.remove(clusterName);
log.info("unregisterBroker, remove cluster from clusterAddrTable {}",clusterName);
}
}
}
```
第四步: 根据 brokerName遍历所有主题的队列如果队列中包含当前 broker 的队列,则移除,如果 topic 中包含待移除的 Broker 的队列,从路由表中删除该 topic
```
this.topicQueueTable.forEach((topic, queueDataMap) -> {
QueueData old = queueDataMap.remove(brokerName);
if (old != null) {
log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, old);
}
if (queueDataMap.size() == 0) {
noBrokerRegisterTopic.add(topic);
log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);
}
});
```
第五步:
释放锁,完成路由删除
```
finally {
this.lock.writeLock().unlock();
}
```

@ -0,0 +1,238 @@
该文所涉及的 RocketMQ 源码版本为 4.9.3。
# RocketMQ 生产者启动流程
入口:
org.apache.rocketmq.client.producer.DefaultMQProducer#start
```java
@Override
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
```
第一步、检查 producerGroup
```java
private void checkConfig() throws MQClientException {
Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
if (null == this.defaultMQProducer.getProducerGroup()) {
throw new MQClientException("producerGroup is null", null);
}
if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",null);
}
}
```
第二步、设置 instanceName
```java
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
}
```
第三步、创建 mqClientInstance,它是与 nameserver 和 broker 通信的中介
```java
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
```
第四步、将生产者加入 mqClientInstance 管理
```java
public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}
```
第五步、启动 MQClientInstance有一些关于消费者的任务 会在消费者启动流程中讲解)
1. 启动 netty 客户端 ,创建与 nameserver、broker 通信的 channel
```java
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
}
if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
}
if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
```
2. 启动一些周期性的任务:
更新 nameserver 地址的任务:
```java
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
```
更新 topic 路由信息的任务:
```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
```
更新 broker 的任务:
```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
```
启动拉取消息线程:
`this.pullMessageService.start();`
```java
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
```

@ -0,0 +1,116 @@
该文所涉及的 RocketMQ 源码版本为 4.9.3。
# RocketMQ broker 处理拉取消息请求流程
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand)
第 1 步、`校验broker是否可读`
```java
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
}
```
第 2 步、`根据消费组获取订阅信息`
```java
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
```
第 3 步、`校验是否允许消费`
```java
if (!subscriptionGroupConfig.isConsumeEnable()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
return response;
}
```
第 4 步、`根据主题获取对应的配置信息`
```java
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
}
```
第 5 步、`校验主题对应的队列`
```java
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
```
第 6 步、`如果配置了消息过滤表达式根据表达式进行构建consumerFilterData如果没有则根据主题构建`
```java
consumerFilterData = ConsumerFilterManager.build(
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
requestHeader.getExpressionType(), requestHeader.getSubVersion()
consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
requestHeader.getConsumerGroup());
```
第 7 步、`校验如果不是Tag过滤是否开启了自定义属性过滤如果没有开启不允许操作 只有使用push推送模式的消费者才能用使用SQL92标准的sql语句pull拉取模式的消费者是不支持这个功能的。`
```java
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
&& !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
return response;
}
```
第 8 步、`根据是否支持重试过滤创建不同的MessageFilter`
```java
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
```
第 9 步、`根据消费组、主题、队列、偏移量、最大拉取消息数量、消息过滤器查找信息`
```java
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
```
第 10 步、`消息为空 设置code为系统错误 返回response`
```java
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
```
第 11 步、`提交偏移量`
```java
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
```

@ -0,0 +1,327 @@
该文所涉及的 RocketMQ 源码版本为 4.9.3。
# RocketMQ 消息拉取流程
之前在消费者启动流程中描述过 MQClientInstance 的启动流程,在启动过程中会启动 PullMessageService它继承了`ServiceThread`,并且 ServiceThread 实现了 Runnable 接口,所以是单独启动了一个线程
`public class PullMessageService extends ServiceThread`
`public abstract class ServiceThread implements Runnable`
PullMessageService 的 run 方法如下:
`protected volatile boolean stopped = false;`
```java
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
```
只要没有停止,线程一直会从 PullRequestQueue 中获取 PullRequest 消息拉取任务,如果队列为空,会一直阻塞,直到有 PullRequest 被放入队列中,如果拿到了 PullRequest 就会调用 pullMessage 方法拉取消息
添加 PullRequest 有两个方法,一个是延迟添加,另一个是立即添加
```java
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
```
org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage
拉取消息流程:
根据消费组获取`MQConsumerInner`,根据推模式还是拉模式,强转为`DefaultMQPushConsumerImpl`还是`DefaultLitePullConsumerImpl`
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
`第1步获取处理队列`,`如果队列被丢弃结束`
```java
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
```
第 2 步:`设置最后一次拉取时间戳`
`pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());`
第 3 步:`确认消费者是启动的状态如果不是启动的状态将PullRequest延迟3s放入队列`
```java
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
```
第 4 步:`如果消费者停止了将PullRequest延迟1s放入队列`
```java
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
```
第 5 步:`缓存的消息数量大于1000将PullRequest延迟50ms放入队列,每触发1000次流控输出警告信息`
```java
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
```
第 6 步:`缓存的消息大小大于100M 将PullRequest延迟50ms放入队列每触发1000次输出警告信息`
```java
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
```
第 7 步:`ProcessQueue中消息的最大偏移量与最小偏移量的差值不能大于2000如果大于2000触发流控输出警告信息`
```java
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
}
```
第 8 步:`如果ProcessQueue被锁了判断上一个PullRequest是否被锁如果没有被锁通过RebalanceImpl计算拉取消息偏移量如果计算异常将请求延迟3s加入队列``如果下一次拉取消息 的偏移量大于计算出来的偏移量,说明要拉取的偏移量 大于消费偏移量,对 偏移量 进行修正,设置下一次拉取的偏移量为计算出来的偏移量`
```java
if (processQueue.isLocked()) {
if (!pullRequest.isPreviouslyLocked()) {
long offset = -1L;
try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
} catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return;
}
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setPreviouslyLocked(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
```
第 9 步:`根据主题名称获取订阅信息如果为空将请求延迟3s放入队列`
```java
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
```
第 10 步:`创建PullCallback为后面调用 拉取消息api做准备`
```java
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
caseFOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
caseNO_NEW_MSG:
caseNO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
caseOFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
```
第 11 步:`设置系统标记`
`FLAG_COMMIT_OFFSET: 消费进度 大于0`
`FLAG_SUSPEND: 拉取消息时支持线程挂起`
`FLAG_SUBSCRIPTION: 消息过滤机制表达式`
`FLAG_CLASS_FILTER: 消息过滤机制是否为类过滤`
```java
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
```
第 12 步:`调用 broker 拉取消息`
```java
// 每一个参数的含义如下
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(), // 要拉取的消息队列
subExpression, // 消息过滤表达式
subscriptionData.getExpressionType(), // 过滤表达式类型
subscriptionData.getSubVersion(), // 时间戳
pullRequest.getNextOffset(), // 消息拉取的开始偏移量
this.defaultMQPushConsumer.getPullBatchSize(), // 拉取消息的数量 默认32条
sysFlag, // 系统标记
commitOffsetValue, // 消费的偏移量
BROKER_SUSPEND_MAX_TIME_MILLIS, // 允许broker挂起的时间 默认15s
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 允许的超时时间 默认30s
CommunicationMode.ASYNC, // 默认为异步拉取
pullCallback // 拉取消息之后的回调
);
```

@ -0,0 +1,365 @@
该文所涉及的 RocketMQ 源码版本为 4.9.3。
# RocketMQ 消息发送流程
这里以同步发送为示例讲解:
入口:
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
消息发送 默认超时时间 3 秒
第一步:验证
主题的长度不能大于 127消息的大小不能大于 4M
```java
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// topic
Validators.checkTopic(msg.getTopic());
Validators.isNotAllowedSendTopic(msg.getTopic());
// body
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
```
第二步:查找路由信息
如果缓存中存在路由信息,并且队列信息不为空直接返回路由信息,如果缓存不存在,根据当前主题从 NameServer 中获取 路由信息,如果路由信息没有找到,根据默认主题查询路由信息,如果没有找到抛出异常
```java
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
```
从 NameServer 查询路由信息方法:
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)
1、如果是默认的主题查询路由信息返回成功更新读队列和写队列的个数为默认的队列个数
```java
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
clientConfig.getMqClientApiTimeout());
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
}
```
2、返回路由信息之后与本地缓存的路由信息比对判断路由信息是否发生变化如果发生变化更新 broker 地址缓存,更新`topicPublishInfoTable`,更新 topic 路由信息缓存`topicRouteTable`
```java
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
if (!producerTable.isEmpty()) {
TopicPublishInfo publishInfo =topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
```
第三步:选择消息 队列
设置消息发送失败重试次数
`int timesTotal = communicationMode == CommunicationMode.*SYNC* ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;`
`MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);`
首先判断是否启用故障延迟机制 ,默认不启用,第一次查询 lastBrokerName 为空,`sendWhichQueue`自增然后对队列个数取模获取队列,如果消息发送失败,下一次`sendWhichQueue`仍然自增然后对队列个数取模,可以规避掉上次失败的 broker
```java
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
```
如果启用故障延迟机制:
轮询获取队列 ,如果可用直接返回
```java
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
```
判断是否可用逻辑:先从要规避的 broker 集合`faultItemTable`中获取该 broker 是否存在,如果存在判断是否可用,可用的标准是当前时间的时间戳大于上次该 broker 失败的时间 + 规避的时间,如果该 broker 在规避的 broker 集合中不存在,直接返回可用
```java
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
return faultItem.isAvailable();
}
return true;
}
```
如果没有可用的 broker尝试从 规避的 broker 集合中选择一个可用的 broker如果选择的 broker 没有写队列,则从规避的 broker 列表中移除该 broker
```java
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
```
P.S. :
要规避的 broker 集合在同步发送的时候不会 更新,在异步发送的时候会更新
```java
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
```
主要更新消息发送故障的延迟时间`currentLatency`和故障规避的 开始时间`startTimestamp`
```java
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
```
总结:
不管开不开启故障延迟机制,都可以规避故障的 broker只是开启故障延迟机制会在一段时间内都不会访问到该 broker而不开启只是下一次不会访问到该 broker
第四步:消息发送
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
1、为消息分配全局唯一 id
```java
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
```
2、消息体大于 4k 启用压缩
```java
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
```
3、如果是事务消息设置消息类型为事务消息
```java
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
```
4、校验是否超时
```java
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
```
5、组装请求头
```java
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
```
6、发送请求
```java
caseSYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
```
第五步:处理响应结果
1、处理状态码
```java
switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT: {
sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
break;
}
case ResponseCode.FLUSH_SLAVE_TIMEOUT: {
sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
break;
}
case ResponseCode.SLAVE_NOT_AVAILABLE: {
sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
break;
}
case ResponseCode.SUCCESS: {
sendStatus = SendStatus.SEND_OK;
break;
}
default: {
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
}
```
2、构造 SendResult
```java
SendResult sendResult = new SendResult(sendStatus,
uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
if (regionId == null || regionId.isEmpty()) {
regionId = MixAll.DEFAULT_TRACE_REGION_ID;
}
if (traceOn != null && traceOn.equals("false")) {
sendResult.setTraceOn(false);
} else {
sendResult.setTraceOn(true);
}
sendResult.setRegionId(regionId);
```

@ -0,0 +1,193 @@
该文所涉及的 RocketMQ 源码版本为 4.9.3。
# RocketMQ 消息发送存储流程
第一步:检查消息存储状态
org.apache.rocketmq.store.DefaultMessageStore#checkStoreStatus
1、检查 broker 是否可用
```java
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
```
2、检查 broker 的角色
```java
if (BrokerRole.SLAVE== this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("broke role is slave, so putMessage is forbidden");
}
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
```
3、检查 messageStore 是否可写
```java
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
"the broker's disk is full, write to logic queue error, write to index file error, etc");
}
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} else {
this.printTimes.set(0);
}
```
4、检查 pageCache
```java
if (this.isOSPageCacheBusy()) {
return PutMessageStatus.OS_PAGECACHE_BUSY;
}
```
第二步:检查消息
org.apache.rocketmq.store.DefaultMessageStore#checkMessage
1、校验主题的长度不能大于 127
```java
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
```
2、校验属性的长度不能大于 32767
```java
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
```
第三步:获取当前可以写入的 CommitLog 文件
CommitLog 文件的存储目录为${ROCKET_HOME}/store/commitlog ,MappedFileQueue 对应此文件夹MappedFile 对应文件夹下的文件
```java
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
```
如果是第一次写入或者最新偏移量所属文件已满,创建新的文件
```java
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
if (mappedFileLast != null && mappedFileLast.isFull()) {
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) {
return tryCreateMappedFile(createOffset);
}
return mappedFileLast;
}
```
第四步:将消息写入到 MappedFile 中
```java
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
assert messageExt != null;
assert cb != null;
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
```
org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner, org.apache.rocketmq.store.CommitLog.PutMessageContext)
计算要写入的偏移量
`long wroteOffset = fileFromOffset + byteBuffer.position();`
对事务消息做特殊处理:
```java
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queue
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
```
构造 AppendMessageResult
```java
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
```
事务消息特殊处理:
```java
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
break;
default:
break;
}
```

Binary file not shown.

After

Width:  |  Height:  |  Size: 100 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 25 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 61 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

@ -6,11 +6,11 @@
<title></title>
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1" />
<meta name="keywords" content="doc,docs,doocs,documentation,github,gitee,source-code-hunter,AmyliaY" />
<meta name="description" content="读尽天下源码,心中自然无码,《源码猎人》项目维护者:云之君" />
<meta name="description" content="读尽天下源码,心中自然无码——源码猎人" />
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0" />
<link rel="stylesheet" href="//cdn.jsdelivr.net/npm/docsify/lib/themes/vue.css" />
<link rel="stylesheet" href="//cdn.jsdelivr.net/npm/docsify-darklight-theme@latest/dist/style.min.css">
<link rel="stylesheet" href="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/docsify/lib/themes/vue.css" />
<link rel="stylesheet" href="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/docsify-darklight-theme@latest/dist/style.min.css">
<link rel="icon" type="image/png" sizes="32x32" href="images/favicon-32x32.png" />
<link rel="icon" type="image/png" sizes="16x16" href="images/favicon-16x16.png" />
<style>
@ -70,6 +70,14 @@
highlightColor: '#e96900',
}
},
contributors: {
repo: 'doocs/source-code-hunter',
ignores: ['/README.md'],
image: {
margin: '0.2em',
isRound: true
}
},
plugins: [
function (hook, vm) {
hook.beforeEach(function (content) {
@ -122,29 +130,34 @@
});
hook.afterEach(function (html) {
const footer = [
"<footer>",
'<span>Copyright © 2018-2021 <a href="https://github.com/doocs" target="_blank">Doocs</a>. All rights reserved.',
"</footer>",
].join("");
const currentYear = new Date().getFullYear()
const footer = `<footer><span>Copyright © 2018-${currentYear} <a href="https://github.com/doocs" target="_blank">Doocs</a>. All Rights Reserved.</footer>`
return html + footer;
});
},
],
};
</script>
<script src="//cdn.jsdelivr.net/npm/docsify/lib/docsify.min.js"></script>
<script src="//cdn.jsdelivr.net/npm/prismjs/components/prism-c.min.js"></script>
<script src="//cdn.jsdelivr.net/npm/prismjs/components/prism-bash.min.js"></script>
<script src="//cdn.jsdelivr.net/npm/prismjs/components/prism-cpp.min.js"></script>
<script src="//cdn.jsdelivr.net/npm/prismjs/components/prism-json.min.js"></script>
<script src="//cdn.jsdelivr.net/npm/prismjs/components/prism-java.min.js"></script>
<script src="//cdn.jsdelivr.net/npm/prismjs/components/prism-python.min.js"></script>
<script src="//cdn.jsdelivr.net/npm/docsify-copy-code@2.1.1/dist/docsify-copy-code.min.js"></script>
<script src="//cdn.jsdelivr.net/npm/docsify/lib/plugins/search.min.js"></script>
<script src="//cdn.jsdelivr.net/npm/docsify/lib/plugins/emoji.min.js"></script>
<script src="//cdn.jsdelivr.net/npm/docsify/lib/plugins/zoom-image.min.js"></script>
<script src="//cdn.jsdelivr.net/npm/docsify-darklight-theme@latest/dist/index.min.js"></script>
<script src="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/docsify/lib/docsify.min.js"></script>
<script src="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/prismjs/components/prism-c.min.js"></script>
<script src="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/prismjs/components/prism-bash.min.js"></script>
<script src="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/prismjs/components/prism-cpp.min.js"></script>
<script src="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/prismjs/components/prism-json.min.js"></script>
<script src="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/prismjs/components/prism-java.min.js"></script>
<script src="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/prismjs/components/prism-python.min.js"></script>
<script src="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/docsify-copy-code@2.1.1/dist/docsify-copy-code.min.js"></script>
<script src="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/docsify/lib/plugins/search.min.js"></script>
<script src="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/docsify/lib/plugins/zoom-image.min.js"></script>
<script src="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/docsify-darklight-theme@latest/dist/index.min.js"></script>
<script src="https://cdn-doocs.oss-cn-shenzhen.aliyuncs.com/npm/docsify-contributors@latest/dist/index.min.js"></script>
</body>
<!--
cdn.jsdelivr.net
test1.jsdelivr.net
testingcf.jsdelivr.net
fastly.jsdelivr.net
gcore.jsdelivr.net
-->
</html>
Loading…
Cancel
Save