Merge remote-tracking branch 'remotes/other_master/master'

pull/47/head
huifer 5 years ago
commit 56e1c718fb

@ -143,9 +143,11 @@
- 努力编写中...
## 番外篇JDK 1.8
- [HashMap 源码赏析]()
- [ConcurrentHashMap 源码赏析]()
- [String 源码赏析]()
- [Executor 线程池组件](docs/JDK/Executor线程池组件.md)
- [Lock 锁组件](docs/JDK/Lock锁组件.md)
- [HashMap 源码赏析](docs/JDK/HashMap.md)
- [ConcurrentHashMap 源码赏析](docs/JDK/ConcurrentHashMap.md)
- [String 源码赏析](docs/JDK/String.md)
## 学习心得
### 个人经验

@ -0,0 +1,252 @@
## 线程池核心组件图解
看源码之前,先了解一下该组件 最主要的几个 接口、抽象类和实现类的结构关系。
![avatar](/images/JDK1.8/线程池组件类图.png)
该组件中Executor 和 ExecutorService接口 定义了线程池最核心的几个方法提交任务submit
()、关闭线程池shutdown()。抽象类 AbstractExecutorService 主要对公共行为 submit()系列方法进行了实现,这些 submit()方法 的实现使用了 模板方法模式,其中调用的 execute()方法 是未实现的 来自 Executor接口 的方法。实现类 ThreadPoolExecutor 则对线程池进行了具体而复杂的实现。
另外还有一个常见的工具类 Executors里面为开发者封装了一些可以直接拿来用的线程池。
## 源码赏析
话不多说,直接上源码。(这里只看最主要的代码部分)
### Executor 和 ExecutorService接口
```java
public interface Executor {
/**
* 在将来的某个时间执行给定的 Runnable。该 Runnable 可以在新线程、池线程或调用线程中执行。
*/
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
/**
* 优雅关闭,该关闭会继续执行完以前提交的任务,但不再接受新任务。
*/
void shutdown();
/**
* 提交一个有返回值的任务,并返回该任务的 未来执行完成后的结果。
* Future的 get()方法 将在成功完成后返回任务的结果。
*/
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
}
```
### AbstractExecutorService 抽象类
```java
/**
* 该抽象类最主要的内容就是,实现了 ExecutorService 中的 submit()系列方法
*/
public abstract class AbstractExecutorService implements ExecutorService {
/**
* 提交任务 进行执行,返回获取未来结果的 Future对象。
* 这里使用了 “模板方法模式”execute()方法来自 Executor接口该抽象类中并未进行实现
* 而是交由子类具体实现。
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
```
### ThreadPoolExecutor
```java
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* **************
* ** 主要属性 **
* **************
*/
/** 阻塞队列 */
private final BlockingQueue<Runnable> workQueue;
/** 用于创建线程的 线程工厂 */
private volatile ThreadFactory threadFactory;
/** 核心线程数 */
private volatile int corePoolSize;
/** 最大线程数 */
private volatile int maximumPoolSize;
/**
* **************
* ** 构造方法 **
* **************
*/
/** 最后都使用了最后一个构造方法的实现 */
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
/**
* **************
* ** 主要实现 **
* **************
*/
/** 执行 Runnable任务 */
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 分三步进行:
*
* 1、如果运行的线程少于 corePoolSize尝试开启一个新的线程否则尝试进入工作队列
*
* 2. 如果工作队列没满,则进入工作队列;否则 判断是否超出最大线程数
*
* 3. 如果未超出最大线程数,则尝试开启一个新的线程;否则 按饱和策略处理无法执行的任务
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
/**
* 优雅关闭,在其中执行以前提交的任务,但不接受新任务。如果已关闭,则调用没有其他效果。
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
}
```
ThreadPoolExecutor 中的 execute()方法 执行 Runnable任务 的流程逻辑可以用下图表示。
![avatar](/images/ConcurrentProgramming/线程池流程.png)
### 工具类 Executors
看类名也知道,它最主要的作用就是提供 static 的工具方法,为开发者提供各种封装好的 具有各自特性的线程池。
```java
public class Executors {
/**
* 创建一个固定线程数量的线程池
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* 创建一个单线程的线程池
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* 创建一个缓存的,可动态伸缩的线程池。
* 可以看出来核心线程数为0最大线程数为Integer.MAX_VALUE如果任务数在某一瞬间暴涨
* 这个线程池很可能会把 服务器撑爆。
* 另外需要注意的是,它们底层都是使用了 ThreadPoolExecutor只不过帮我们配好了参数
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
}
```

