JAVA
Introduction:收纳技术相关的JAVA知识 `JUC`、`Thread`、`Lock`、`I/O` 等总结!
[TOC]
# J.U.C
## 并发特性
JAVA里面进行多线程通信的主要方式就是 `共享内存` 的方式,共享内存主要的关注点有两个:`可见性` 和 `有序性`。加上复合操作的 `原子性`,可以认为JAVA的线程安全性问题主要关注点有3个(JAVA内存模型JMM解决了可见性和有序性的问题,而锁解决了原子性的问题):`可见性`、`有序性`、`原子性`
- **原子性(Atomicity)**:在Java中原子性指的是一个或多个操作要么全部执行成功要么全部执行失败
- **有序性(Ordering)**:程序执行的顺序按照代码的先后顺序执行(处理器可能会对指令进行重排序)
- **可见性(Visibility)**:指在多线程环境下,当一个线程修改了某一个共享变量的值,其它线程能够立刻知道这个修改
**① 重排序**
指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段。从JAVA源码到最终实际执行的指令序列,会经历下面3种重排序(主要流程):
![源码指令](images/JAVA/源码指令.png)
**指令重排序分类**
- 编译器重排序
- 编译器优化重排序:编译器在不改变单线程程序语义(as-if-serial )的前提下,可以重新安排语句的执行顺序
- 处理器重排序
- 内存系统重排序:由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行
- 指令级并行重排序:现代处理器采用了指令级并行技术(Instruction Level Parallelism,ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对机器指令的执行顺序
**② 顺序一致性**
顺序一致性内存模型是一个理论参考模型,在设计的时候,处理器的内存模型和编程语言的内存模型都会以顺序一致性内存模型作为参照。顺序一致性特征如下:
- 一个线程中的所有操作必须按照程序的顺序来执行
- (不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性的内存模型中,每个操作必须原子执行并且立刻对所有线程可见
## Unsafe
Java不能直接访问操作系统底层,而是通过本地方法来访问。**Unsafe类提供了硬件级别的原子操作**,主要提供以下功能:
- **通过Unsafe类可以分配内存,可以释放内存**
类中提供的3个本地方法allocateMemory(申请)、reallocateMemory(扩展)、freeMemory(销毁)分别用于分配内存,扩充内存和释放内存,与C语言中的3个方法对应。
- **可以定位对象某字段的内存位置,也可以修改对象的字段值,即使它是私有的**
- 字段的定位
- 数组元素定位
- **挂起与恢复**
将一个线程进行挂起是通过park方法实现的,调用 park后,线程将一直阻塞直到超时或者中断等条件出现。unpark可以终止一个挂起的线程,使其恢复正常。整个并发框架中对线程的挂起操作被封装在 LockSupport类中,LockSupport类中有各种版本pack方法,但最终都调用了Unsafe.park()方法。
- **CAS操作**
是通过compareAndSwapXXX方法实现的
## LockSupport
LockSupport 和 CAS 是Java并发包中很多并发工具控制机制的基础,它们底层其实都是依赖Unsafe实现。LockSupport 提供park()和unpark()方法实现阻塞线程和解除线程阻塞。
LockSupport和每个使用它的线程都与一个许可(permit)关联。permit相当于1,0的开关,默认是0,调用一次unpark就加1变成1,调用一次park会消费permit, 也就是将1变成0,同时park立即返回。再次调用park会变成block(因为permit为0了,会阻塞在这里,直到permit变为1), 这时调用unpark会把permit置为1。每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累。
park()和unpark()不会有 `Thread.suspend` 和 `Thread.resume` 所可能引发的死锁问题,由于许可的存在,调用 park 的线程和另一个试图将其 unpark 的线程之间的竞争将保持活性。
## CAS机制
CAS(`Compare And Swap`,即比较并交换),是解决多线程并行情况下使用锁造成性能损耗的一种机制。其原理是利用`sun.misc.Unsafe.java` 类通过JNI来调用硬件级别的原子操作来实现CAS(即CAS是借助C来调用CPU底层指令实现的)。
**CAS机制=比较并交换+乐观锁机制+锁自旋**
**设计思想**:如果`内存位置` 的值与 `预期原值` 相匹配,那么处理器会自动将该位置值更新为新值,否则处理器不做任何操作。
ReentrantLock、ReentrantReadWriteLock 都是基于 AbstractQueuedSynchronizer (AQS),而 AQS 又是基于 CAS。CAS 的全称是 Compare And Swap(比较与交换),它是一种无锁算法。synchronized和Lock都采用了悲观锁的机制,而CAS是一种乐观锁的实现。乐观锁的原理就是每次不加锁去执行某项操作,如果发生冲突则失败并重试,直到成功为止,其实本质上不算锁,所以很多地方也称之为**自旋**。乐观锁用到的主要机制就是**CAS**(Compare And Swap)。
**CAS特性**
- 通过JNI借助C来调用CPU底层指令实现
- 非阻塞算法
- 非独占锁
**CAS缺陷**
- **ABA问题**:X线程读到为A;Y线程立刻改为B,又改为A;X线程发现值还是A,此时CAS比较值相等,自旋成功
- 使用数据乐观锁的方式给它加一个版本号或者时间戳,如使用 `AtomicStampedReference` 解决
- **自旋消耗资源**:多个线程争夺同一个资源时,如果自旋一直不成功,将会一直占用CPU
- 破坏掉死循环,当超过一定时间或者一定次数时,return退出
- **只能保证一个共享变量的原子操作**
- 可以加锁来解决
- 封装成对象类解决,如使用 `AtomicReference` 解决
## AQS框架
**什么是AQS?**
`AQS` 的全称是 `AbstractQueuedSynchronizer`,即`抽象队列同步器`。是Java并发工具的基础,采用乐观锁,通过CAS与自旋轻量级的获取锁。维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。很多JUC包,比如ReentrantLock、Semaphore、CountDownLatch等并发类均是继承AQS,通过AQS的模板方法,来实现的。
![AQS](images/JAVA/AQS.png)
**核心思想**
- 若请求的共享资源空闲,则将当前请求的线程设置为有效的工作线程,并将共享资源设置为锁定状态
- 若共享资源被占用,则需要阻塞等待唤醒机制保证锁的分配
**工作原理**
**AQS = `同步状态(volatile int state)` + `同步队列(即等待队列,FIFO的CLH队列)` + `条件队列(ConditionObject)`**
- **state**:代表共享资源。`volatile` 保证并发读,`CAS` 保证并发写
- **同步队列(即等待队列,CLH队列)**:是CLH变体的虚拟双向队列(先进先出FIFO)来等待获取共享资源。当前线程可以通过signal和signalAll将条件队列中的节点转移到同步队列中
- **条件队列(ConditionObject)**:当前线程存在于同步队列的头节点,可以通过await从同步队列转移到条件队列中
**实现原理**
- 通过CLH队列的变体:FIFO双向队列实现的
- 每个请求资源的线程被包装成一个节点来实现锁的分配
- 通过`volatile`的`int`类型的成员变量`state`表示同步状态
- 通过FIFO队列完成资源获取的排队工作
- 通过CAS完成对`state`的修改
### 共享方式
AQS定义两种资源共享方式。无论是独占锁还是共享锁,本质上都是对AQS内部的一个变量`state`的获取。`state`是一个原子的int变量,用来表示锁状态、资源数等。
**① 独占锁(`Exclusive`)模式**:只能被一个线程获取到(`Reentrantlock`)。
![独占锁(Exclusive)模式](images/JAVA/独占锁(Exclusive)模式.png)
**② 共享锁(`Share`)模式**:可以被多个线程同时获取(`Semaphore/CountDownLatch/ReadWriteLock`)。
![共享锁(Share)模式](images/JAVA/共享锁(Share)模式.png)
### state机制
提供`volatile`变量`state`,用于同步线程之间的共享状态。通过 `CAS` 和 `volatile` 保证其原子性和可见性。核心要点:
- state 用 `volatile` 修饰,保证多线程中的可见性
- `getState()` 和 `setState()` 方法**采用final修饰**,限制AQS的子类重写它们两
- `compareAndSetState()` 方法采用乐观锁思想的CAS算法,也是采用final修饰的,不允许子类重写
**state应用案例**:
| 案例 | 描述 |
| ------------------------ | ------------------------------------------------------------ |
| `Semaphore` | 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数 |
| `CountDownLatch` | 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过 |
| `ReentrantReadWriteLock` | 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数 |
| `ThreadPoolExecutor` | Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease) |
| `ReentrantLock` | 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理 |
### 双队列
![AQS同步队列与条件队列](images/JAVA/AQS同步队列与条件队列.png)
![AQS-Lock-Condition](images/JAVA/AQS-Lock-Condition.png)
- **同步队列(syncQueue):管理多个线程的休眠与唤醒**
- **条件队列(waitQueue):类似wait与signal作用,实现在使用锁时对线程管理**
**注意**
- 同步队列与条件队列节点可相互转化
- 一个线程只能存在于两个队列中的一个
#### 同步队列
**同步队列是用来管理多个线程的休眠与唤醒**。
同步队列依赖一个双向链表(CHL)来完成同步状态的管理,当前线程获取同步状态失败后,同步器会将线程构建成一个节点,并将其加入同步队列中。通过`signal`或`signalAll`将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)
![同步队列(syncQueue)结构](images/JAVA/同步队列(syncQueue)结构.png)
如果没有锁竞争,线程可以直接获取到锁,就不会进入同步队列。即没有锁竞争时,同步队列(syncQueue)是空的,当存在锁竞争时,线程会进入到同步队列中。一旦进入到同步队列中,就会有线程切换。标准的 CHL 无锁队列是单向链表,同步队列(syncQueue) 在 CHL 基础上做了改进:
- **同步队列是双向链表**。和二叉树一样,双向链表目前也没有无锁算法的实现。双向链表需要同时设置前驱和后继结点,这两次操作只能保证一个是原子性的
- **node.pre一定可以遍历所有结点,是线程安全的**。而后继结点 node.next 则是线程不安全的。也就是说,node.pre 一定可以遍历整个链表,而 node.next 则不一定
#### 条件队列
**条件队列是类似wait与signal作用,实现在使用锁时对线程管理。且由于实现了Condition,对线程的管理可更加细化**。
当线程存在于同步队列的头结点时,调用 `await` 方法进行阻塞(从同步队列转化到条件队列)。Condition条件队列(`waitQueue`)要比Lock同步队列(`syncQueue`)简单很多,最重要的原因是 `waitQueue` 的操作都是在获取锁的线程中执行,不存在数据竞争的问题。
![Condition等待队列结构](images/JAVA/Condition等待队列结构.png)
ConditionObject重要的方法说明:
- **await**:阻塞线程并放弃锁,加入到等待队列中
- **signal**:唤醒等待线程,没有特殊的要求,尽量使用 signalAll
- **addConditionWaiter**:将结点(状态为 CONDITION)添加到等待队列 waitQueue 中,不存在锁竞争
- **fullyRelease**:释放锁,并唤醒后继等待线程
- **isOnSyncQueue**:根据结点是否在同步队列上,判断等待线程是否已经被唤醒
- **acquireQueued**:Lock 接口中的方法,通过同步队列方法竞争锁
- **unlinkCancelledWaiters**:清理取消等待的线程
### 框架架构图
![AQS框架架构图](images/JAVA/AQS框架架构图.png)
**常见问题**
**问题1:state为什么要提供setState和compareAndSetState两种修改状态的方法?**
这个问题,关键是修改状态时是否存在数据竞争,如果有则必须使用 compareAndSetState。
- lock.lock() 获取锁时会发生数据竞争,必须使用 CAS 来保障线程安全,也就是 compareAndSetState 方法
- lock.unlock() 释放锁时,线程已经获取到锁,没有数据竞争,也就可以直接使用 setState 修改锁的状态
**问题2:AQS为什么选择node.prev前驱结点的原子性,而node.next后继结点则是辅助结点?**
- next 域:需要修改二处来保证原子性,一是 tail.next;二是 tail 指针
- prev 域:只需要修改一处来保证原子性,就是 tail 指针。你可能会说不需要修改 node.prev 吗?当然需要,但 node 还没添加到链表中,其 node.prev 修改并没有锁竞争的问题,将 tail 指针指向 node 时,如果失败会通过自旋不断尝试
**问题3:AQS明知道node.next有可见性问题,为什么还要设计成双向链表?**
唤醒同步线程时,如果有后继结点,那么时间复杂为 O(1)。否则只能只反向遍历,时间复杂度为 O(n)。以下两种情况,则认为 node.next 不可靠,需要从 tail 反向遍历。
- node.next=null:可能结点刚刚插入链表中,node.next 仍为空。此时有其它线程通过 unparkSuccessor 来唤醒该线程
- node.next.waitStatus>0:结点已经取消,next 值可能已经改变
## Condition
Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。
## volatile
Java 语言提供了一种稍弱的同步机制,即 volatile 变量,用来确保将变量的更新操作通知到其他线程。volatile 变量具备两种特性,volatile 变量不会被缓存在寄存器或者对其他处理器不可见的地方,因此在读取 volatile 类型的变量时总会返回最新写入的值。
**volatile特性与原理**
- **可见性**:volatile修饰的变量,JVM保证了每次都跳过工作内存和缓存行(CPU Cache)来读取主内存中的最新值
- **有序性**:即JVM使用内存屏障来禁止了该变量操作的指令重排序优化
- **volatile性能**:volatile的读性能消耗与普通变量几乎相同,但是写操作稍慢,因为它需要在本地代码中插入许多内存屏障指令来保证处理器不发生乱序执行
- **内存屏障**:加入volatile关键字时,会多出一个lock前缀指令,lock前缀指令相当于一个内存屏障(也称内存栅栏)
- **轻量级同步机制**:Java提供了一种稍弱同步机制,即volatile变量,是一种比sychronized关键字更轻量级的同步机制
- **禁止重排序**:volatile 禁止了指令重排
**使用场景**
- 状态标记量
- DCL(Double Check Lock)
**CPU缓存**
CPU缓存的出现主要是为了解决CPU运算速度与内存读写速度不匹配的矛盾,因为CPU运算速度要比内存读写速度快得多。按照读取顺序与CPU结合的紧密程度,CPU缓存可分为:
- **一级缓存**:简称L1 Cache,位于CPU内核的旁边,是与CPU结合最为紧密的CPU缓存
- **二级缓存**:简称L2 Cache,分内部和外部两种芯片,内部芯片二级缓存运行速度与主频相同,外部芯片二级缓存运行速度则只有主频的一半
- **三级缓存**:简称L3 Cache,部分高端CPU才有
**volatile的特性**
- 保证了线程可见性,不保证原子性,保证一定的有序性(禁止指令重排序)
- 在JVM底层volatile是采用“内存屏障”来实现的
- volatile常用场景:状态标记、DCL(双重检查锁,Double Check)
- volatile不会引起线程上下文的切换和调度
- 基于lock前缀指令,相当于内存屏障(内存栅栏)
## lambda
### 函数式接口
函数接口是只有一个抽象方法的接口,用作 Lambda 表达式的类型。使用@FunctionalInterface注解修饰的类,编译器会检测该类是否只有一个抽象方法或接口,否则,会报错。可以有多个默认方法,静态方法。JAVA8自带的常用函数式接口:
| 函数接口 | 抽象方法 | 功能 | 参数 | 返回类型 | 示例 |
| -------------- | --------------- | ---------------------- | ----- | -------- | ------------------- |
| Predicate | test(T t) | 判断真假 | T | boolean | 身高大于185cm吗? |
| Consumer | accept(T t) | 消费消息 | T | void | 输出一个值 |
| Function | R apply(T t) | 将T映射为R(转换功能) | T | R | 取student对象的名字 |
| Supplier | T get() | 生产消息 | None | T | 工厂方法 |
| UnaryOperator | T apply(T t) | 一元操作 | T | T | 逻辑非(!) |
| BinaryOperator | apply(T t, U u) | 二元操作 | (T,T) | (T) | 求两个数的乘积(*) |
### 常用的流
#### collect
**将流转换为集合。有toList()、toSet()、toMap()等,及早求值**。
```java
public class TestCase {
public static void main(String[] args) {
List studentList = Stream.of(new Student("路飞", 22, 175),
new Student("红发", 40, 180), new Student("白胡子", 50, 185)).collect(Collectors.toList());
System.out.println(studentList);
}
}
// 输出结果
// [Student{name='路飞', age=22, stature=175, specialities=null},
// Student{name='红发', age=40, stature=180, specialities=null},
// Student{name='白胡子', age=50, stature=185, specialities=null}]
```
#### filter
顾名思义,起**过滤筛选**的作用。**内部就是Predicate接口。惰性求值。**
![lambda-filter](images/JAVA/lambda-filter.jpg)
```java
public class TestCase {
public static void main(String[] args) {
List students = new ArrayList<>(3);
students.add(new Student("路飞", 22, 175));
students.add(new Student("红发", 40, 180));
students.add(new Student("白胡子", 50, 185));
List list = students.stream()
.filter(stu -> stu.getStature() < 180)
.collect(Collectors.toList());
System.out.println(list);
}
}
// 输出结果
// [Student{name='路飞', age=22, stature=175, specialities=null}]
```
#### map
**转换功能,内部就是Function接口。惰性求值。**
![lambda-map](images/JAVA/lambda-map.jpg)
```java
public class TestCase {
public static void main(String[] args) {
List students = new ArrayList<>(3);
students.add(new Student("路飞", 22, 175));
students.add(new Student("红发", 40, 180));
students.add(new Student("白胡子", 50, 185));
List names = students.stream().map(student -> student.getName())
.collect(Collectors.toList());
System.out.println(names);
}
}
// 输出结果
// [路飞, 红发, 白胡子]
```
例子中将student对象转换为String对象,获取student的名字。
#### flatMap
**将多个Stream合并为一个Stream。惰性求值。**
![lambda-flatMap](images/JAVA/lambda-flatMap.jpg)
```java
public class TestCase {
public static void main(String[] args) {
List students = new ArrayList<>(3);
students.add(new Student("路飞", 22, 175));
students.add(new Student("红发", 40, 180));
students.add(new Student("白胡子", 50, 185));
List studentList = Stream.of(students,
asList(new Student("艾斯", 25, 183),
new Student("雷利", 48, 176)))
.flatMap(students1 -> students1.stream()).collect(Collectors.toList());
System.out.println(studentList);
}
}
// 输出结果
// [Student{name='路飞', age=22, stature=175, specialities=null},
// Student{name='红发', age=40, stature=180, specialities=null},
// Student{name='白胡子', age=50, stature=185, specialities=null},
// Student{name='艾斯', age=25, stature=183, specialities=null},
// Student{name='雷利', age=48, stature=176, specialities=null}]
```
调用Stream.of的静态方法将两个list转换为Stream,再通过flatMap将两个流合并为一个。
#### max和min
我们经常会在集合中**求最大或最小值**,使用流就很方便。**及早求值。**
```java
public class TestCase {
public static void main(String[] args) {
List students = new ArrayList<>(3);
students.add(new Student("路飞", 22, 175));
students.add(new Student("红发", 40, 180));
students.add(new Student("白胡子", 50, 185));
Optional max = students.stream()
.max(Comparator.comparing(stu -> stu.getAge()));
Optional min = students.stream()
.min(Comparator.comparing(stu -> stu.getAge()));
//判断是否有值
if (max.isPresent()) {
System.out.println(max.get());
}
if (min.isPresent()) {
System.out.println(min.get());
}
}
}
// 输出结果
// Student{name='白胡子', age=50, stature=185, specialities=null}
// Student{name='路飞', age=22, stature=175, specialities=null}
```
**max、min接收一个Comparator**(例子中使用java8自带的静态函数,只需要传进需要比较值即可),并且返回一个Optional对象,该对象是java8新增的类,专门为了防止null引发的空指针异常。可以使用max.isPresent()判断是否有值;可以使用max.orElse(new Student()),当值为null时就使用给定值;也可以使用max.orElseGet(() -> new Student());这需要传入一个Supplier的lambda表达式。
#### count
**统计功能,一般都是结合filter使用,因为先筛选出我们需要的再统计即可。及早求值。**
```java
public class TestCase {
public static void main(String[] args) {
List students = new ArrayList<>(3);
students.add(new Student("路飞", 22, 175));
students.add(new Student("红发", 40, 180));
students.add(new Student("白胡子", 50, 185));
long count = students.stream().filter(s1 -> s1.getAge() < 45).count();
System.out.println("年龄小于45岁的人数是:" + count);
}
}
// 输出结果
// 年龄小于45岁的人数是:2
```
#### reduce
**reduce 操作可以实现从一组值中生成一个值**。在上述例子中用到的 count 、 min 和 max 方法,因为常用而被纳入标准库中。事实上,这些方法都是 reduce 操作。**及早求值。**
![lambda-reduce](images/JAVA/lambda-reduce.jpg)
```java
public class TestCase {
public static void main(String[] args) {
Integer reduce = Stream.of(1, 2, 3, 4).reduce(0, (acc, x) -> acc+ x);
System.out.println(reduce);
}
}
// 输出结果
// 10
```
我们看得reduce接收了一个初始值为0的累加器,依次取出值与累加器相加,最后累加器的值就是最终的结果。
### 高级集合类及收集器
#### 转换成值
**收集器,一种通用的、从流生成复杂值的结构。**只要将它传给 collect 方法,所有的流就都可以使用它了。标准类库已经提供了一些有用的收集器,**以下示例代码中的收集器都是从 java.util.stream.Collectors 类中静态导入的。**
```java
public class CollectorsTest {
public static void main(String[] args) {
List students1 = new ArrayList<>(3);
students1.add(new Student("路飞", 23, 175));
students1.add(new Student("红发", 40, 180));
students1.add(new Student("白胡子", 50, 185));
OutstandingClass ostClass1 = new OutstandingClass("一班", students1);
//复制students1,并移除一个学生
List students2 = new ArrayList<>(students1);
students2.remove(1);
OutstandingClass ostClass2 = new OutstandingClass("二班", students2);
//将ostClass1、ostClass2转换为Stream
Stream classStream = Stream.of(ostClass1, ostClass2);
OutstandingClass outstandingClass = biggestGroup(classStream);
System.out.println("人数最多的班级是:" + outstandingClass.getName());
System.out.println("一班平均年龄是:" + averageNumberOfStudent(students1));
}
/**
* 获取人数最多的班级
*/
private static OutstandingClass biggestGroup(Stream outstandingClasses) {
return outstandingClasses.collect(
maxBy(comparing(ostClass -> ostClass.getStudents().size())))
.orElseGet(OutstandingClass::new);
}
/**
* 计算平均年龄
*/
private static double averageNumberOfStudent(List students) {
return students.stream().collect(averagingInt(Student::getAge));
}
}
// 输出结果
// 人数最多的班级是:一班
// 一班平均年龄是:37.666666666666664
```
maxBy或者minBy就是求最大值与最小值。
#### 转换成块
**常用的流操作是将其分解成两个集合,Collectors.partitioningBy帮我们实现了,接收一个Predicate函数式接口。**
![lambda-partitioningBy](images/JAVA/lambda-partitioningBy.jpg)
将示例学生分为会唱歌与不会唱歌的两个集合。
```java
public class PartitioningByTest {
public static void main(String[] args) {
// 省略List students的初始化
Map> listMap = students.stream().collect(
Collectors.partitioningBy(student -> student.getSpecialities().
contains(SpecialityEnum.SING)));
}
}
```
#### 数据分组
数据分组是一种更自然的分割数据操作,与将数据分成 ture 和 false 两部分不同,**可以使用任意值对数据分组。Collectors.groupingBy接收一个Function做转换。**
![lambda-groupingBy](images/JAVA/lambda-groupingBy.jpg)
**如图,使用groupingBy将根据进行分组为圆形一组,三角形一组,正方形一组。**例子:根据学生第一个特长进行分组
```java
public class GroupingByTest {
public static void main(String[] args) {
//省略List students的初始化
Map> listMap =
students.stream().collect(
Collectors.groupingBy(student -> student.getSpecialities().get(0)));
}
}
```
Collectors.groupingBy与SQL 中的 group by 操作是一样的。
#### 字符串拼接
如果将所有学生的名字拼接起来,怎么做呢?通常只能创建一个StringBuilder,循环拼接。使用Stream,使用Collectors.joining()简单容易。
```java
public class JoiningTest {
public static void main(String[] args) {
List students = new ArrayList<>(3);
students.add(new Student("路飞", 22, 175));
students.add(new Student("红发", 40, 180));
students.add(new Student("白胡子", 50, 185));
String names = students.stream()
.map(Student::getName).collect(Collectors.joining(",","[","]"));
System.out.println(names);
}
}
//输出结果
//[路飞,红发,白胡子]
```
joining接收三个参数,第一个是分界符,第二个是前缀符,第三个是结束符。也可以不传入参数Collectors.joining(),这样就是直接拼接。
## Striped64
Striped64的设计思路是在竞争激烈的时候尽量分散竞争。
**Striping(条带化)**
大多数磁盘系统都对访问次数(每秒的 I/O 操作,IOPS)和数据传输率(每秒传输的数据量,TPS)有限制。当达到这些限制时,后续需要访问磁盘的进程就需要等待,这就是所谓的磁盘冲突。当多个进程同时访问一个磁盘时,可能会出现磁盘冲突。因此,避免磁盘冲突是优化 I/O 性能的一个重要目标。
条带(strip)是把连续的数据分割成相同大小的数据块,把每段数据分别写入到阵列中的不同磁盘上的方法。使用条带化技术使得多个进程同时访问数据的多个不同部分而不会造成磁盘冲突,而且在需要对这种数据进行顺序访问的时候可以获得最大程度上的 I/O 并行能力,从而获得非常好的性能。
**Striped64设计**
Striped64通过维护一个原子更新Cell表和一个base字段,并使用每个线程的探针字段作为哈希码映射到表的指定Cell。当竞争激烈时,将多线程的更新分散到不同Cell进行,有效降低了高并发下CAS更新的竞争,从而最大限度地提高了Striped64的吞吐量。Striped64为实现高吞吐量的并发计数组件奠定了基础,其中LongAdder就是基于Striped64实现,此外Java8中ConcurrentHashMap实现的并发计数功能也是基于Striped64的设计理念,还有hystrix、guava等实现的并发计数组件也离不开Striped64。
## LongAdder
LongAdder是JDK1.8开始出现的,所提供的API基本上可替换掉原先AtomicLong。LongAdder所使用思想就是**热点分离**,这点可以类比一下ConcurrentHashMap的设计思想。就是将value值分离成一个数组,当多线程访问时,通过hash算法映射到其中的一个数字进行计数。而最终的结果,就是这些数组的求和累加。这样一来,就减小了锁的粒度。如下图所示:
![LongAdder原理](images/JAVA/LongAdder原理.png)
LonAdder和AtomicLong性能测试对比:
![LongAdder和AtomicLong性能对比](images/JAVA/LongAdder和AtomicLong性能对比.png)
LongAdder就是基于Striped64实现,用于并发计数时,若不存在竞争或竞争比较低时,LongAdder具有和AtomicLong差不多的效率。但是,高并发环境下,竞争比较严重时,LongAdder的cells表发挥作用,将并发更新分散到不同Cell进行,有效降低了CAS更新的竞争,从而极大提高了LongAdder的并发计数能力。因此,高并发场景下,要实现吞吐量更高的计数器,推荐使用LongAdder。
## Semaphore
Semaphore是一个计数信号量,它的本质是一个"共享锁"。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。
**数据结构**
![Semaphore数据结构](images/JAVA/Semaphore数据结构.jpg)
- 和"ReentrantLock"一样,Semaphore也包含sync对象,sync是Sync类型;而且Sync是一个继承于AQS的抽象类
- Sync包括两个子类:"公平信号量"FairSync 和 "非公平信号量"NonfairSync。sync是"FairSync的实例",或者"NonfairSync的实例";默认情况下,sync是NonfairSync(即,默认是非公平信号量)
## CyclicBarrier
CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
**比较CountDownLatch和CyclicBarrier**
- CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待
- CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier
**数据结构**
![CyclicBarrier数据结构](images/JAVA/CyclicBarrier数据结构.jpg)
CyclicBarrier是包含了"ReentrantLock对象lock"和"Condition对象trip",它是通过独占锁实现的。下面通过源码去分析到底是如何实现的。
## CountDownLatch
CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
**CountDownLatch和CyclicBarrier的区别**
- CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待
- CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier
**数据结构**
![CountDownLatch数据结构](images/JAVA/CountDownLatch数据结构.jpg)
CountDownLatch的数据结构很简单,它是通过"共享锁"实现的。它包含了sync对象,sync是Sync类型。Sync是实例类,它继承于AQS。
## CompletableFuture
CompletableFuture是Java 8新增的一个类,用于异步编程,继承了Future和CompletionStage。Future主要具备对请求结果独立处理的功能,CompletionStage用于实现流式处理,实现异步请求的各个阶段组合或链式处理,因此CompletableFuture能实现整个异步调用接口的扁平化和流式处理,解决原有Future处理一系列链式异步请求时的复杂编码:
![img](images/JAVA/CompletableFuture.jpg)
**Future的局限性**
- **Future 的结果在非阻塞的情况下,不能执行更进一步的操作**
我们知道,使用Future时只能通过isDone()方法判断任务是否完成,或者通过get()方法阻塞线程等待结果返回,它不能非阻塞的情况下,执行更进一步的操作
- **不能组合多个Future的结果**
假设你有多个Future异步任务,你希望最快的任务执行完时,或者所有任务都执行完后,进行一些其他操作
- **多个Future不能组成链式调用**
当异步任务之间有依赖关系时,Future不能将一个任务的结果传给另一个异步任务,多个Future无法创建链式的工作流
- **没有异常处理**
**注意事项**
- **CompletableFuture默认线程池是否满足使用**
前面提到创建CompletableFuture异步任务的静态方法runAsync和supplyAsync等,可以指定使用的线程池,不指定则用CompletableFuture的默认线程池:
```java
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
```
可以看到,CompletableFuture默认线程池是调用ForkJoinPool的commonPool()方法创建,这个默认线程池的核心线程数量根据CPU核数而定,公式为`Runtime.getRuntime().availableProcessors() - 1`,以4核双槽CPU为例,核心线程数量就是`4*2-1=7`个。这样的设置满足CPU密集型的应用,但对于业务都是IO密集型的应用来说,是有风险的,当qps较高时,线程数量可能就设的太少了,会导致线上故障。所以可以根据业务情况自定义线程池使用。
- **get设置超时时间不能串行get,不然会导致接口延时`线程数量\*超时时间`**
**① 创建异步任务**
通常可以使用下面几个CompletableFuture的静态方法创建一个异步任务
```java
// 创建无返回值的异步任务
public static CompletableFuture runAsync(Runnable runnable);
// 无返回值,可指定线程池(默认使用ForkJoinPool.commonPool)
public static CompletableFuture runAsync(Runnable runnable, Executor executor);
// 创建有返回值的异步任务
public static CompletableFuture supplyAsync(Supplier supplier);
// 有返回值,可指定线程池
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor);
```
使用示例:
```java
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture future = CompletableFuture.runAsync(() -> {
//do something
}, executor);
int poiId = 111;
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
PoiDTO poi = poiService.loadById(poiId);
return poi.getName();
});
// Block and get the result of the Future
String poiName = future.get();
```
**② 使用回调方法**
通过`future.get()`方法获取异步任务的结果,还是会阻塞的等待任务完成
CompletableFuture提供了几个回调方法,可以不阻塞主线程,在异步任务完成后自动执行回调方法中的代码
```java
// 无参数、无返回值
public CompletableFuture thenRun(Runnable runnable);
// 接受参数,无返回值
public CompletableFuture thenAccept(Consumer super T> action);
// 接受参数T,有返回值U
public CompletableFuture thenApply(Function super T,? extends U> fn);
```
使用示例:
```java
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("do other things. 比如异步打印日志或发送消息"));
// 如果只想在一个CompletableFuture任务执行完后,进行一些后续的处理,不需要返回值,那么可以用thenRun回调方法来完成。
// 如果主线程不依赖thenRun中的代码执行完成,也不需要使用get()方法阻塞主线程。
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept((s) -> System.out.println(s + " world"));
// 输出:Hello world
// 回调方法希望使用异步任务的结果,并不需要返回值,那么可以使用thenAccept方法
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
PoiDTO poi = poiService.loadById(poiId);
return poi.getMainCategory();
}).thenApply((s) -> isMainPoi(s)); // boolean isMainPoi(int poiId);
future.get();
// 希望将异步任务的结果做进一步处理,并需要返回值,则使用thenApply方法。
// 如果主线程要获取回调方法的返回,还是要用get()方法阻塞得到
```
**③ 组合两个异步任务**
```java
// thenCompose方法中的异步任务依赖调用该方法的异步任务
public CompletableFuture thenCompose(Function super T, ? extends CompletionStage> fn);
// 用于两个独立的异步任务都完成的时候
public CompletableFuture thenCombine(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn);
```
使用示例:
```java
CompletableFuture> poiFuture = CompletableFuture.supplyAsync(
() -> poiService.queryPoiIds(cityId, poiId)
);
// 第二个任务是返回CompletableFuture的异步方法
CompletableFuture> getDeal(List poiIds){
return CompletableFuture.supplyAsync(() -> poiService.queryPoiIds(poiIds));
}
// thenCompose
CompletableFuture> resultFuture = poiFuture.thenCompose(poiIds -> getDeal(poiIds));
resultFuture.get();
```
thenCompose和thenApply的功能类似,两者区别在于thenCompose接受一个返回`CompletableFuture`的Function,当想从回调方法返回的`CompletableFuture`中直接获取结果U时,就用thenCompose。如果使用thenApply,返回结果resultFuture的类型是`CompletableFuture>>`,而不是`CompletableFuture>`
```java
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(() -> "world"), (s1, s2) -> s1 + s2);
// future.get()
```
**④ 组合多个CompletableFuture**
当需要多个异步任务都完成时,再进行后续处理,可以使用**allOf**方法:
```java
CompletableFuture poiIDTOFuture = CompletableFuture
.supplyAsync(() -> poiService.loadPoi(poiId))
.thenAccept(poi -> {
model.setModelTitle(poi.getShopName());
// do more thing
});
CompletableFuture productFuture = CompletableFuture
.supplyAsync(() -> productService.findAllByPoiIdOrderByUpdateTimeDesc(poiId))
.thenAccept(list -> {
model.setDefaultCount(list.size());
model.setMoreDesc("more");
});
// future3等更多异步任务,这里就不一一写出来了
// allOf组合所有异步任务,并使用join获取结果
CompletableFuture.allOf(poiIDTOFuture, productFuture, future3, ...).join();
```
该方法挺适合C端的业务,通过poiId异步的从多个服务拿门店信息,然后组装成自己需要的模型,最后所有门店信息都填充完后返回。这里使用了join方法获取结果,它和get方法一样阻塞的等待任务完成。多个异步任务有任意一个完成时就返回结果,可以使用**anyOf**方法:
```java
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 1";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 2";
});
CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
return "Result of Future 3";
});
CompletableFuture