JAVA
Introduction:收纳技术相关的JAVA知识 `JUC`、`Thread`、`Lock`、`I/O` 等总结!
[TOC]
# J.U.C
## 并发特性
JAVA里面进行多线程通信的主要方式就是 `共享内存` 的方式,共享内存主要的关注点有两个:`可见性` 和 `有序性`。加上复合操作的 `原子性`,可以认为JAVA的线程安全性问题主要关注点有3个(JAVA内存模型JMM解决了可见性和有序性的问题,而锁解决了原子性的问题):`可见性`、`有序性`、`原子性`
- **原子性(Atomicity)**:在Java中原子性指的是一个或多个操作要么全部执行成功要么全部执行失败
- **有序性(Ordering)**:程序执行的顺序按照代码的先后顺序执行(处理器可能会对指令进行重排序)
- **可见性(Visibility)**:指在多线程环境下,当一个线程修改了某一个共享变量的值,其它线程能够立刻知道这个修改
**① 重排序**
指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段。从JAVA源码到最终实际执行的指令序列,会经历下面3种重排序(主要流程):
![源码指令](images/JAVA/源码指令.png)
**指令重排序分类**
- **编译器优化的重排序**:编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序;
- **指令级并行的重排序**:现代处理器采用了指令级并行技术来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序;
- **内存系统的重排序**:由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行的。
**② 顺序一致性**
顺序一致性内存模型是一个理论参考模型,在设计的时候,处理器的内存模型和编程语言的内存模型都会以顺序一致性内存模型作为参照。顺序一致性特征如下:
- 一个线程中的所有操作必须按照程序的顺序来执行
- (不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性的内存模型中,每个操作必须原子执行并且立刻对所有线程可见
## Unsafe
Java不能直接访问操作系统底层,而是通过本地方法来访问。**Unsafe类提供了硬件级别的原子操作**,主要提供以下功能:
- **通过Unsafe类可以分配内存,可以释放内存**
类中提供的3个本地方法allocateMemory(申请)、reallocateMemory(扩展)、freeMemory(销毁)分别用于分配内存,扩充内存和释放内存,与C语言中的3个方法对应。
- **可以定位对象某字段的内存位置,也可以修改对象的字段值,即使它是私有的**
- 字段的定位
- 数组元素定位
- **挂起与恢复**
将一个线程进行挂起是通过park方法实现的,调用 park后,线程将一直阻塞直到超时或者中断等条件出现。unpark可以终止一个挂起的线程,使其恢复正常。整个并发框架中对线程的挂起操作被封装在 LockSupport类中,LockSupport类中有各种版本pack方法,但最终都调用了Unsafe.park()方法。
- **CAS操作**
是通过compareAndSwapXXX方法实现的
## LockSupport
LockSupport 和 CAS 是Java并发包中很多并发工具控制机制的基础,它们底层其实都是依赖Unsafe实现。LockSupport 提供park()和unpark()方法实现阻塞线程和解除线程阻塞。
LockSupport和每个使用它的线程都与一个许可(permit)关联。permit相当于1,0的开关,默认是0,调用一次unpark就加1变成1,调用一次park会消费permit, 也就是将1变成0,同时park立即返回。再次调用park会变成block(因为permit为0了,会阻塞在这里,直到permit变为1), 这时调用unpark会把permit置为1。每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累。
park()和unpark()不会有 `Thread.suspend` 和 `Thread.resume` 所可能引发的死锁问题,由于许可的存在,调用 park 的线程和另一个试图将其 unpark 的线程之间的竞争将保持活性。
## CAS机制
CAS(`Compare And Swap`,即比较并交换),是解决多线程并行情况下使用锁造成性能损耗的一种机制。其原理是利用`sun.misc.Unsafe.java` 类通过JNI来调用硬件级别的原子操作来实现CAS(即CAS是借助C来调用CPU底层指令实现的)。
**CAS机制=比较并交换+乐观锁机制+锁自旋**
**设计思想**:如果`内存位置` 的值与 `预期原值` 相匹配,那么处理器会自动将该位置值更新为新值,否则处理器不做任何操作。
ReentrantLock、ReentrantReadWriteLock 都是基于 AbstractQueuedSynchronizer (AQS),而 AQS 又是基于 CAS。CAS 的全称是 Compare And Swap(比较与交换),它是一种无锁算法。synchronized和Lock都采用了悲观锁的机制,而CAS是一种乐观锁的实现。乐观锁的原理就是每次不加锁去执行某项操作,如果发生冲突则失败并重试,直到成功为止,其实本质上不算锁,所以很多地方也称之为**自旋**。乐观锁用到的主要机制就是**CAS**(Compare And Swap)。
**CAS特性**
- 通过JNI借助C来调用CPU底层指令实现
- 非阻塞算法
- 非独占锁
**CAS缺陷**
- **ABA问题**:X线程读到为A;Y线程立刻改为B,又改为A;X线程发现值还是A,此时CAS比较值相等,自旋成功
- 使用数据乐观锁的方式给它加一个版本号或者时间戳,如使用 `AtomicStampedReference` 解决
- **自旋消耗资源**:多个线程争夺同一个资源时,如果自旋一直不成功,将会一直占用CPU
- 破坏掉死循环,当超过一定时间或者一定次数时,return退出
- **只能保证一个共享变量的原子操作**
- 可以加锁来解决
- 封装成对象类解决,如使用 `AtomicReference` 解决
## AQS框架
![AQS-简化流程图](images/JAVA/AQS-简化流程图.png)
### 基础
AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,如常用的ReentrantLock、Semaphore、CountDownLatch等。
AQS定义了一套多线程访问共享资源的同步模板,解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作,虽然大多数开发者可能永远不会使用AQS实现自己的同步器(JUC包下提供的同步器基本足够应对日常开发),但是知道AQS的原理对于架构设计还是很有帮助的,面试还可以吹吹牛,下面是AQS的组成结构。
![AQS的组成结构](images/JAVA/AQS的组成结构.png)
三部分组成:`volatile int state同步状态`、`Node组成的CLH队列`、`ConditionObject条件变量`(包含Node组成的条件单向队列)。
**状态**
- `getState()`:返回同步状态
- `setState(int newState)`:设置同步状态
- `compareAndSetState(int expect, int update)`:使用CAS设置同步状态
- `isHeldExclusively()`:当前线程是否持有资源
**独占资源(不响应线程中断)**
- `tryAcquire(int arg)`:独占式获取资源,子类实现
- `acquire(int arg)`:独占式获取资源模板
- `tryRelease(int arg)`:独占式释放资源,子类实现
- `release(int arg)`:独占式释放资源模板
**共享资源(不响应线程中断)**
- `tryAcquireShared(int arg)`:共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现
- `acquireShared(int arg)`:共享形获取资源模板
- `tryReleaseShared(int arg)`:共享式释放资源,子类实现
- `releaseShared(int arg)`:共享式释放资源模板
### 同步状态
在AQS中维护了一个同步状态变量state,getState函数获取同步状态,setState、compareAndSetState函数修改同步状态,对于AQS来说,线程同步的关键是对state的操作,可以说获取、释放资源是否成功都是由state决定的,比如state>0代表可获取资源,否则无法获取,所以state的具体语义由实现者去定义,现有的ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch定义的state语义都不一样。
- ReentrantLock的state用来表示是否有锁资源
- ReentrantReadWriteLock的state高16位代表读锁状态,低16位代表写锁状态
- Semaphore的state用来表示可用信号的个数
- CountDownLatch的state用来表示计数器的值
### CLH队列
CLH是AQS内部维护的FIFO(先进先出)双端双向队列(方便尾部节点插入),基于链表数据结构,当一个线程竞争资源失败,就会将等待资源的线程封装成一个Node节点,通过CAS原子操作插入队列尾部,最终不同的Node节点连接组成了一个CLH队列,所以说AQS通过CLH队列管理竞争资源的线程,个人总结CLH队列具有如下几个优点:
- 先进先出保证了公平性
- 非阻塞的队列,通过自旋锁和CAS保证节点插入和移除的原子性,实现无锁快速插入
- 采用了自旋锁思想,所以CLH也是一种基于链表的可扩展、高性能、公平的自旋锁
### Node内部类
`Node`是`AQS`的内部类,每个等待资源的线程都会封装成`Node`节点组成`CLH`队列、等待队列,所以说`Node`是非常重要的部分,理解它是理解`AQS`的第一步。
![AQS-Node](images/JAVA/AQS-Node.png)**waitStatus等待状态如下**
![AQS-waitStatus等待状态](images/JAVA/AQS-waitStatus等待状态.png)
**nextWaiter特殊标记**
- **`Node`在`CLH`队列时,`nextWaiter`表示共享式或独占式标记**
- **`Node`在条件队列时,`nextWaiter`表示下个`Node`节点指针**
### 流程概述
线程获取资源失败,封装成`Node`节点从`CLH`队列尾部入队并阻塞线程,某线程释放资源时会把`CLH`队列首部`Node`节点关联的线程唤醒(**此处的首部是指第二个节点,后面会细说**),再次获取资源。
![AQS-流程](images/JAVA/AQS-流程.png)
### 入队
获取资源失败的线程需要封装成`Node`节点,接着尾部入队,在`AQS`中提供`addWaiter`函数完成`Node`节点的创建与入队。
```java
/**
* @description: Node节点入队-CLH队列
* @param mode 标记Node.EXCLUSIVE独占式 or Node.SHARED共享式
*/
private Node addWaiter(Node mode) {
// 根据当前线程创建节点,等待状态为0
Node node = new Node(Thread.currentThread(), mode);
// 获取尾节点
Node pred = tail;
if (pred != null) {
// 如果尾节点不等于null,把当前节点的前驱节点指向尾节点
node.prev = pred;
// 通过CAS把尾节点指向当前节点
if (compareAndSetTail(pred, node)) {
// 之前尾节点的下个节点指向当前节点
pred.next = node;
return node;
}
}
// 如果添加失败或队列不存在,执行end函数
enq(node);
return node;
}
```
添加节点的时候,如果从`CLH`队列已经存在,通过`CAS`快速将当前节点添加到队列尾部,如果添加失败或队列不存在,则指向`enq`函数自旋入队。
```java
/**
* @description: 自旋cas入队
* @param node 节点
*/
private Node enq(final Node node) {
for (;;) { //循环
//获取尾节点
Node t = tail;
if (t == null) {
//如果尾节点为空,创建哨兵节点,通过cas把头节点指向哨兵节点
if (compareAndSetHead(new Node()))
//cas成功,尾节点指向哨兵节点
tail = head;
} else {
//当前节点的前驱节点设指向之前尾节点
node.prev = t;
//cas设置把尾节点指向当前节点
if (compareAndSetTail(t, node)) {
//cas成功,之前尾节点的下个节点指向当前节点
t.next = node;
return t;
}
}
}
}
```
通过自旋`CAS`尝试往队列尾部插入节点,直到成功,自旋过程如果发现`CLH`队列不存在时会初始化`CLH`队列,入队过程流程如下图:
![AQS-入队过程流程](images/JAVA/AQS-入队过程流程.png)
第一次循环
- 刚开始C L H队列不存在,head与tail都指向null
- 要初始化C L H队列,会创建一个哨兵节点,head与tail都指向哨兵节点
第二次循环
- 当前线程节点的前驱节点指向尾部节点(哨兵节点)
- 设置当前线程节点为尾部,tail指向当前线程节点
- 前尾部节点的后驱节点指向当前线程节点(当前尾部节点)
最后结合addWaiter与enq函数的入队流程图如下
![AQS-入队流程图](images/JAVA/AQS-入队流程图.png)
### 出队
`CLH`队列中的节点都是获取资源失败的线程节点,当持有资源的线程释放资源时,会将`head.next`指向的线程节点唤醒(**`CLH`队列的第二个节点**),如果唤醒的线程节点获取资源成功,线程节点清空信息设置为头部节点(**新哨兵节点**),原头部节点出队(**原哨兵节点**)**acquireQueued函数中的部分代码**
```java
//1.获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是首节点,获取资源(子类实现)
if (p == head && tryAcquire(arg)) {
//2.获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
setHead(node);
//3.原来首节点下个节点指向为null
p.next = null; // help GC
//4.非异常状态,防止指向finally逻辑
failed = false;
//5.返回线程中断状态
return interrupted;
}
private void setHead(Node node) {
//节点设置为头部
head = node;
//清空线程
node.thread = null;
//清空前驱节点
node.prev = null;
}
```
只需要关注`1~3`步骤即可,过程非常简单,假设获取资源成功,更换头部节点,并把头部节点的信息清除变成哨兵节点,注意这个过程是不需要使用`CAS`来保证,因为只有一个线程能够成功获取到资源。
![AQS-出队流程](images/JAVA/AQS-出队流程.png)
### 条件变量
Object的wait、notify函数是配合Synchronized锁实现线程间同步协作的功能,A Q S的ConditionObject条件变量也提供这样的功能,通过ConditionObject的await和signal两类函数完成。不同于Synchronized锁,一个A Q S可以对应多个条件变量,而Synchronized只有一个。
![AQS-条件变量](images/JAVA/AQS-条件变量.png)
如上图所示,ConditionObject内部维护着一个单向条件队列,不同于C H L队列,条件队列只入队执行await的线程节点,并且加入条件队列的节点,不能在C H L队列, 条件队列出队的节点,会入队到C H L队列。
当某个线程执行了ConditionObject的await函数,阻塞当前线程,线程会被封装成Node节点添加到条件队列的末端,其他线程执行ConditionObject的signal函数,会将条件队列头部线程节点转移到C H L队列参与竞争资源,具体流程如下图
![AQS-CHL队列参与流程](images/JAVA/AQS-CHL队列参与流程.png)
### 模板方法
`AQS`采用了模板方法设计模式,提供了两类模板,一类是独占式模板,另一类是共享形模式,对应的模板函数如下
- 独占式
- **`acquire`获取资源**
- **`release`释放资源**
- 共享式
- **`acquireShared`获取资源**
- **`releaseShared`释放资源**
#### 独占式获取资源
`acquire`是个模板函数,模板流程就是线程获取共享资源,如果获取资源成功,线程直接返回,否则进入`CLH`队列,直到获取资源成功为止,且整个过程忽略中断的影响,`acquire`函数代码如下
![AQS-acquire函数代码](images/JAVA/AQS-acquire函数代码.png)
- 执行tryAcquire函数,tryAcquire是由子类实现,代表获取资源是否成功,如果资源获取失败,执行下面的逻辑
- 执行addWaiter函数(前面已经介绍过),根据当前线程创建出独占式节点,并入队CLH队列
- 执行acquireQueued函数,自旋阻塞等待获取资源
- 如果acquireQueued函数中获取资源成功,根据线程是否被中断状态,来决定执行线程中断逻辑
![AQS-acquireQueued流程](images/JAVA/AQS-acquireQueued流程.png)
`acquire`函数的大致流程都清楚了,下面来分析下`acquireQueued`函数,线程封装成节点后,是如何自旋阻塞等待获取资源的,代码如下:
```java
/**
* @description: 自旋机制等待获取资源
* @param node
* @param arg
* @return: boolean
*/
final boolean acquireQueued(final Node node, int arg) {
//异常状态,默认是
boolean failed = true;
try {
//该线程是否中断过,默认否
boolean interrupted = false;
for (;;) {//自旋
//获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是首节点,获取资源(子类实现)
if (p == head && tryAcquire(arg)) {
//获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
setHead(node);
//原来首节点下个节点指向为null
p.next = null; // help GC
//非异常状态,防止指向finally逻辑
failed = false;
//返回线程中断状态
return interrupted;
}
/**
* 如果前驱节点不是首节点,先执行shouldParkAfterFailedAcquire函数,shouldParkAfterFailedAcquire做了三件事
* 1.如果前驱节点的等待状态是SIGNAL,返回true,执行parkAndCheckInterrupt函数,返回false
* 2.如果前驱节点的等大状态是CANCELLED,把CANCELLED节点全部移出队列(条件节点)
* 3.以上两者都不符合,更新前驱节点的等待状态为SIGNAL,返回false
*/
if (shouldParkAfterFailedAcquire(p, node) &&
//使用LockSupport类的静态方法park挂起当前线程,直到被唤醒,唤醒后检查当前线程是否被中断,返回该线程中断状态并重置中断状态
parkAndCheckInterrupt())
//该线程被中断过
interrupted = true;
}
} finally {
// 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除
if (failed)
cancelAcquire(node);
}
}
```
一图胜千言,核心流程图如下:
![AQS-独占式获取资源流程](images/JAVA/AQS-独占式获取资源流程.png)
#### 独占式释放资源
有获取资源,自然就少不了释放资源,`A Q S`中提供了`release`模板函数来释放资源,模板流程就是线程释放资源成功,唤醒`CLH`队列的第二个线程节点(**首节点的下个节点**),代码如下
```java
/**
* @description: 独占式-释放资源模板函数
* @param arg
* @return: boolean
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放资源成功,tryRelease子类实现
//获取头部线程节点
Node h = head;
if (h != null && h.waitStatus != 0) //头部线程节点不为null,并且等待状态不为0
//唤醒CHL队列第二个线程节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//获取节点等待状态
int ws = node.waitStatus;
if (ws < 0)
//cas更新节点状态为0
compareAndSetWaitStatus(node, ws, 0);
//获取下个线程节点
Node s = node.next;
if (s == null || s.waitStatus > 0) { //如果下个节点信息异常,从尾节点循环向前获取到正常的节点为止,正常情况不会执行
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒线程节点
LockSupport.unpark(s.thread);
}
}
```
`release`逻辑非常简单,流程图如下:
![AQS-release流程](images/JAVA/AQS-release流程.png)
#### 共享式获取资源
`acquireShared`是个模板函数,模板流程就是线程获取共享资源,如果获取到资源,线程直接返回,否则进入`CLH`队列,直到获取到资源为止,且整个过程忽略中断的影响,`acquireShared`函数代码如下
```java
/**
* @description: 共享式-获取资源模板函数
* @param arg
* @return: void
*/
public final void acquireShared(int arg) {
/**
* 1.负数表示失败
* 2.0表示成功,但没有剩余可用资源
* 3.正数表示成功且有剩余资源
*/
if (tryAcquireShared(arg) < 0) //获取资源失败,tryAcquireShared子类实现
//自旋阻塞等待获取资源
doAcquireShared(arg);
}
```
`doAcquireShared`函数与独占式的`acquireQueued`函数逻辑基本一致,唯一的区别就是下图红框部分
![AQS-共享式获取资源](images/JAVA/AQS-共享式获取资源.png)
- **节点的标记是共享式**
- **获取资源成功,还会唤醒后续资源,因为资源数可能`>0`,代表还有资源可获取,所以需要做后续线程节点的唤醒**
#### 共享式释放资源
`AQS`中提供了`releaseShared`模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CHL队列的第二个线程节点(**首节点的下个节点**),代码如下
```java
/**
* @description: 共享式-释放资源模板函数
* @param arg
* @return: boolean
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//释放资源成功,tryReleaseShared子类实现
//唤醒后继节点
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
//获取头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//如果头节点等待状态为SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//更新头节点等待状态为0
continue; // loop to recheck cases
//唤醒头节点下个线程节点
unparkSuccessor(h);
}
//如果后继节点暂时不需要被唤醒,更新头节点等待状态为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
```
与独占式释放资源区别不大,都是唤醒头节点的下个节点。
**什么是AQS?**
`AQS` 的全称是 `AbstractQueuedSynchronizer`,即`抽象队列同步器`。是Java并发工具的基础,采用乐观锁,通过CAS与自旋轻量级的获取锁。维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。很多JUC包,比如ReentrantLock、Semaphore、CountDownLatch等并发类均是继承AQS,通过AQS的模板方法,来实现的。
![AQS](images/JAVA/AQS.png)
**核心思想**
- 若请求的共享资源空闲,则将当前请求的线程设置为有效的工作线程,并将共享资源设置为锁定状态
- 若共享资源被占用,则需要阻塞等待唤醒机制保证锁的分配
**工作原理**
**AQS = `同步状态(volatile int state)` + `同步队列(即等待队列,FIFO的CLH队列)` + `条件队列(ConditionObject)`**
- **state**:代表共享资源。`volatile` 保证并发读,`CAS` 保证并发写
- **同步队列(即等待队列,CLH队列)**:是CLH变体的虚拟双向队列(先进先出FIFO)来等待获取共享资源。当前线程可以通过signal和signalAll将条件队列中的节点转移到同步队列中
- **条件队列(ConditionObject)**:当前线程存在于同步队列的头节点,可以通过await从同步队列转移到条件队列中
**实现原理**
- 通过CLH队列的变体:FIFO双向队列实现的
- 每个请求资源的线程被包装成一个节点来实现锁的分配
- 通过`volatile`的`int`类型的成员变量`state`表示同步状态
- 通过FIFO队列完成资源获取的排队工作
- 通过CAS完成对`state`的修改
### 共享方式
AQS定义两种资源共享方式。无论是独占锁还是共享锁,本质上都是对AQS内部的一个变量`state`的获取。`state`是一个原子的int变量,用来表示锁状态、资源数等。
**① 独占锁(`Exclusive`)模式**:只能被一个线程获取到(`Reentrantlock`)。
![独占锁(Exclusive)模式](images/JAVA/独占锁(Exclusive)模式.png)
**② 共享锁(`Share`)模式**:可以被多个线程同时获取(`Semaphore/CountDownLatch/ReadWriteLock`)。
![共享锁(Share)模式](images/JAVA/共享锁(Share)模式.png)
### state机制
提供`volatile`变量`state`,用于同步线程之间的共享状态。通过 `CAS` 和 `volatile` 保证其原子性和可见性。核心要点:
- state 用 `volatile` 修饰,保证多线程中的可见性
- `getState()` 和 `setState()` 方法**采用final修饰**,限制AQS的子类重写它们两
- `compareAndSetState()` 方法采用乐观锁思想的CAS算法,也是采用final修饰的,不允许子类重写
**state应用案例**:
| 案例 | 描述 |
| ------------------------ | ------------------------------------------------------------ |
| `Semaphore` | 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数 |
| `CountDownLatch` | 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过 |
| `ReentrantReadWriteLock` | 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数 |
| `ThreadPoolExecutor` | Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease) |
| `ReentrantLock` | 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理 |
### 双队列
![AQS同步队列与条件队列](images/JAVA/AQS同步队列与条件队列.png)
![AQS-Lock-Condition](images/JAVA/AQS-Lock-Condition.png)
- **同步队列(syncQueue):管理多个线程的休眠与唤醒**
- **条件队列(waitQueue):类似wait与signal作用,实现在使用锁时对线程管理**
**注意**
- 同步队列与条件队列节点可相互转化
- 一个线程只能存在于两个队列中的一个
#### 同步队列
**同步队列是用来管理多个线程的休眠与唤醒**。
同步队列依赖一个双向链表(CHL)来完成同步状态的管理,当前线程获取同步状态失败后,同步器会将线程构建成一个节点,并将其加入同步队列中。通过`signal`或`signalAll`将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)
![同步队列(syncQueue)结构](images/JAVA/同步队列(syncQueue)结构.png)
如果没有锁竞争,线程可以直接获取到锁,就不会进入同步队列。即没有锁竞争时,同步队列(syncQueue)是空的,当存在锁竞争时,线程会进入到同步队列中。一旦进入到同步队列中,就会有线程切换。标准的 CHL 无锁队列是单向链表,同步队列(syncQueue) 在 CHL 基础上做了改进:
- **同步队列是双向链表**。和二叉树一样,双向链表目前也没有无锁算法的实现。双向链表需要同时设置前驱和后继结点,这两次操作只能保证一个是原子性的
- **node.pre一定可以遍历所有结点,是线程安全的**。而后继结点 node.next 则是线程不安全的。也就是说,node.pre 一定可以遍历整个链表,而 node.next 则不一定
#### 条件队列
**条件队列是类似wait与signal作用,实现在使用锁时对线程管理。且由于实现了Condition,对线程的管理可更加细化**。
当线程存在于同步队列的头结点时,调用 `await` 方法进行阻塞(从同步队列转化到条件队列)。Condition条件队列(`waitQueue`)要比Lock同步队列(`syncQueue`)简单很多,最重要的原因是 `waitQueue` 的操作都是在获取锁的线程中执行,不存在数据竞争的问题。
![Condition等待队列结构](images/JAVA/Condition等待队列结构.png)
ConditionObject重要的方法说明:
- **await**:阻塞线程并放弃锁,加入到等待队列中
- **signal**:唤醒等待线程,没有特殊的要求,尽量使用 signalAll
- **addConditionWaiter**:将结点(状态为 CONDITION)添加到等待队列 waitQueue 中,不存在锁竞争
- **fullyRelease**:释放锁,并唤醒后继等待线程
- **isOnSyncQueue**:根据结点是否在同步队列上,判断等待线程是否已经被唤醒
- **acquireQueued**:Lock 接口中的方法,通过同步队列方法竞争锁
- **unlinkCancelledWaiters**:清理取消等待的线程
### 框架架构图
![AQS框架架构图](images/JAVA/AQS框架架构图.png)
**常见问题**
**问题1:state为什么要提供setState和compareAndSetState两种修改状态的方法?**
这个问题,关键是修改状态时是否存在数据竞争,如果有则必须使用 compareAndSetState。
- lock.lock() 获取锁时会发生数据竞争,必须使用 CAS 来保障线程安全,也就是 compareAndSetState 方法
- lock.unlock() 释放锁时,线程已经获取到锁,没有数据竞争,也就可以直接使用 setState 修改锁的状态
**问题2:AQS为什么选择node.prev前驱结点的原子性,而node.next后继结点则是辅助结点?**
- next 域:需要修改二处来保证原子性,一是 tail.next;二是 tail 指针
- prev 域:只需要修改一处来保证原子性,就是 tail 指针。你可能会说不需要修改 node.prev 吗?当然需要,但 node 还没添加到链表中,其 node.prev 修改并没有锁竞争的问题,将 tail 指针指向 node 时,如果失败会通过自旋不断尝试
**问题3:AQS明知道node.next有可见性问题,为什么还要设计成双向链表?**
唤醒同步线程时,如果有后继结点,那么时间复杂为 O(1)。否则只能只反向遍历,时间复杂度为 O(n)。以下两种情况,则认为 node.next 不可靠,需要从 tail 反向遍历。
- node.next=null:可能结点刚刚插入链表中,node.next 仍为空。此时有其它线程通过 unparkSuccessor 来唤醒该线程
- node.next.waitStatus>0:结点已经取消,next 值可能已经改变
## Condition
Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。
## volatile
Java 语言提供了一种稍弱的同步机制,即 volatile 变量,用来确保将变量的更新操作通知到其他线程。volatile 变量具备两种特性,volatile 变量不会被缓存在寄存器或者对其他处理器不可见的地方,因此在读取 volatile 类型的变量时总会返回最新写入的值。
**volatile特性与原理**
- **可见性**:volatile修饰的变量,JVM保证了每次都跳过工作内存和缓存行(CPU Cache)来读取主内存中的最新值
- **有序性**:即JVM使用内存屏障来禁止了该变量操作的指令重排序优化
- **volatile性能**:volatile的读性能消耗与普通变量几乎相同,但是写操作稍慢,因为它需要在本地代码中插入许多内存屏障指令来保证处理器不发生乱序执行
- **内存屏障**:加入volatile关键字时,会多出一个lock前缀指令,lock前缀指令相当于一个内存屏障(也称内存栅栏)
- **轻量级同步机制**:Java提供了一种稍弱同步机制,即volatile变量,是一种比sychronized关键字更轻量级的同步机制
- **禁止重排序**:volatile 禁止了指令重排
**使用场景**
- 状态标记量
- DCL(Double Check Lock)
**CPU缓存**
CPU缓存的出现主要是为了解决CPU运算速度与内存读写速度不匹配的矛盾,因为CPU运算速度要比内存读写速度快得多。按照读取顺序与CPU结合的紧密程度,CPU缓存可分为:
- **一级缓存**:简称L1 Cache,位于CPU内核的旁边,是与CPU结合最为紧密的CPU缓存
- **二级缓存**:简称L2 Cache,分内部和外部两种芯片,内部芯片二级缓存运行速度与主频相同,外部芯片二级缓存运行速度则只有主频的一半
- **三级缓存**:简称L3 Cache,部分高端CPU才有
**volatile的特性**
- 保证了线程可见性,不保证原子性,保证一定的有序性(禁止指令重排序)
- 在JVM底层volatile是采用“内存屏障”来实现的
- volatile常用场景:状态标记、DCL(双重检查锁,Double Check)
- volatile不会引起线程上下文的切换和调度
- 基于lock前缀指令,相当于内存屏障(内存栅栏)
## lambda
### 函数式接口
函数接口是只有一个抽象方法的接口,用作 Lambda 表达式的类型。使用@FunctionalInterface注解修饰的类,编译器会检测该类是否只有一个抽象方法或接口,否则,会报错。可以有多个默认方法,静态方法。JAVA8自带的常用函数式接口:
| 函数接口 | 抽象方法 | 功能 | 参数 | 返回类型 | 示例 |
| -------------- | --------------- | ---------------------- | ----- | -------- | ------------------- |
| Predicate | test(T t) | 判断真假 | T | boolean | 身高大于185cm吗? |
| Consumer | accept(T t) | 消费消息 | T | void | 输出一个值 |
| Function | R apply(T t) | 将T映射为R(转换功能) | T | R | 取student对象的名字 |
| Supplier | T get() | 生产消息 | None | T | 工厂方法 |
| UnaryOperator | T apply(T t) | 一元操作 | T | T | 逻辑非(!) |
| BinaryOperator | apply(T t, U u) | 二元操作 | (T,T) | (T) | 求两个数的乘积(*) |
### 常用的流
#### collect
**将流转换为集合。有toList()、toSet()、toMap()等,及早求值**。
```java
public class TestCase {
public static void main(String[] args) {
List studentList = Stream.of(new Student("路飞", 22, 175),
new Student("红发", 40, 180), new Student("白胡子", 50, 185)).collect(Collectors.toList());
System.out.println(studentList);
}
}
// 输出结果
// [Student{name='路飞', age=22, stature=175, specialities=null},
// Student{name='红发', age=40, stature=180, specialities=null},
// Student{name='白胡子', age=50, stature=185, specialities=null}]
```
#### filter
顾名思义,起**过滤筛选**的作用。**内部就是Predicate接口。惰性求值。**
![lambda-filter](images/JAVA/lambda-filter.jpg)
```java
public class TestCase {
public static void main(String[] args) {
List students = new ArrayList<>(3);
students.add(new Student("路飞", 22, 175));
students.add(new Student("红发", 40, 180));
students.add(new Student("白胡子", 50, 185));
List list = students.stream()
.filter(stu -> stu.getStature() < 180)
.collect(Collectors.toList());
System.out.println(list);
}
}
// 输出结果
// [Student{name='路飞', age=22, stature=175, specialities=null}]
```
#### map
**转换功能,内部就是Function接口。惰性求值。**
![lambda-map](images/JAVA/lambda-map.jpg)
```java
public class TestCase {
public static void main(String[] args) {
List students = new ArrayList<>(3);
students.add(new Student("路飞", 22, 175));
students.add(new Student("红发", 40, 180));
students.add(new Student("白胡子", 50, 185));
List names = students.stream().map(student -> student.getName())
.collect(Collectors.toList());
System.out.println(names);
}
}
// 输出结果
// [路飞, 红发, 白胡子]
```
例子中将student对象转换为String对象,获取student的名字。
#### flatMap
**将多个Stream合并为一个Stream。惰性求值。**
![lambda-flatMap](images/JAVA/lambda-flatMap.jpg)
```java
public class TestCase {
public static void main(String[] args) {
List students = new ArrayList<>(3);
students.add(new Student("路飞", 22, 175));
students.add(new Student("红发", 40, 180));
students.add(new Student("白胡子", 50, 185));
List studentList = Stream.of(students,
asList(new Student("艾斯", 25, 183),
new Student("雷利", 48, 176)))
.flatMap(students1 -> students1.stream()).collect(Collectors.toList());
System.out.println(studentList);
}
}
// 输出结果
// [Student{name='路飞', age=22, stature=175, specialities=null},
// Student{name='红发', age=40, stature=180, specialities=null},
// Student{name='白胡子', age=50, stature=185, specialities=null},
// Student{name='艾斯', age=25, stature=183, specialities=null},
// Student{name='雷利', age=48, stature=176, specialities=null}]
```
调用Stream.of的静态方法将两个list转换为Stream,再通过flatMap将两个流合并为一个。
#### max和min
我们经常会在集合中**求最大或最小值**,使用流就很方便。**及早求值。**
```java
public class TestCase {
public static void main(String[] args) {
List students = new ArrayList<>(3);
students.add(new Student("路飞", 22, 175));
students.add(new Student("红发", 40, 180));
students.add(new Student("白胡子", 50, 185));
Optional max = students.stream()
.max(Comparator.comparing(stu -> stu.getAge()));
Optional min = students.stream()
.min(Comparator.comparing(stu -> stu.getAge()));
//判断是否有值
if (max.isPresent()) {
System.out.println(max.get());
}
if (min.isPresent()) {
System.out.println(min.get());
}
}
}
// 输出结果
// Student{name='白胡子', age=50, stature=185, specialities=null}
// Student{name='路飞', age=22, stature=175, specialities=null}
```
**max、min接收一个Comparator**(例子中使用java8自带的静态函数,只需要传进需要比较值即可),并且返回一个Optional对象,该对象是java8新增的类,专门为了防止null引发的空指针异常。可以使用max.isPresent()判断是否有值;可以使用max.orElse(new Student()),当值为null时就使用给定值;也可以使用max.orElseGet(() -> new Student());这需要传入一个Supplier的lambda表达式。
#### count
**统计功能,一般都是结合filter使用,因为先筛选出我们需要的再统计即可。及早求值。**
```java
public class TestCase {
public static void main(String[] args) {
List students = new ArrayList<>(3);
students.add(new Student("路飞", 22, 175));
students.add(new Student("红发", 40, 180));
students.add(new Student("白胡子", 50, 185));
long count = students.stream().filter(s1 -> s1.getAge() < 45).count();
System.out.println("年龄小于45岁的人数是:" + count);
}
}
// 输出结果
// 年龄小于45岁的人数是:2
```
#### reduce
**reduce 操作可以实现从一组值中生成一个值**。在上述例子中用到的 count 、 min 和 max 方法,因为常用而被纳入标准库中。事实上,这些方法都是 reduce 操作。**及早求值。**
![lambda-reduce](images/JAVA/lambda-reduce.jpg)
```java
public class TestCase {
public static void main(String[] args) {
Integer reduce = Stream.of(1, 2, 3, 4).reduce(0, (acc, x) -> acc+ x);
System.out.println(reduce);
}
}
// 输出结果:10
```
我们看得reduce接收了一个初始值为0的累加器,依次取出值与累加器相加,最后累加器的值就是最终的结果。
### 高级集合类及收集器
#### 转换成值
**收集器,一种通用的、从流生成复杂值的结构。**只要将它传给 collect 方法,所有的流就都可以使用它了。标准类库已经提供了一些有用的收集器,**以下示例代码中的收集器都是从 java.util.stream.Collectors 类中静态导入的。**
```java
public class CollectorsTest {
public static void main(String[] args) {
List students1 = new ArrayList<>(3);
students1.add(new Student("路飞", 23, 175));
students1.add(new Student("红发", 40, 180));
students1.add(new Student("白胡子", 50, 185));
OutstandingClass ostClass1 = new OutstandingClass("一班", students1);
//复制students1,并移除一个学生
List students2 = new ArrayList<>(students1);
students2.remove(1);
OutstandingClass ostClass2 = new OutstandingClass("二班", students2);
//将ostClass1、ostClass2转换为Stream
Stream classStream = Stream.of(ostClass1, ostClass2);
OutstandingClass outstandingClass = biggestGroup(classStream);
System.out.println("人数最多的班级是:" + outstandingClass.getName());
System.out.println("一班平均年龄是:" + averageNumberOfStudent(students1));
}
/**
* 获取人数最多的班级
*/
private static OutstandingClass biggestGroup(Stream outstandingClasses) {
return outstandingClasses.collect(
maxBy(comparing(ostClass -> ostClass.getStudents().size())))
.orElseGet(OutstandingClass::new);
}
/**
* 计算平均年龄
*/
private static double averageNumberOfStudent(List students) {
return students.stream().collect(averagingInt(Student::getAge));
}
}
// 输出结果
// 人数最多的班级是:一班
// 一班平均年龄是:37.666666666666664
```
maxBy或者minBy就是求最大值与最小值。
#### 转换成块
**常用的流操作是将其分解成两个集合,Collectors.partitioningBy帮我们实现了,接收一个Predicate函数式接口。**
![lambda-partitioningBy](images/JAVA/lambda-partitioningBy.jpg)
将示例学生分为会唱歌与不会唱歌的两个集合。
```java
public class PartitioningByTest {
public static void main(String[] args) {
// 省略List students的初始化
Map> listMap = students.stream().collect(
Collectors.partitioningBy(student -> student.getSpecialities().
contains(SpecialityEnum.SING)));
}
}
```
#### 数据分组
数据分组是一种更自然的分割数据操作,与将数据分成 ture 和 false 两部分不同,**可以使用任意值对数据分组。Collectors.groupingBy接收一个Function做转换。**
![lambda-groupingBy](images/JAVA/lambda-groupingBy.jpg)
**如图,使用groupingBy将根据进行分组为圆形一组,三角形一组,正方形一组。**例子:根据学生第一个特长进行分组
```java
public class GroupingByTest {
public static void main(String[] args) {
//省略List students的初始化
Map> listMap =
students.stream().collect(
Collectors.groupingBy(student -> student.getSpecialities().get(0)));
}
}
```
Collectors.groupingBy与SQL 中的 group by 操作是一样的。
#### 字符串拼接
如果将所有学生的名字拼接起来,怎么做呢?通常只能创建一个StringBuilder,循环拼接。使用Stream,使用Collectors.joining()简单容易。
```java
public class JoiningTest {
public static void main(String[] args) {
List students = new ArrayList<>(3);
students.add(new Student("路飞", 22, 175));
students.add(new Student("红发", 40, 180));
students.add(new Student("白胡子", 50, 185));
String names = students.stream()
.map(Student::getName).collect(Collectors.joining(",","[","]"));
System.out.println(names);
}
}
//输出结果
//[路飞,红发,白胡子]
```
joining接收三个参数,第一个是分界符,第二个是前缀符,第三个是结束符。也可以不传入参数Collectors.joining(),这样就是直接拼接。
## Striped64
Striped64的设计思路是在竞争激烈的时候尽量分散竞争。
**Striping(条带化)**
大多数磁盘系统都对访问次数(每秒的 I/O 操作,IOPS)和数据传输率(每秒传输的数据量,TPS)有限制。当达到这些限制时,后续需要访问磁盘的进程就需要等待,这就是所谓的磁盘冲突。当多个进程同时访问一个磁盘时,可能会出现磁盘冲突。因此,避免磁盘冲突是优化 I/O 性能的一个重要目标。
条带(strip)是把连续的数据分割成相同大小的数据块,把每段数据分别写入到阵列中的不同磁盘上的方法。使用条带化技术使得多个进程同时访问数据的多个不同部分而不会造成磁盘冲突,而且在需要对这种数据进行顺序访问的时候可以获得最大程度上的 I/O 并行能力,从而获得非常好的性能。
**Striped64设计**
Striped64通过维护一个原子更新Cell表和一个base字段,并使用每个线程的探针字段作为哈希码映射到表的指定Cell。当竞争激烈时,将多线程的更新分散到不同Cell进行,有效降低了高并发下CAS更新的竞争,从而最大限度地提高了Striped64的吞吐量。Striped64为实现高吞吐量的并发计数组件奠定了基础,其中LongAdder就是基于Striped64实现,此外Java8中ConcurrentHashMap实现的并发计数功能也是基于Striped64的设计理念,还有hystrix、guava等实现的并发计数组件也离不开Striped64。
## LongAdder
LongAdder是JDK1.8开始出现的,所提供的API基本上可替换掉原先AtomicLong。LongAdder所使用思想就是**热点分离**,这点可以类比一下ConcurrentHashMap的设计思想。就是将value值分离成一个数组,当多线程访问时,通过hash算法映射到其中的一个数字进行计数。而最终的结果,就是这些数组的求和累加。这样一来,就减小了锁的粒度。如下图所示:
![LongAdder原理](images/JAVA/LongAdder原理.png)
LonAdder和AtomicLong性能测试对比:
![LongAdder和AtomicLong性能对比](images/JAVA/LongAdder和AtomicLong性能对比.png)
LongAdder就是基于Striped64实现,用于并发计数时,若不存在竞争或竞争比较低时,LongAdder具有和AtomicLong差不多的效率。但是,高并发环境下,竞争比较严重时,LongAdder的cells表发挥作用,将并发更新分散到不同Cell进行,有效降低了CAS更新的竞争,从而极大提高了LongAdder的并发计数能力。因此,高并发场景下,要实现吞吐量更高的计数器,推荐使用LongAdder。
## Semaphore
Semaphore是一个计数信号量,它的本质是一个"共享锁"。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。
**数据结构**
![Semaphore数据结构](images/JAVA/Semaphore数据结构.jpg)
- 和"ReentrantLock"一样,Semaphore也包含sync对象,sync是Sync类型;而且Sync是一个继承于AQS的抽象类
- Sync包括两个子类:"公平信号量"FairSync 和 "非公平信号量"NonfairSync。sync是"FairSync的实例",或者"NonfairSync的实例";默认情况下,sync是NonfairSync(即,默认是非公平信号量)
## CyclicBarrier
CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
**比较CountDownLatch和CyclicBarrier**
- CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待
- CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier
**数据结构**
![CyclicBarrier数据结构](images/JAVA/CyclicBarrier数据结构.jpg)
CyclicBarrier是包含了"ReentrantLock对象lock"和"Condition对象trip",它是通过独占锁实现的。下面通过源码去分析到底是如何实现的。
## CountDownLatch
CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
**CountDownLatch和CyclicBarrier的区别**
- CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待
- CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier
**数据结构**
![CountDownLatch数据结构](images/JAVA/CountDownLatch数据结构.jpg)
CountDownLatch的数据结构很简单,它是通过"共享锁"实现的。它包含了sync对象,sync是Sync类型。Sync是实例类,它继承于AQS。
## CompletableFuture
CompletableFuture是Java 8新增的一个类,用于异步编程,继承了Future和CompletionStage。Future主要具备对请求结果独立处理的功能,CompletionStage用于实现流式处理,实现异步请求的各个阶段组合或链式处理,因此CompletableFuture能实现整个异步调用接口的扁平化和流式处理,解决原有Future处理一系列链式异步请求时的复杂编码:
![img](images/JAVA/CompletableFuture.jpg)
**Future的局限性**
- **Future 的结果在非阻塞的情况下,不能执行更进一步的操作**
我们知道,使用Future时只能通过isDone()方法判断任务是否完成,或者通过get()方法阻塞线程等待结果返回,它不能非阻塞的情况下,执行更进一步的操作
- **不能组合多个Future的结果**
假设你有多个Future异步任务,你希望最快的任务执行完时,或者所有任务都执行完后,进行一些其他操作
- **多个Future不能组成链式调用**
当异步任务之间有依赖关系时,Future不能将一个任务的结果传给另一个异步任务,多个Future无法创建链式的工作流
- **没有异常处理**
**注意事项**
- **CompletableFuture默认线程池是否满足使用**
前面提到创建CompletableFuture异步任务的静态方法runAsync和supplyAsync等,可以指定使用的线程池,不指定则用CompletableFuture的默认线程池:
```java
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
```
可以看到,CompletableFuture默认线程池是调用ForkJoinPool的commonPool()方法创建,这个默认线程池的核心线程数量根据CPU核数而定,公式为`Runtime.getRuntime().availableProcessors() - 1`,以4核双槽CPU为例,核心线程数量就是`4*2-1=7`个。这样的设置满足CPU密集型的应用,但对于业务都是IO密集型的应用来说,是有风险的,当qps较高时,线程数量可能就设的太少了,会导致线上故障。所以可以根据业务情况自定义线程池使用。
- **get设置超时时间不能串行get,不然会导致接口延时`线程数量\*超时时间`**
**① 创建异步任务**
通常可以使用下面几个CompletableFuture的静态方法创建一个异步任务
```java
// 创建无返回值的异步任务
public static CompletableFuture runAsync(Runnable runnable);
// 无返回值,可指定线程池(默认使用ForkJoinPool.commonPool)
public static CompletableFuture runAsync(Runnable runnable, Executor executor);
// 创建有返回值的异步任务
public static CompletableFuture supplyAsync(Supplier supplier);
// 有返回值,可指定线程池
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor);
```
使用示例:
```java
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture future = CompletableFuture.runAsync(() -> {
//do something
}, executor);
int poiId = 111;
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
PoiDTO poi = poiService.loadById(poiId);
return poi.getName();
});
// Block and get the result of the Future
String poiName = future.get();
```
**② 使用回调方法**
通过`future.get()`方法获取异步任务的结果,还是会阻塞的等待任务完成
CompletableFuture提供了几个回调方法,可以不阻塞主线程,在异步任务完成后自动执行回调方法中的代码
```java
// 无参数、无返回值
public CompletableFuture thenRun(Runnable runnable);
// 接受参数,无返回值
public CompletableFuture thenAccept(Consumer super T> action);
// 接受参数T,有返回值U
public CompletableFuture thenApply(Function super T,? extends U> fn);
```
使用示例:
```java
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("do other things. 比如异步打印日志或发送消息"));
// 如果只想在一个CompletableFuture任务执行完后,进行一些后续的处理,不需要返回值,那么可以用thenRun回调方法来完成。
// 如果主线程不依赖thenRun中的代码执行完成,也不需要使用get()方法阻塞主线程。
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept((s) -> System.out.println(s + " world"));
// 输出:Hello world
// 回调方法希望使用异步任务的结果,并不需要返回值,那么可以使用thenAccept方法
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
PoiDTO poi = poiService.loadById(poiId);
return poi.getMainCategory();
}).thenApply((s) -> isMainPoi(s)); // boolean isMainPoi(int poiId);
future.get();
// 希望将异步任务的结果做进一步处理,并需要返回值,则使用thenApply方法。
// 如果主线程要获取回调方法的返回,还是要用get()方法阻塞得到
```
**③ 组合两个异步任务**
```java
// thenCompose方法中的异步任务依赖调用该方法的异步任务
public CompletableFuture thenCompose(Function super T, ? extends CompletionStage> fn);
// 用于两个独立的异步任务都完成的时候
public CompletableFuture thenCombine(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn);
```
使用示例:
```java
CompletableFuture> poiFuture = CompletableFuture.supplyAsync(
() -> poiService.queryPoiIds(cityId, poiId)
);
// 第二个任务是返回CompletableFuture的异步方法
CompletableFuture> getDeal(List poiIds){
return CompletableFuture.supplyAsync(() -> poiService.queryPoiIds(poiIds));
}
// thenCompose
CompletableFuture> resultFuture = poiFuture.thenCompose(poiIds -> getDeal(poiIds));
resultFuture.get();
```
thenCompose和thenApply的功能类似,两者区别在于thenCompose接受一个返回`CompletableFuture`的Function,当想从回调方法返回的`CompletableFuture`中直接获取结果U时,就用thenCompose。如果使用thenApply,返回结果resultFuture的类型是`CompletableFuture>>`,而不是`CompletableFuture>`
```java
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(() -> "world"), (s1, s2) -> s1 + s2);
// future.get()
```
**④ 组合多个CompletableFuture**
当需要多个异步任务都完成时,再进行后续处理,可以使用**allOf**方法:
```java
CompletableFuture poiIDTOFuture = CompletableFuture
.supplyAsync(() -> poiService.loadPoi(poiId))
.thenAccept(poi -> {
model.setModelTitle(poi.getShopName());
// do more thing
});
CompletableFuture productFuture = CompletableFuture
.supplyAsync(() -> productService.findAllByPoiIdOrderByUpdateTimeDesc(poiId))
.thenAccept(list -> {
model.setDefaultCount(list.size());
model.setMoreDesc("more");
});
// future3等更多异步任务,这里就不一一写出来了
// allOf组合所有异步任务,并使用join获取结果
CompletableFuture.allOf(poiIDTOFuture, productFuture, future3, ...).join();
```
该方法挺适合C端的业务,通过poiId异步的从多个服务拿门店信息,然后组装成自己需要的模型,最后所有门店信息都填充完后返回。这里使用了join方法获取结果,它和get方法一样阻塞的等待任务完成。多个异步任务有任意一个完成时就返回结果,可以使用**anyOf**方法:
```java
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 1";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 2";
});
CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
return "Result of Future 3";
});
CompletableFuture