@ -0,0 +1,648 @@
## 类图结构
J.U.C 的锁组件中 类相对较少从JDK相应的包中也能看出来下图标记了其中最主要的几个接口和类也是本文要分析的重点。
![avatar](/images/JDK1.8/JUC的locks包.png)
下图 将这几个接口和类 以类图的方式展现出来,其中包含了它们所声明的主要方法。
![avatar](/images/JDK1.8/JUC锁组件类图.png)
## Lock 组件
Lock 组件的结构很简单,只有一个接口和一个实现类,源码如下。
```java
public interface Lock {
/**
* 获取锁
*/
void lock();
/**
* 获取锁,除非当前线程中断
*/
void lockInterruptibly() throws InterruptedException;
/**
* 只有当调用时 锁是空闲的情况下,才获取锁
*/
boolean tryLock();
/**
* 如果锁在给定的等待时间内空闲且当前线程未被中断,则获取该锁
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 释放锁
*/
void unlock();
}
public class ReentrantLock implements Lock, java.io.Serializable {
/** 提供所有实现机制的同步器ReentrantLock 的主要方法都依赖于该对象进行实现 */
private final Sync sync;
/**
* ReentrantLock锁 的同步控制基础。它的两个子类分别实现了公平锁和非公平锁,如下。
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
final boolean isLocked() {
return getState() != 0;
}
}
/**
* 非公平锁,基于上面的 Sync类
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* 公平锁,基于上面的 Sync类
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
/**
* 无参初始化时,默认实例化 非公平锁
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* 可通过参数fair 控制实例化的是 公平锁还是非公平锁
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public boolean isLocked() {
return sync.isLocked();
}
public final boolean isFair() {
return sync instanceof FairSync;
}
}
```
## ReadWriteLock 组件
ReadWriteLock 组件的结构也很简单,与上面的 Lock组件 不同的是,它提供了 公平的读锁写锁,以及非公平的读锁写锁。
```java
public interface ReadWriteLock {
/**
* 获取一个 读锁
*/
Lock readLock();
/**
* 获取一个 写锁
*/
Lock writeLock();
}
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
/** 由内部类提供的读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 由内部类提供的写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** 提供所有实现机制的同步器 */
final Sync sync;
/**
* 默认创建 非公平的读锁写锁
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* 由参数 fair 指定读锁写锁是公平的还是非公平的
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/**
* 获取写锁
* 获取读锁
*/
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
/**
* Performs tryLock for write, enabling barging in both modes.
* This is identical in effect to tryAcquire except for lack
* of calls to writerShouldBlock.
*/
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
/**
* Performs tryLock for read, enabling barging in both modes.
* This is identical in effect to tryAcquireShared except for
* lack of calls to readerShouldBlock.
*/
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}
}
/**
* 非公平锁
*/
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
/**
* 公平锁
*/
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
/**
* 读锁
*/
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);
}
}
/**
* 写锁
*/
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock( ) {
return sync.tryWriteLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
}
public final boolean isFair() {
return sync instanceof FairSync;
}
public boolean isWriteLocked() {
return sync.isWriteLocked();
}
}
```
## AbstractQueuedSynchronizer
最后看一下抽象类 AbstractQueuedSynchronizer在同步组件的实现中AQS是核心部分同步组件的实现者通过使用 AQS 提供的模板方法实现同步组件语义AQS 则实现了对同步状态的管理以及对阻塞线程进行排队等待通知等等一些底层的实现处理。AQS 的核心包括同步队列独占式锁的获取和释放共享锁的获取和释放以及可中断锁超时等待锁获取这些特性的实现而这些实际上则是AQS提供出来的模板方法。源码如下。
```java
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* 当共享资源被某个线程占有,其他请求该资源的线程将会阻塞,从而进入同步队列。
* 就数据结构而言,队列的实现方式无外乎两者一是通过数组的形式,另外一种则是链表的形式。
* AQS中的同步队列则是通过链式方式进行实现下面的内部类Node便是其实现的载体
*/
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
// 节点从同步队列中取消
static final int CANCELLED = 1;
// 后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,
// 使得后继节点的线程能够运行;
static final int SIGNAL = -1;
// 当前节点进入等待队列中
static final int CONDITION = -2;
// 表示下一次共享式同步状态获取将会无条件传播下去
static final int PROPAGATE = -3;
// 节点状态
volatile int waitStatus;
// 当前节点/线程的前驱节点
volatile Node prev;
// 当前节点/线程的后驱节点
volatile Node next;
// 加入同步队列的线程引用
volatile Thread thread;
// 等待队列中的下一个节点
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
/**
* AQS实际上通过头尾指针来管理同步队列同时实现包括获取锁失败的线程进行入队
* 释放锁时对同步队列中的线程进行通知等核心方法。
*/
private transient volatile Node head;
private transient volatile Node tail;
/**
* 获取独占式锁
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 释放独占式锁
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 获取可中断式锁
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* 获取共享锁
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* 释放共享锁
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
}
```

