feat: update HashedWheelTimer时间轮原理分析.md

pull/75/head
yanglbme 5 years ago
parent d4229af363
commit 79217b535f

@ -1,8 +1,10 @@
该文所涉及的 netty 源码版本为 4.1.6。
该文所涉及的 Netty 源码版本为 4.1.6。
## Netty 时间轮 HashedWheelTimer 是什么
## HashedWheelTimer 是什么
Netty 的时间轮 HashedWheelTimer 给出了一个粗略的定时器实现,之所以称之为粗略的实现是因为该时间轮并没有严格的准时执行定时任务,而是在每隔一个时间间隔之后的时间节点执行,并执行当前时间节点之前到期的定时任务。当然具体的定时任务的时间执行精度可以通过调节 HashedWheelTimer 构造方法的时间间隔的大小来进行调节,在大多数网络应用的情况下,由于 IO 延迟的存在,并不会严格要求具体的时间执行精度,所以默认的 100ms 时间间隔可以满足大多数的情况,不需要再花精力去调节该时间精度。
Netty 的时间轮 `HashedWheelTimer` 给出了一个**粗略的定时器实现**,之所以称之为粗略的实现是**因为该时间轮并没有严格的准时执行定时任务**,而是在每隔一个时间间隔之后的时间节点执行,并执行当前时间节点之前到期的定时任务。
当然具体的定时任务的时间执行精度可以通过调节 HashedWheelTimer 构造方法的时间间隔的大小来进行调节,在大多数网络应用的情况下,由于 IO 延迟的存在,并**不会严格要求具体的时间执行精度**,所以默认的 100ms 时间间隔可以满足大多数的情况,不需要再花精力去调节该时间精度。
## HashedWheelTimer 的实现原理
@ -12,14 +14,14 @@ Netty 的时间轮 HashedWheelTimer 给出了一个粗略的定时器实现,
private final HashedWheelBucket[] wheel;
```
HashedWheelTimer 的主体数据结构 wheel 是一个由多个链表所组成的数组,默认情况下该数组的大小为 512。当定时任务准备加入到时间轮中的时候将会以其等待执行的时间为依据选择该数组上的一个具体槽位上的链表加入。
HashedWheelTimer 的主体数据结构 wheel 是一个**由多个链表所组成的数组**,默认情况下该数组的大小为 512。当定时任务准备加入到时间轮中的时候将会以其等待执行的时间为依据选择该数组上的一个具体槽位上的链表加入。
```java
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
```
在这个 wheel 数组中,每一个槽位都是一条由 HashedWheelTimeout 所组成的链表,其中链表中的每一个节点都是一个等待执行的定时任务。
在这个 wheel 数组中,每一个槽位都是一条由 HashedWheelTimeout 所组成的**链表**,其中链表中的**每一个节点都是一个等待执行的定时任务**
### HashedWheelTimer 内部的线程模型
@ -63,9 +65,9 @@ public void run() {
}
```
简单看到 HashedWheelTimer 内部的 woker 线程的 run()方法,在其首先会记录启动时间作为 startTime 作为接下来调度定时任务的时间依据,而之后会通过 CountDownLatch 来通知所有外部线程当前 worker 工作线程已经初始化完毕。之后的循环体便是当时间轮持续生效的时间里的具体调度逻辑。时间刻度是时间轮的一个重要属性,其默认为 100ms此处的循环间隔便是时间轮的时间刻度默认情况下就是间隔 100ms 进行一次调度循环。工作线程会维护当前工作线程具体循环了多少轮,用于定位具体执行触发时间轮数组上的哪一个位置上的链表。当时间轮准备 shutdown 的阶段,最后的代码会对未执行的任务整理到未执行的队列中。
简单看到 HashedWheelTimer 内部的 woker 线程的 `run()`方法,在其首先会记录启动时间作为 startTime 作为接下来调度定时任务的时间依据,而之后会通过 CountDownLatch 来通知所有外部线程当前 worker 工作线程已经初始化完毕。之后的循环体便是当时间轮持续生效的时间里的具体调度逻辑。**时间刻度是时间轮的一个重要属性**,其默认为 100ms此处的循环间隔便是时间轮的时间刻度默认情况下就是间隔 100ms 进行一次调度循环。工作线程会维护当前工作线程具体循环了多少轮,用于定位具体执行触发时间轮数组上的哪一个位置上的链表。当时间轮准备 shutdown 的阶段,最后的代码会对未执行的任务整理到未执行的队列中。
由此可见worker 线程的 run()方法中基本定义了工作线程的整个生命周期,从初始的初始化到循环体中的具体调度,最后到未执行任务的具体清理。整体的调度逻辑便主要在这里执行。值得注意的是,在这里的前提下,每个 HashedWheelTimer 时间轮都会有一个工作线程进行调度,所以不需要在 netty 中在每一个连接中单独使用一个 HashedWheelTimer 来进行定时任务的调度,否则可能将对性能产生影响。
由此可见,**worker 线程的 run()方法中基本定义了工作线程的整个生命周期,从初始的初始化到循环体中的具体调度,最后到未执行任务的具体清理**。整体的调度逻辑便主要在这里执行。值得注意的是,在这里的前提下,每个 HashedWheelTimer 时间轮都会有一个工作线程进行调度,所以不需要在 netty 中在每一个连接中单独使用一个 HashedWheelTimer 来进行定时任务的调度,否则可能将对性能产生影响。
### 向 HashedWheelTimer 加入一个定时任务的流程
@ -89,7 +91,7 @@ public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
}
```
当此次是首次向该时间轮加入定时任务的时候,将会通过 start()方法开始执行上文所述的 worker 工作线程的启动与循环调度逻辑,这里暂且不提。之后计算定时任务触发时间相对于时间轮初始化时间的相对时间间隔 deadline并将其包装为一个链表节点 HashedWheelTimeout ,投入到 timeouts 队列中,等待 worker 工作线程在下一轮调度循环中将其加入到时间轮的具体链表中等待触发执行timeouts 的实现是一个 mpsc 队列,关于 mpsc 队列可以查看[此文](https://github.com/doocs/source-code-hunter/blob/master/docs/Netty/Netty%E6%8A%80%E6%9C%AF%E7%BB%86%E8%8A%82%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/MpscLinkedQueue%E9%98%9F%E5%88%97%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90.md),这里也符合多生产者单消费者的队列模型
当此次是首次向该时间轮加入定时任务的时候,将会通过 start()方法开始执行上文所述的 worker 工作线程的启动与循环调度逻辑,这里暂且不提。之后计算定时任务触发时间相对于时间轮初始化时间的相对时间间隔 deadline并将其包装为一个链表节点 HashedWheelTimeout ,投入到 timeouts 队列中,等待 worker 工作线程在下一轮调度循环中将其加入到时间轮的具体链表中等待触发执行timeouts 的实现是一个 mpsc 队列,关于 mpsc 队列可以查看[此文](https://mp.weixin.qq.com/s/VVoDJwrLZrN3mm-jaQJayQ),这里也符合**多生产者单消费者的队列模型**
### HashedWheelTimer 中工作线程的具体调度
@ -108,9 +110,9 @@ do {
在 HashedWheelTimer 中的工作线程 run()方法的主要循环中,主要分为三个步骤。
首先 worker 线程会通过 waitForNextTick()方法根据时间轮的时间刻度等待一轮循环的开始,在默认情况下时间轮的时间刻度是 100ms那么此处 worker 线程也将在这个方法中 sleep 相应的时间等待下一轮循环的开始。此处也决定了时间轮的定时任务时间精度。
首先 worker 线程会通过 `waitForNextTick()`方法根据时间轮的时间刻度等待一轮循环的开始,在默认情况下时间轮的时间刻度是 100ms那么此处 worker 线程也将在这个方法中 sleep 相应的时间等待下一轮循环的开始。此处也决定了时间轮的定时任务时间精度。
当 worker 线程经过相应时间间隔的 sleep 之后,也代表新的一轮调度开始。此时,会通过 transferTimeoutsToBuckets()方法将之前刚刚加入到 timeouts 队列中的定时任务放入到时间轮具体槽位上的链表中。
当 worker 线程经过相应时间间隔的 sleep 之后,也代表新的一轮调度开始。此时,会通过 `transferTimeoutsToBuckets()`方法将之前刚刚加入到 timeouts 队列中的定时任务放入到时间轮具体槽位上的链表中。
```java
for (int i = 0; i < 100000; i++) {
@ -136,6 +138,13 @@ for (int i = 0; i < 100000; i++) {
}
```
首先,在每一轮的调度中,最多只会从 timeouts 队列中定位到时间轮 100000 个定时任务,这也是为了防止在这里耗时过久导致后面触发定时任务的延迟。在这里会不断从 timeouts 队列中获取刚加入的定时任务。具体的计算流程便是将定时任务相对于时间轮初始化时间的相对间隔与时间轮的时间刻度相除得到相对于初始化时间的具体轮数,之后便在减去当前轮数得到还需要遍历几遍整个时间轮数组得到 remainingRounds最后将轮数与时间轮数组长度-1 相与,得到该定时任务到底应该存放到时间轮上哪个位置的链表。用具体的数组举个例子,该时间轮初始化时间为 12 点,时间刻度为 1 小时,时间轮数组长度为 8当前时间 13 点,当向时间轮加入一个明天 13 点执行的任务的时候,首先得到该任务相对于初始化的时间间隔是 25 小时,也就是需要 25 轮调度,而当前 13 点,当前调度轮数为 1因此还需要 24 轮调度,就需要再遍历 3 轮时间轮,因此 remainingRounds 为 3再根据 25 与 8-1 相与的结果为 1因此将该定时任务放置到时间轮数组下标为 1 的链表上等待被触发。这便是一次完整的定时任务加入到时间轮具体位置的计算。
首先,在每一轮的调度中,最多只会从 `timeouts` 队列中定位到时间轮 100000 个定时任务,这也是为了防止在这里耗时过久导致后面触发定时任务的延迟。在这里会不断从 timeouts 队列中获取刚加入的定时任务。
**具体的计算流程**便是将定时任务相对于时间轮初始化时间的相对间隔与时间轮的时间刻度相除得到相对于初始化时间的具体轮数,之后便在减去当前轮数得到还需要遍历几遍整个时间轮数组得到 remainingRounds最后将轮数与时间轮数组长度-1 相与,得到该定时任务到底应该存放到时间轮上哪个位置的链表。
用具体的数组**举个例子**,该时间轮初始化时间为 12 点,时间刻度为 1 小时,时间轮数组长度为 8当前时间 13 点,当向时间轮加入一个明天 13 点执行的任务的时候,首先得到该任务相对于初始化的时间间隔是 25 小时,也就是需要 25 轮调度,而当前 13 点,当前调度轮数为 1因此还需要 24 轮调度,就需要再遍历 3 轮时间轮,因此 remainingRounds 为 3再根据 25 与 8-1 相与的结果为 1因此将该定时任务放置到时间轮数组下标为 1 的链表上等待被触发。
这便是**一次完整的定时任务加入到时间轮具体位置的计算**。
在 worker 线程的最后,就需要来具体执行定时任务了,首先通过当前循环轮数与时间轮数组长度-1 相与的结果定位具体触发时间轮数组上哪个位置上的链表,再通过 `expireTimeouts()`方法依次对链表上的定时任务进行触发执行。这里的流程就相对很简单,链表上的节点如果 remainingRounds 小于等于 0那么就可以直接执行这个定时任务如果 remainingRounds 大于 0那么显然还没有到达触发的时间点则将其-1 等待下一轮的调度之后再进行执行。在继续回到上面的例子,当 14 点来临之时,此时工作线程将进行第 2 轮的调度,将会把 2 与 8-1 进行相与得到结果 2那么当前工作线程就会选择时间轮数组下标为 2 的链表依次判断是否需要触发,如果 remainingRounds 为 0 将会直接触发,否则将会将 remainingRounds-1 等待下一轮的执行。
在 worker 线程的最后,就需要来具体执行定时任务了,首先通过当前循环轮数与时间轮数组长度-1 相与的结果定位具体触发时间轮数组上哪个位置上的链表,再通过 expireTimeouts()方法依次对链表上的定时任务进行触发执行。这里的流程就相对很简单,链表上的节点如果 remainingRounds 小于等于 0那么就可以直接执行这个定时任务如果 remainingRounds 大于 0那么显然还没有到达触发的时间点则将其-1 等待下一轮的调度之后再进行执行。在继续回到上面的例子,当 14 点来临之时,此时工作线程将进行第 2 轮的调度,将会把 2 与 8-1 进行相与得到结果 2那么当前工作线程就会选择时间轮数组下标为 2 的链表依次判断是否需要触发,如果 remainingRounds 为 0 将会直接触发,否则将会将 remainingRounds-1 等待下一轮的执行。

@ -1,456 +1,109 @@
# SpringBoot 日志系统
该文所涉及的 netty 源码版本为 4.1.6。
- Author: [HuiFer](https://github.com/huifer)
- 源码阅读仓库: [SourceHot-spring-boot](https://github.com/SourceHot/spring-boot-read)
## Netty 中的 ByteBuf 为什么会发生内存泄漏
- 包路径: `org.springframework.boot.logging`
## 日志级别
- 日志级别: `org.springframework.boot.logging.LogLevel`
```java
public enum LogLevel {
TRACE, DEBUG, INFO, WARN, ERROR, FATAL, OFF
}
```
## java 日志实现
- `org.springframework.boot.logging.java.JavaLoggingSystem`
![image-20200323144523848](../../images/SpringBoot/image-20200323144523848.png)
在 Netty 中ByetBuf 并不是只采用可达性分析来对 ByteBuf 底层的 byte[]数组来进行垃圾回收,而同时采用引用计数法来进行回收,来保证堆外内存的准确时机的释放。
在每个 ByteBuf 中都维护着一个 refCnt 用来对 ByteBuf 的被引用数进行记录,当 ByteBuf 的 retain()方法被调用时,将会增加 refCnt 的计数,而其 release()方法被调用时将会减少其被引用数计数。
```java
static {
// KEY : springBoot 定义的日志级别, value: jdk 定义的日志级别
LEVELS.map(LogLevel.TRACE, Level.FINEST);
LEVELS.map(LogLevel.DEBUG, Level.FINE);
LEVELS.map(LogLevel.INFO, Level.INFO);
LEVELS.map(LogLevel.WARN, Level.WARNING);
LEVELS.map(LogLevel.ERROR, Level.SEVERE);
LEVELS.map(LogLevel.FATAL, Level.SEVERE);
LEVELS.map(LogLevel.OFF, Level.OFF);
}
```
- LEVELS 对象
```java
protected static class LogLevels<T> {
/**
* key SpringBoot 中定义的日志级别, value: 其他日志框架的日志级别
*/
private final Map<LogLevel, T> systemToNative;
/**
* key : 其他日志框架的日志级别 , value: springBoot 中定义中定义的日志级别
*/
private final Map<T, LogLevel> nativeToSystem;
private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
```
## LoggingSystem
- 抽象类
- `org.springframework.boot.logging.LoggingSystem`
- 一个 map 对象: `SYSTEMS`
```java
/**
* key: 第三方日志框架的类 value: springBoot 中的处理类
*/
private static final Map<String, String> SYSTEMS;
static {
Map<String, String> systems = new LinkedHashMap<>();
systems.put("ch.qos.logback.core.Appender", "org.springframework.boot.logging.logback.LogbackLoggingSystem");
systems.put("org.apache.logging.log4j.core.impl.Log4jContextFactory",
"org.springframework.boot.logging.log4j2.Log4J2LoggingSystem");
systems.put("java.util.logging.LogManager", "org.springframework.boot.logging.java.JavaLoggingSystem");
SYSTEMS = Collections.unmodifiableMap(systems);
}
```
- 各个抽象方法
| 方法名称 | 作用 |
| ----------------------- | ---------------------------------- |
| beforeInitialize | 初始化之前调用,目的是减少日志输出 |
| initialize | 初始化日志 |
| cleanUp | 清除日志 |
| getShutdownHandler | |
| getSupportedLogLevels | 获取支持的日志级别 |
| setLogLevel | 设置日志级别 |
| getLoggerConfigurations | 获取日志配置 |
### get
```java
public static LoggingSystem get(ClassLoader classLoader) {
// 获取系统属性
String loggingSystem = System.getProperty(SYSTEM_PROPERTY);
if (StringUtils.hasLength(loggingSystem)) {
// 是不是NONE
if (NONE.equals(loggingSystem)) {
// 空的日志系统
return new NoOpLoggingSystem();
}
return get(classLoader, loggingSystem);
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
deallocate();
return true;
}
// 循环所有日志,
return SYSTEMS.entrySet().stream().filter((entry) -> ClassUtils.isPresent(entry.getKey(), classLoader))
.map((entry) ->
// 实例化具体日志
get(classLoader, entry.getValue())).findFirst()
.orElseThrow(() -> new IllegalStateException("No suitable logging system located"));
}
```
- 实例化日志系统
```java
private static LoggingSystem get(ClassLoader classLoader, String loggingSystemClass) {
try {
Class<?> systemClass = ClassUtils.forName(loggingSystemClass, classLoader);
Constructor<?> constructor = systemClass.getDeclaredConstructor(ClassLoader.class);
constructor.setAccessible(true);
return (LoggingSystem) constructor.newInstance(classLoader);
return false;
}
catch (Exception ex) {
throw new IllegalStateException(ex);
}
}
```
![image-20200323151409473](../../images/SpringBoot/image-20200323151409473.png)
- 默认日志: `org.springframework.boot.logging.logback.LogbackLoggingSystem`
### beforeInitialize
- 初始化之前
![image-20200323154205484](../../images/SpringBoot/image-20200323154205484.png)
- 链路
1. `org.springframework.boot.context.logging.LoggingApplicationListener#onApplicationEvent`
2. `org.springframework.boot.context.logging.LoggingApplicationListener#onApplicationStartingEvent`
3. `org.springframework.boot.logging.LoggingSystem#beforeInitialize`
- 因为前文中我们已知对象是:`org.springframework.boot.logging.logback.LogbackLoggingSystem` 直接看这个类的**`beforeInitialize`**方法
```java
@Override
public void beforeInitialize() {
// 日志上下文
LoggerContext loggerContext = getLoggerContext();
// 是否初始化
if (isAlreadyInitialized(loggerContext)) {
return;
}
// 父类方法
super.beforeInitialize();
// 添加过滤器
loggerContext.getTurboFilterList().add(FILTER);
}
```
- 初始化之前的的操作完成了初始化方法开始
### initialize
- `org.springframework.boot.context.logging.LoggingApplicationListener#onApplicationEnvironmentPreparedEvent`
```java
private void onApplicationEnvironmentPreparedEvent(ApplicationEnvironmentPreparedEvent event) {
if (this.loggingSystem == null) {
this.loggingSystem = LoggingSystem.get(event.getSpringApplication().getClassLoader());
}
initialize(event.getEnvironment(), event.getSpringApplication().getClassLoader());
}
```
- `org.springframework.boot.context.logging.LoggingApplicationListener#initializeSystem`
```java
protected void initialize(ConfigurableEnvironment environment, ClassLoader classLoader) {
new LoggingSystemProperties(environment).apply();
this.logFile = LogFile.get(environment);
if (this.logFile != null) {
this.logFile.applyToSystemProperties();
}
this.loggerGroups = new LoggerGroups(DEFAULT_GROUP_LOGGERS);
// 早期 的日志级别
initializeEarlyLoggingLevel(environment);
// 初始化日志系统
initializeSystem(environment, this.loggingSystem, this.logFile);
// 初始化日志级别
initializeFinalLoggingLevels(environment, this.loggingSystem);
registerShutdownHookIfNecessary(environment, this.loggingSystem);
}
```
```java
private void initializeSystem(ConfigurableEnvironment environment, LoggingSystem system, LogFile logFile) {
LoggingInitializationContext initializationContext = new LoggingInitializationContext(environment);
String logConfig = environment.getProperty(CONFIG_PROPERTY);
if (ignoreLogConfig(logConfig)) {
// 日志系统初始化
system.initialize(initializationContext, null, logFile);
}
else {
try {
ResourceUtils.getURL(logConfig).openStream().close();
system.initialize(initializationContext, logConfig, logFile);
}
catch (Exception ex) {
// NOTE: We can't use the logger here to report the problem
System.err.println("Logging system failed to initialize using configuration from '" + logConfig + "'");
ex.printStackTrace(System.err);
throw new IllegalStateException(ex);
}
}
}
```
- `org.springframework.boot.logging.logback.LogbackLoggingSystem#initialize`
当调用了 ByteBuf 的 release()方法的时候,最后在上方的 release0()方法中将会为 ByteBuf 的引用计数减一,当引用计数归于 0 的时候,将会调用 deallocate()方法对其对应的底层存储数组进行释放(在池化的 ByteBuf 中,在 deallocate()方法里会把该 ByteBuf 的 byte[]回收到底层内存池中,以确保 byte[]可以重复利用)。
由于 Netty 中的 ByteBuf 并不是随着申请之后会马上使其引用计数归 0 而进行释放,往往在这两个操作之间还有许多操作,如果在这其中如果发生异常抛出导致引用没有及时释放,在使用池化 ByetBuffer 的情况下内存泄漏的问题就会产生。
当采用了池化的 ByteBuffer 的时候,比如 PooledHeapByteBuf 和 PooledDirectByteBuf其 deallocate()方法一共主要分为两个步骤。
```java
@Override
public void initialize(LoggingInitializationContext initializationContext, String configLocation, LogFile logFile) {
LoggerContext loggerContext = getLoggerContext();
if (isAlreadyInitialized(loggerContext)) {
return;
}
// 日志初始化
super.initialize(initializationContext, configLocation, logFile);
loggerContext.getTurboFilterList().remove(FILTER);
markAsInitialized(loggerContext);
if (StringUtils.hasText(System.getProperty(CONFIGURATION_FILE_PROPERTY))) {
getLogger(LogbackLoggingSystem.class.getName()).warn("Ignoring '" + CONFIGURATION_FILE_PROPERTY
+ "' system property. Please use 'logging.config' instead.");
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
chunk.arena.free(chunk, handle, maxLength);
recycle();
}
}
}
```
- `org.springframework.boot.logging.AbstractLoggingSystem#initializeWithConventions`
```java
private void initializeWithConventions(LoggingInitializationContext initializationContext, LogFile logFile) {
String config = getSelfInitializationConfig();
if (config != null && logFile == null) {
// self initialization has occurred, reinitialize in case of property changes
reinitialize(initializationContext);
return;
}
if (config == null) {
config = getSpringInitializationConfig();
}
if (config != null) {
loadConfiguration(initializationContext, config, logFile);
return;
}
// 加载默认配置
loadDefaults(initializationContext, logFile);
}
```
- `org.springframework.boot.logging.logback.LogbackLoggingSystem#loadDefaults`
```java
@Override
protected void loadDefaults(LoggingInitializationContext initializationContext, LogFile logFile) {
LoggerContext context = getLoggerContext();
stopAndReset(context);
boolean debug = Boolean.getBoolean("logback.debug");
if (debug) {
StatusListenerConfigHelper.addOnConsoleListenerInstance(context, new OnConsoleStatusListener());
}
LogbackConfigurator configurator = debug ? new DebugLogbackConfigurator(context)
: new LogbackConfigurator(context);
Environment environment = initializationContext.getEnvironment();
context.putProperty(LoggingSystemProperties.LOG_LEVEL_PATTERN,
environment.resolvePlaceholders("${logging.pattern.level:${LOG_LEVEL_PATTERN:%5p}}"));
context.putProperty(LoggingSystemProperties.LOG_DATEFORMAT_PATTERN, environment.resolvePlaceholders(
"${logging.pattern.dateformat:${LOG_DATEFORMAT_PATTERN:yyyy-MM-dd HH:mm:ss.SSS}}"));
context.putProperty(LoggingSystemProperties.ROLLING_FILE_NAME_PATTERN, environment
.resolvePlaceholders("${logging.pattern.rolling-file-name:${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz}"));
new DefaultLogbackConfiguration(initializationContext, logFile).apply(configurator);
context.setPackagingDataEnabled(true);
}
- 将其底层的 byte[]通过 free()方法回收到内存池中等待下一次使用。
- 通过 recycle()方法将其本身回收到对象池中等待下一次使用。
关键在第一步的内存回收到池中,如果其引用计数未能在 ByteBuf 对象被回收之前归 0将会导致其底层占用 byte[]无法回收到内存池 PoolArena 中,导致该部分无法被重复利用,下一次将会申请新的内存进行操作,从而产生内存泄漏。
而非池化的 ByteBuffer 即使引用计数没有在对象被回收的时候被归 0因为其使用的是单独一块 byte[]内存,因此也会随着 java 对象被回收使得底层 byte[]被释放(由 JDK 的 Cleaner 来保证)。
```
## Netty 进行内存泄漏检测的原理
```java
@Override
public void initialize(LoggingInitializationContext initializationContext, String configLocation, LogFile logFile) {
LoggerContext loggerContext = getLoggerContext();
// 是否加载过
if (isAlreadyInitialized(loggerContext)) {
return;
}
// 日志初始化
super.initialize(initializationContext, configLocation, logFile);
// 删除 FILTER
loggerContext.getTurboFilterList().remove(FILTER);
// 初始化标记
markAsInitialized(loggerContext);
if (StringUtils.hasText(System.getProperty(CONFIGURATION_FILE_PROPERTY))) {
getLogger(LogbackLoggingSystem.class.getName()).warn("Ignoring '" + CONFIGURATION_FILE_PROPERTY
+ "' system property. Please use 'logging.config' instead.");
}
}
```
标记`markAsInitialized`
在 Netty 对于 ByteBuf 的检测中,一共包含 4 个级别。
```java
private void markAsInitialized(LoggerContext loggerContext) {
loggerContext.putObject(LoggingSystem.class.getName(), new Object());
if (level.ordinal() < Level.PARANOID.ordinal()) {
if (leakCheckCnt ++ % samplingInterval == 0) {
reportLeak(level);
return new DefaultResourceLeak(obj);
} else {
return null;
}
}
```
此时日志初始化完成
### 默认配置文件
- `getStandardConfigLocations` 这个方法定义了默认配置文件有哪些
以默认的 SIMPLE 级别为例在这个级别下Netty 将会根据以 ByteBuf 创建的序列号与 113 进行取模来判断是否需要进行内存泄漏的检测追踪。当取模成功的时候,将会为这个 ByteBuf 产生一个对应的 DefaultResourceLeak 对象DefaultResourceLeak 是一个 PhantomReference 虚引用的子类,并有其对应的 ReferenceQueue。之后通过 SimpleLeakAwareByteBuf 类来将被追踪的 ByteBuf 和 DefaultResourceLeak 包装起来。
```java
@Override
protected String[] getStandardConfigLocations() {
return new String[] { "logback-test.groovy", "logback-test.xml", "logback.groovy", "logback.xml" };
@Override
public boolean release(int decrement) {
boolean deallocated = super.release(decrement);
if (deallocated) {
leak.close();
}
```
- 切回`org.springframework.boot.logging.AbstractLoggingSystem#initializeWithConventions`方法
- 添加依赖
```XML
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
<version>${revision}</version>
</dependency>
```
- 添加配置文件
![image-20200323161442058](../../images/SpringBoot/image-20200323161442058.png)
![image-20200323161522570](../../images/SpringBoot/image-20200323161522570.png)
- 此时配置文件地址出现了
```java
protected String getSelfInitializationConfig() {
// 寻找配置文件
return findConfig(getStandardConfigLocations());
}
return deallocated;
}
```
```java
@Override
protected String[] getStandardConfigLocations() {
return new String[] { "logback-test.groovy", "logback-test.xml", "logback.groovy", "logback.xml" };
}
```
在包装类中,如果该 ByteBuf 成功 deallocated 释放掉了其持有的 byte[]数组将会调用 DefaultResourceLeak 的 close()方法来已通知当前 ByteBuf 已经释放了其持有的内存。
正是这个虚引用使得该 DefaultResourceLeak 对象被回收的时候将会被放入到与这个虚引用所对应的 ReferenceQueue 中。
```java
private String findConfig(String[] locations) {
for (String location : locations) {
ClassPathResource resource = new ClassPathResource(location, this.classLoader);
if (resource.exists()) {
return "classpath:" + location;
}
}
return null;
}
```
- 此时自定义配置文件如何获取的已经明了
#### reinitialize
DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
if (ref == null) {
break;
}
```java
@Override
protected void reinitialize(LoggingInitializationContext initializationContext) {
// 日志上下文重新设置
getLoggerContext().reset();
getLoggerContext().getStatusManager().clear();
// 加载配置文件
loadConfiguration(initializationContext, getSelfInitializationConfig(), null);
}
ref.clear();
```
if (!ref.close()) {
continue;
}
```java
@Override
protected void loadConfiguration(LoggingInitializationContext initializationContext, String location,
LogFile logFile) {
// 父类方法
super.loadConfiguration(initializationContext, location, logFile);
// 获取上下文
LoggerContext loggerContext = getLoggerContext();
// 停止并且重启
stopAndReset(loggerContext);
try {
// 配置文件加载
configureByResourceUrl(initializationContext, loggerContext, ResourceUtils.getURL(location));
}
catch (Exception ex) {
throw new IllegalStateException("Could not initialize Logback logging from " + location, ex);
}
List<Status> statuses = loggerContext.getStatusManager().getCopyOfStatusList();
StringBuilder errors = new StringBuilder();
for (Status status : statuses) {
if (status.getLevel() == Status.ERROR) {
errors.append((errors.length() > 0) ? String.format("%n") : "");
errors.append(status.toString());
}
}
if (errors.length() > 0) {
throw new IllegalStateException(String.format("Logback configuration error detected: %n%s", errors));
}
String records = ref.toString();
if (reportedLeaks.putIfAbsent(records, Boolean.TRUE) == null) {
if (records.isEmpty()) {
logger.error("LEAK: {}.release() was not called before it's garbage-collected. " +
"Enable advanced leak reporting to find out where the leak occurred. " +
"To enable advanced leak reporting, " +
"specify the JVM option '-D{}={}' or call {}.setLevel()",
resourceType, PROP_LEVEL, Level.ADVANCED.name().toLowerCase(), simpleClassName(this));
} else {
logger.error(
"LEAK: {}.release() was not called before it's garbage-collected.{}",
resourceType, records);
}
}
```
```java
private void configureByResourceUrl(LoggingInitializationContext initializationContext, LoggerContext loggerContext,
URL url) throws JoranException {
if (url.toString().endsWith("xml")) {
// logback 日志操作
JoranConfigurator configurator = new SpringBootJoranConfigurator(initializationContext);
// 设置上下文
configurator.setContext(loggerContext);
// 执行配置
configurator.doConfigure(url);
}
else {
new ContextInitializer(loggerContext).configureByResource(url);
}
}
```
Netty 会在下一次 ByteBuf 的采样中通过 reportLeak()方法将 ReferenceQueue 中的 DefaultResourceLeak 取出并判断其对应的 ByteBuf 是否已经在其回收前调用过其 close()方法,如果没有,显然在池化 ByteBuf 的场景下内存泄漏已经产生,将会以 ERROR 日志的方式进行日志打印。
- 执行配置属于 logback 操作源码不在此进行分析
以上内容可以结合 JVM 堆外内存的资料进行阅读。

Loading…
Cancel
Save