pull/1/head
ruyu.li 3 years ago
parent 7759df15de
commit e1c2e1953d

@ -108,6 +108,431 @@ ReentrantLock、ReentrantReadWriteLock 都是基于 AbstractQueuedSynchronizer (
## AQS框架
![AQS-简化流程图](images/JAVA/AQS-简化流程图.png)
### 基础
AbstractQueuedSynchronizer抽象同步队列简称AQS它是实现同步器的基础组件如常用的ReentrantLock、Semaphore、CountDownLatch等。
AQS定义了一套多线程访问共享资源的同步模板解决了实现同步器时涉及的大量细节问题能够极大地减少实现工作虽然大多数开发者可能永远不会使用AQS实现自己的同步器JUC包下提供的同步器基本足够应对日常开发但是知道AQS的原理对于架构设计还是很有帮助的面试还可以吹吹牛下面是AQS的组成结构。
![AQS的组成结构](images/JAVA/AQS的组成结构.png)
三部分组成:`state同步状态`、`Node组成的CLH队列`、`ConditionObject条件变量`包含Node组成的条件单向队列
**状态**
- `getState()`:返回同步状态
- `setState(int newState)`:设置同步状态
- `compareAndSetState(int expect, int update)`使用CAS设置同步状态
- `isHeldExclusively()`:当前线程是否持有资源
**独占资源(不响应线程中断)**
- `tryAcquire(int arg)`:独占式获取资源,子类实现
- `acquire(int arg)`:独占式获取资源模板
- `tryRelease(int arg)`:独占式释放资源,子类实现
- `release(int arg)`:独占式释放资源模板
**共享资源(不响应线程中断)**
- `tryAcquireShared(int arg)`共享式获取资源返回值大于等于0则表示获取成功否则获取失败子类实现
- `acquireShared(int arg)`:共享形获取资源模板
- `tryReleaseShared(int arg)`:共享式释放资源,子类实现
- `releaseShared(int arg)`:共享式释放资源模板
### 同步状态
在AQS中维护了一个同步状态变量stategetState函数获取同步状态setState、compareAndSetState函数修改同步状态对于AQS来说线程同步的关键是对state的操作可以说获取、释放资源是否成功都是由state决定的比如state>0代表可获取资源否则无法获取所以state的具体语义由实现者去定义现有的ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch定义的state语义都不一样。
- ReentrantLock的state用来表示是否有锁资源
- ReentrantReadWriteLock的state高16位代表读锁状态低16位代表写锁状态
- Semaphore的state用来表示可用信号的个数
- CountDownLatch的state用来表示计数器的值
### CLH队列
CLH是AQS内部维护的FIFO先进先出双端双向队列方便尾部节点插入基于链表数据结构当一个线程竞争资源失败就会将等待资源的线程封装成一个Node节点通过CAS原子操作插入队列尾部最终不同的Node节点连接组成了一个CLH队列所以说AQS通过CLH队列管理竞争资源的线程个人总结CLH队列具有如下几个优点
- 先进先出保证了公平性
- 非阻塞的队列通过自旋锁和CAS保证节点插入和移除的原子性实现无锁快速插入
- 采用了自旋锁思想所以CLH也是一种基于链表的可扩展、高性能、公平的自旋锁
### Node内部类
`Node`是`AQS`的内部类,每个等待资源的线程都会封装成`Node`节点组成`CLH`队列、等待队列,所以说`Node`是非常重要的部分,理解它是理解`AQS`的第一步。
![AQS-Node](images/JAVA/AQS-Node.png)**waitStatus等待状态如下**
![AQS-waitStatus等待状态](images/JAVA/AQS-waitStatus等待状态.png)
**nextWaiter特殊标记**
- **`Node`在`CLH`队列时,`nextWaiter`表示共享式或独占式标记**
- **`Node`在条件队列时,`nextWaiter`表示下个`Node`节点指针**
### 流程概述
线程获取资源失败,封装成`Node`节点从`C L H`队列尾部入队并阻塞线程,某线程释放资源时会把`C L H`队列首部`Node`节点关联的线程唤醒(**此处的首部是指第二个节点,后面会细说**),再次获取资源。
![AQS-流程](images/JAVA/AQS-流程.png)
### 入队
获取资源失败的线程需要封装成`Node`节点,接着尾部入队,在`AQS`中提供`addWaiter`函数完成`Node`节点的创建与入队。
```java
/**
* @description: Node节点入队-CLH队列
* @param mode 标记Node.EXCLUSIVE独占式 or Node.SHARED共享式
*/
private Node addWaiter(Node mode) {
// 根据当前线程创建节点等待状态为0
Node node = new Node(Thread.currentThread(), mode);
// 获取尾节点
Node pred = tail;
if (pred != null) {
// 如果尾节点不等于null把当前节点的前驱节点指向尾节点
node.prev = pred;
// 通过CAS把尾节点指向当前节点
if (compareAndSetTail(pred, node)) {
// 之前尾节点的下个节点指向当前节点
pred.next = node;
return node;
}
}
// 如果添加失败或队列不存在执行end函数
enq(node);
return node;
}
```
添加节点的时候,如果从`CLH`队列已经存在,通过`CAS`快速将当前节点添加到队列尾部,如果添加失败或队列不存在,则指向`enq`函数自旋入队。
```java
/**
* @description: 自旋cas入队
* @param node 节点
*/
private Node enq(final Node node) {
for (;;) { //循环
//获取尾节点
Node t = tail;
if (t == null) {
//如果尾节点为空创建哨兵节点通过cas把头节点指向哨兵节点
if (compareAndSetHead(new Node()))
//cas成功尾节点指向哨兵节点
tail = head;
} else {
//当前节点的前驱节点设指向之前尾节点
node.prev = t;
//cas设置把尾节点指向当前节点
if (compareAndSetTail(t, node)) {
//cas成功之前尾节点的下个节点指向当前节点
t.next = node;
return t;
}
}
}
}
```
通过自旋`CAS`尝试往队列尾部插入节点,直到成功,自旋过程如果发现`CLH`队列不存在时会初始化`CLH`队列,入队过程流程如下图:
![AQS-入队过程流程](images/JAVA/AQS-入队过程流程.png)
第一次循环
- 刚开始C L H队列不存在head与tail都指向null
- 要初始化C L H队列会创建一个哨兵节点head与tail都指向哨兵节点
第二次循环
- 当前线程节点的前驱节点指向尾部节点(哨兵节点)
- 设置当前线程节点为尾部tail指向当前线程节点
- 前尾部节点的后驱节点指向当前线程节点(当前尾部节点)
最后结合addWaiter与enq函数的入队流程图如下
![AQS-入队流程图](images/JAVA/AQS-入队流程图.png)
### 出队
`CLH`队列中的节点都是获取资源失败的线程节点,当持有资源的线程释放资源时,会将`head.next`指向的线程节点唤醒(**`CLH`队列的第二个节点**),如果唤醒的线程节点获取资源成功,线程节点清空信息设置为头部节点(**新哨兵节点**),原头部节点出队(**原哨兵节点****acquireQueued函数中的部分代码**
```java
//1.获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是首节点,获取资源(子类实现)
if (p == head && tryAcquire(arg)) {
//2.获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
setHead(node);
//3.原来首节点下个节点指向为null
p.next = null; // help GC
//4.非异常状态防止指向finally逻辑
failed = false;
//5.返回线程中断状态
return interrupted;
}
private void setHead(Node node) {
//节点设置为头部
head = node;
//清空线程
node.thread = null;
//清空前驱节点
node.prev = null;
}
```
只需要关注`1~3`步骤即可,过程非常简单,假设获取资源成功,更换头部节点,并把头部节点的信息清除变成哨兵节点,注意这个过程是不需要使用`CAS`来保证,因为只有一个线程能够成功获取到资源。
![AQS-出队流程](images/JAVA/AQS-出队流程.png)
### 条件变量
Object的wait、notify函数是配合Synchronized锁实现线程间同步协作的功能A Q S的ConditionObject条件变量也提供这样的功能通过ConditionObject的await和signal两类函数完成。不同于Synchronized锁一个A Q S可以对应多个条件变量而Synchronized只有一个。
![AQS-条件变量](images/JAVA/AQS-条件变量.png)
如上图所示ConditionObject内部维护着一个单向条件队列不同于C H L队列条件队列只入队执行await的线程节点并且加入条件队列的节点不能在C H L队列 条件队列出队的节点会入队到C H L队列。
当某个线程执行了ConditionObject的await函数阻塞当前线程线程会被封装成Node节点添加到条件队列的末端其他线程执行ConditionObject的signal函数会将条件队列头部线程节点转移到C H L队列参与竞争资源具体流程如下图
![AQS-CHL队列参与流程](images/JAVA/AQS-CHL队列参与流程.png)
### 模板方法
`AQS`采用了模板方法设计模式,提供了两类模板,一类是独占式模板,另一类是共享形模式,对应的模板函数如下
- 独占式
- **`acquire`获取资源**
- **`release`释放资源**
- 共享式
- **`acquireShared`获取资源**
- **`releaseShared`释放资源**
#### 独占式获取资源
`acquire`是个模板函数,模板流程就是线程获取共享资源,如果获取资源成功,线程直接返回,否则进入`CLH`队列,直到获取资源成功为止,且整个过程忽略中断的影响,`acquire`函数代码如下
![AQS-acquire函数代码](images/JAVA/AQS-acquire函数代码.png)
- 执行tryAcquire函数tryAcquire是由子类实现代表获取资源是否成功如果资源获取失败执行下面的逻辑
- 执行addWaiter函数前面已经介绍过根据当前线程创建出独占式节点并入队CLH队列
- 执行acquireQueued函数自旋阻塞等待获取资源
- 如果acquireQueued函数中获取资源成功根据线程是否被中断状态来决定执行线程中断逻辑
![AQS-acquireQueued流程](images/JAVA/AQS-acquireQueued流程.png)
`acquire`函数的大致流程都清楚了,下面来分析下`acquireQueued`函数,线程封装成节点后,是如何自旋阻塞等待获取资源的,代码如下:
```java
/**
* @description: 自旋机制等待获取资源
* @param node
* @param arg
* @return: boolean
*/
final boolean acquireQueued(final Node node, int arg) {
//异常状态,默认是
boolean failed = true;
try {
//该线程是否中断过,默认否
boolean interrupted = false;
for (;;) {//自旋
//获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是首节点,获取资源(子类实现)
if (p == head && tryAcquire(arg)) {
//获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
setHead(node);
//原来首节点下个节点指向为null
p.next = null; // help GC
//非异常状态防止指向finally逻辑
failed = false;
//返回线程中断状态
return interrupted;
}
/**
* 如果前驱节点不是首节点先执行shouldParkAfterFailedAcquire函数shouldParkAfterFailedAcquire做了三件事
* 1.如果前驱节点的等待状态是SIGNAL返回true执行parkAndCheckInterrupt函数返回false
* 2.如果前驱节点的等大状态是CANCELLED把CANCELLED节点全部移出队列条件节点
* 3.以上两者都不符合更新前驱节点的等待状态为SIGNAL返回false
*/
if (shouldParkAfterFailedAcquire(p, node) &&
//使用LockSupport类的静态方法park挂起当前线程直到被唤醒唤醒后检查当前线程是否被中断返回该线程中断状态并重置中断状态
parkAndCheckInterrupt())
//该线程被中断过
interrupted = true;
}
} finally {
// 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除
if (failed)
cancelAcquire(node);
}
}
```
一图胜千言,核心流程图如下:
![AQS-独占式获取资源流程](images/JAVA/AQS-独占式获取资源流程.png)
#### 独占式释放资源
有获取资源,自然就少不了释放资源,`A Q S`中提供了`release`模板函数来释放资源,模板流程就是线程释放资源成功,唤醒`CLH`队列的第二个线程节点(**首节点的下个节点**),代码如下
```java
/**
* @description: 独占式-释放资源模板函数
* @param arg
* @return: boolean
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放资源成功tryRelease子类实现
//获取头部线程节点
Node h = head;
if (h != null && h.waitStatus != 0) //头部线程节点不为null并且等待状态不为0
//唤醒CHL队列第二个线程节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//获取节点等待状态
int ws = node.waitStatus;
if (ws < 0)
//cas更新节点状态为0
compareAndSetWaitStatus(node, ws, 0);
//获取下个线程节点
Node s = node.next;
if (s == null || s.waitStatus > 0) { //如果下个节点信息异常,从尾节点循环向前获取到正常的节点为止,正常情况不会执行
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒线程节点
LockSupport.unpark(s.thread);
}
}
```
`release`逻辑非常简单,流程图如下:
![AQS-release流程](images/JAVA/AQS-release流程.png)
#### 共享式获取资源
`acquireShared`是个模板函数,模板流程就是线程获取共享资源,如果获取到资源,线程直接返回,否则进入`CLH`队列,直到获取到资源为止,且整个过程忽略中断的影响,`acquireShared`函数代码如下
```java
/**
* @description: 共享式-获取资源模板函数
* @param arg
* @return: void
*/
public final void acquireShared(int arg) {
/**
* 1.负数表示失败
* 2.0表示成功,但没有剩余可用资源
* 3.正数表示成功且有剩余资源
*/
if (tryAcquireShared(arg) < 0) //tryAcquireShared
//自旋阻塞等待获取资源
doAcquireShared(arg);
}
```
`doAcquireShared`函数与独占式的`acquireQueued`函数逻辑基本一致,唯一的区别就是下图红框部分
![AQS-共享式获取资源](images/JAVA/AQS-共享式获取资源.png)
- **节点的标记是共享式**
- **获取资源成功,还会唤醒后续资源,因为资源数可能`>0`,代表还有资源可获取,所以需要做后续线程节点的唤醒**
#### 共享式释放资源
`AQS`中提供了`releaseShared`模板函数来释放资源模板流程就是线程释放资源成功唤醒CHL队列的第二个线程节点**首节点的下个节点**),代码如下
```java
/**
* @description: 共享式-释放资源模板函数
* @param arg
* @return: boolean
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//释放资源成功tryReleaseShared子类实现
//唤醒后继节点
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
//获取头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//如果头节点等待状态为SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//更新头节点等待状态为0
continue; // loop to recheck cases
//唤醒头节点下个线程节点
unparkSuccessor(h);
}
//如果后继节点暂时不需要被唤醒更新头节点等待状态为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
```
与独占式释放资源区别不大,都是唤醒头节点的下个节点。
**什么是AQS**
`AQS` 的全称是 `AbstractQueuedSynchronizer`,即`抽象队列同步器`。是Java并发工具的基础采用乐观锁通过CAS与自旋轻量级的获取锁。维护了一个volatile int state代表共享资源和一个FIFO线程等待队列多线程争用资源被阻塞时会进入此队列。很多JUC包比如ReentrantLock、Semaphore、CountDownLatch等并发类均是继承AQS通过AQS的模板方法来实现的。

Binary file not shown.

After

Width:  |  Height:  |  Size: 254 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 504 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 155 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 125 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 190 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 186 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 206 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 823 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 194 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 180 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 210 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 300 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 263 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 161 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 344 KiB

Loading…
Cancel
Save