@ -168,7 +168,7 @@ public class UnpooledDataSource implements DataSource {
数据库连接池的设计思路一般为:
1. 连接池初始化时创建一定数量的连接,并添加到连接池中备用;
2. 当程序需要使用数据库连接时,从连接池中请求,用完后会将其返还给连接池,而不是直接关闭;
3. 连接池会控制总连接上限及空闲连接上线,如果连接池中的连接总数已达上限,且都被占用,后续的连接请求会进入阻塞队列等待,直到有连接可用;
3. 连接池会控制总连接上限及空闲连接上线,如果连接池中的连接总数已达上限,且都被占用,后续的连接请求会短暂阻塞后重新尝试获取连接,如此循环,直到有连接可用;
4. 如果连接池中空闲连接较多,已达到空闲连接上限,则返回的连接会被关闭掉,以降低系统开销。
PooledDataSource 实现了简易的数据库连接池功能,其创建数据库连接的功能依赖了上面的 UnpooledDataSource。

@ -12,129 +12,129 @@
上面介绍了使用 DataSourceTransactionManager 实现事务创建、提交和回滚的过程,基本上与单独使用 Connection 实现事务处理是一样的,也是通过设置 autoCommit属性调用 Connection 的 commit() 和 rollback()方法 来完成的。而我们在声明式事务处理中看到的那些事务处理属性,并不在 DataSourceTransactionManager 中完成,这和我们在前面分析中看到的是一致的。
![avatar](/images/springTransaction/PlatformTransactionManager组件的设计.png)
![avatar](/images/springTransaction/实现DataSourceTransactionManager的时序图.png)
```java
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
/** 持有 javax.sql.DataSource对象 */
private DataSource dataSource;
/**
* 这里是产生 Transaction对象 的地方,为 Transaction 的创建提供服务,对数据库而言,
* 事务工作是由 Connection 来完成的。这里把数据库的 Connection对象 放到了 ConnectionHolder 中,
* 然后封装到一个 DataSourceTransactionObject对象 中,在这个封装过程中增加了许多为事务处理服务的
* 控制数据
*/
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 获取与当前线程绑定的 数据库Connection这个 Connection 在第一个事务开始
// 的地方与线程绑定
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
/**
* 判断是否存在活跃的事务,由 ConnectionHolder 的 transactionActive属性 来控制
*/
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive());
}
/**
* 这里是处理事务开始的地方,在这里设置隔离级别,但忽略超时
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (txObject.getConnectionHolder() == null ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 这里是 数据库Connection 完成事务处理的重要配置,需要把 autoCommit属性 关掉
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 把当前的 数据库Connection 与线程绑定
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
DataSourceUtils.releaseConnection(con, this.dataSource);
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
/**
* 事务提交的具体实现
*/
@Override
protected void doCommit(DefaultTransactionStatus status) {
// 取得 Connection 以后通过Connection 进行提交
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
/**
* 事务提交的具体实现,通过 Connection对象 的 rollback()方法 实现
*/
@Override
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
con.rollback();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}
/** 持有 javax.sql.DataSource对象 */
private DataSource dataSource;
/**
* 这里是产生 Transaction对象 的地方,为 Transaction 的创建提供服务,对数据库而言,
* 事务工作是由 Connection 来完成的。这里把数据库的 Connection对象 放到了 ConnectionHolder 中,
* 然后封装到一个 DataSourceTransactionObject对象 中,在这个封装过程中增加了许多为事务处理服务的
* 控制数据
*/
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 获取与当前线程绑定的 数据库Connection这个 Connection 在第一个事务开始
// 的地方与线程绑定
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
/**
* 判断是否存在活跃的事务,由 ConnectionHolder 的 transactionActive属性 来控制
*/
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive());
}
/**
* 这里是处理事务开始的地方,在这里设置隔离级别,但忽略超时
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (txObject.getConnectionHolder() == null ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 这里是 数据库Connection 完成事务处理的重要配置,需要把 autoCommit属性 关掉
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 把当前的 数据库Connection 与线程绑定
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
DataSourceUtils.releaseConnection(con, this.dataSource);
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
/**
* 事务提交的具体实现
*/
@Override
protected void doCommit(DefaultTransactionStatus status) {
// 取得 Connection 以后通过Connection 进行提交
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
/**
* 事务提交的具体实现,通过 Connection对象 的 rollback()方法 实现
*/
@Override
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
con.rollback();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}
}
```
上面介绍了使用 DataSourceTransactionManager 实现事务创建、提交和回滚的过程,基本上与单独使用 Connection 实现事务处理是一样的,也是通过设置 autoCommit属性调用 Connection 的 commit() 和 rollback()方法 来完成的。看到这里,大家一定会觉得非常的熟悉。而我们在声明式事务处理中看到的那些事务处理属性,并不在 DataSourceTransactionManager 中完成,这和我们在前面分析中看到的是一致的。

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 59 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 16 KiB

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 13 KiB

After

Width:  |  Height:  |  Size: 9.7 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 9.6 KiB

After

Width:  |  Height:  |  Size: 7.1 KiB

Loading…
Cancel
Save