diff --git a/README.md b/README.md index 784953f..1c31cff 100644 --- a/README.md +++ b/README.md @@ -143,9 +143,11 @@ - 努力编写中... ## 番外篇(JDK 1.8) -- [HashMap 源码赏析]() -- [ConcurrentHashMap 源码赏析]() -- [String 源码赏析]() +- [Executor 线程池组件](docs/JDK/Executor线程池组件.md) +- [Lock 锁组件](docs/JDK/Lock锁组件.md) +- [HashMap 源码赏析](docs/JDK/HashMap.md) +- [ConcurrentHashMap 源码赏析](docs/JDK/ConcurrentHashMap.md) +- [String 源码赏析](docs/JDK/String.md) ## 学习心得 ### 个人经验 diff --git a/docs/JDK/Executor线程池组件.md b/docs/JDK/Executor线程池组件.md new file mode 100644 index 0000000..02df792 --- /dev/null +++ b/docs/JDK/Executor线程池组件.md @@ -0,0 +1,252 @@ +## 线程池核心组件图解 +看源码之前,先了解一下该组件 最主要的几个 接口、抽象类和实现类的结构关系。 + +![avatar](/images/JDK1.8/线程池组件类图.png) + +该组件中,Executor 和 ExecutorService接口 定义了线程池最核心的几个方法,提交任务submit +()、关闭线程池shutdown()。抽象类 AbstractExecutorService 主要对公共行为 submit()系列方法进行了实现,这些 submit()方法 的实现使用了 模板方法模式,其中调用的 execute()方法 是未实现的 来自 Executor接口 的方法。实现类 ThreadPoolExecutor 则对线程池进行了具体而复杂的实现。 + +另外还有一个常见的工具类 Executors,里面为开发者封装了一些可以直接拿来用的线程池。 + +## 源码赏析 +话不多说,直接上源码。(这里只看最主要的代码部分) + +### Executor 和 ExecutorService接口 +```java +public interface Executor { + + /** + * 在将来的某个时间执行给定的 Runnable。该 Runnable 可以在新线程、池线程或调用线程中执行。 + */ + void execute(Runnable command); +} + +public interface ExecutorService extends Executor { + + /** + * 优雅关闭,该关闭会继续执行完以前提交的任务,但不再接受新任务。 + */ + void shutdown(); + + /** + * 提交一个有返回值的任务,并返回该任务的 未来执行完成后的结果。 + * Future的 get()方法 将在成功完成后返回任务的结果。 + */ + Future submit(Callable task); + + Future submit(Runnable task, T result); + + Future submit(Runnable task); +} +``` +### AbstractExecutorService 抽象类 +```java +/** + * 该抽象类最主要的内容就是,实现了 ExecutorService 中的 submit()系列方法 + */ +public abstract class AbstractExecutorService implements ExecutorService { + + /** + * 提交任务 进行执行,返回获取未来结果的 Future对象。 + * 这里使用了 “模板方法模式”,execute()方法来自 Executor接口,该抽象类中并未进行实现, + * 而是交由子类具体实现。 + */ + public Future submit(Runnable task) { + if (task == null) throw new NullPointerException(); + RunnableFuture ftask = newTaskFor(task, null); + execute(ftask); + return ftask; + } + + public Future submit(Runnable task, T result) { + if (task == null) throw new NullPointerException(); + RunnableFuture ftask = newTaskFor(task, result); + execute(ftask); + return ftask; + } + + public Future submit(Callable task) { + if (task == null) throw new NullPointerException(); + RunnableFuture ftask = newTaskFor(task); + execute(ftask); + return ftask; + } +} +``` + +### ThreadPoolExecutor +```java +public class ThreadPoolExecutor extends AbstractExecutorService { + + /** + * ************** + * ** 主要属性 ** + * ************** + */ + + /** 阻塞队列 */ + private final BlockingQueue workQueue; + + /** 用于创建线程的 线程工厂 */ + private volatile ThreadFactory threadFactory; + + /** 核心线程数 */ + private volatile int corePoolSize; + + /** 最大线程数 */ + private volatile int maximumPoolSize; + + + /** + * ************** + * ** 构造方法 ** + * ************** + */ + + /** 最后都使用了最后一个构造方法的实现 */ + public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), defaultHandler); + } + + public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + threadFactory, defaultHandler); + } + + public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), handler); + } + + public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + if (corePoolSize < 0 || + maximumPoolSize <= 0 || + maximumPoolSize < corePoolSize || + keepAliveTime < 0) + throw new IllegalArgumentException(); + if (workQueue == null || threadFactory == null || handler == null) + throw new NullPointerException(); + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + this.workQueue = workQueue; + this.keepAliveTime = unit.toNanos(keepAliveTime); + this.threadFactory = threadFactory; + this.handler = handler; + } + + /** + * ************** + * ** 主要实现 ** + * ************** + */ + + /** 执行 Runnable任务 */ + public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + /* + * 分三步进行: + * + * 1、如果运行的线程少于 corePoolSize,尝试开启一个新的线程;否则尝试进入工作队列 + * + * 2. 如果工作队列没满,则进入工作队列;否则 判断是否超出最大线程数 + * + * 3. 如果未超出最大线程数,则尝试开启一个新的线程;否则 按饱和策略处理无法执行的任务 + */ + int c = ctl.get(); + if (workerCountOf(c) < corePoolSize) { + if (addWorker(command, true)) + return; + c = ctl.get(); + } + if (isRunning(c) && workQueue.offer(command)) { + int recheck = ctl.get(); + if (! isRunning(recheck) && remove(command)) + reject(command); + else if (workerCountOf(recheck) == 0) + addWorker(null, false); + } + else if (!addWorker(command, false)) + reject(command); + } + + /** + * 优雅关闭,在其中执行以前提交的任务,但不接受新任务。如果已关闭,则调用没有其他效果。 + */ + public void shutdown() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + checkShutdownAccess(); + advanceRunState(SHUTDOWN); + interruptIdleWorkers(); + onShutdown(); // hook for ScheduledThreadPoolExecutor + } finally { + mainLock.unlock(); + } + tryTerminate(); + } +} +``` +ThreadPoolExecutor 中的 execute()方法 执行 Runnable任务 的流程逻辑可以用下图表示。 + +![avatar](/images/ConcurrentProgramming/线程池流程.png) + +### 工具类 Executors +看类名也知道,它最主要的作用就是提供 static 的工具方法,为开发者提供各种封装好的 具有各自特性的线程池。 +```java +public class Executors { + + /** + * 创建一个固定线程数量的线程池 + */ + public static ExecutorService newFixedThreadPool(int nThreads) { + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + } + + /** + * 创建一个单线程的线程池 + */ + public static ExecutorService newSingleThreadExecutor() { + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue())); + } + + /** + * 创建一个缓存的,可动态伸缩的线程池。 + * 可以看出来:核心线程数为0,最大线程数为Integer.MAX_VALUE,如果任务数在某一瞬间暴涨, + * 这个线程池很可能会把 服务器撑爆。 + * 另外需要注意的是,它们底层都是使用了 ThreadPoolExecutor,只不过帮我们配好了参数 + */ + public static ExecutorService newCachedThreadPool() { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue()); + } +} +``` \ No newline at end of file diff --git a/docs/JDK/Lock锁组件.md b/docs/JDK/Lock锁组件.md new file mode 100644 index 0000000..c4c549a --- /dev/null +++ b/docs/JDK/Lock锁组件.md @@ -0,0 +1,648 @@ +## 类图结构 +J.U.C 的锁组件中 类相对较少,从JDK相应的包中也能看出来,下图标记了其中最主要的几个接口和类,也是本文要分析的重点。 + +![avatar](/images/JDK1.8/JUC的locks包.png) + +下图 将这几个接口和类 以类图的方式展现出来,其中包含了它们所声明的主要方法。 + +![avatar](/images/JDK1.8/JUC锁组件类图.png) + +## Lock 组件 +Lock 组件的结构很简单,只有一个接口和一个实现类,源码如下。 +```java +public interface Lock { + + /** + * 获取锁 + */ + void lock(); + + /** + * 获取锁,除非当前线程中断 + */ + void lockInterruptibly() throws InterruptedException; + + /** + * 只有当调用时 锁是空闲的情况下,才获取锁 + */ + boolean tryLock(); + + /** + * 如果锁在给定的等待时间内空闲且当前线程未被中断,则获取该锁 + */ + boolean tryLock(long time, TimeUnit unit) throws InterruptedException; + + /** + * 释放锁 + */ + void unlock(); +} + +public class ReentrantLock implements Lock, java.io.Serializable { + + /** 提供所有实现机制的同步器,ReentrantLock 的主要方法都依赖于该对象进行实现 */ + private final Sync sync; + + /** + * ReentrantLock锁 的同步控制基础。它的两个子类分别实现了公平锁和非公平锁,如下。 + */ + abstract static class Sync extends AbstractQueuedSynchronizer { + private static final long serialVersionUID = -5179523762034025860L; + + abstract void lock(); + + /** + * Performs non-fair tryLock. tryAcquire is implemented in + * subclasses, but both need nonfair try for trylock method. + */ + final boolean nonfairTryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + if (c == 0) { + if (compareAndSetState(0, acquires)) { + setExclusiveOwnerThread(current); + return true; + } + } + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) // overflow + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; + } + + protected final boolean tryRelease(int releases) { + int c = getState() - releases; + if (Thread.currentThread() != getExclusiveOwnerThread()) + throw new IllegalMonitorStateException(); + boolean free = false; + if (c == 0) { + free = true; + setExclusiveOwnerThread(null); + } + setState(c); + return free; + } + + final boolean isLocked() { + return getState() != 0; + } + } + + /** + * 非公平锁,基于上面的 Sync类 + */ + static final class NonfairSync extends Sync { + private static final long serialVersionUID = 7316153563782823691L; + + final void lock() { + if (compareAndSetState(0, 1)) + setExclusiveOwnerThread(Thread.currentThread()); + else + acquire(1); + } + + protected final boolean tryAcquire(int acquires) { + return nonfairTryAcquire(acquires); + } + } + + /** + * 公平锁,基于上面的 Sync类 + */ + static final class FairSync extends Sync { + private static final long serialVersionUID = -3000897897090466540L; + + final void lock() { + acquire(1); + } + + protected final boolean tryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + if (c == 0) { + if (!hasQueuedPredecessors() && + compareAndSetState(0, acquires)) { + setExclusiveOwnerThread(current); + return true; + } + } + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; + } + } + + /** + * 无参初始化时,默认实例化 非公平锁 + */ + public ReentrantLock() { + sync = new NonfairSync(); + } + + /** + * 可通过参数fair 控制实例化的是 公平锁还是非公平锁 + */ + public ReentrantLock(boolean fair) { + sync = fair ? new FairSync() : new NonfairSync(); + } + + public void lock() { + sync.lock(); + } + + public boolean tryLock() { + return sync.nonfairTryAcquire(1); + } + + public boolean tryLock(long timeout, TimeUnit unit) + throws InterruptedException { + return sync.tryAcquireNanos(1, unit.toNanos(timeout)); + } + + public void unlock() { + sync.release(1); + } + + public boolean isLocked() { + return sync.isLocked(); + } + + public final boolean isFair() { + return sync instanceof FairSync; + } +} +``` + +## ReadWriteLock 组件 +ReadWriteLock 组件的结构也很简单,与上面的 Lock组件 不同的是,它提供了 公平的读锁写锁,以及非公平的读锁写锁。 +```java +public interface ReadWriteLock { + /** + * 获取一个 读锁 + */ + Lock readLock(); + + /** + * 获取一个 写锁 + */ + Lock writeLock(); +} + +public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { + + /** 由内部类提供的读锁 */ + private final ReentrantReadWriteLock.ReadLock readerLock; + /** 由内部类提供的写锁 */ + private final ReentrantReadWriteLock.WriteLock writerLock; + /** 提供所有实现机制的同步器 */ + final Sync sync; + + /** + * 默认创建 非公平的读锁写锁 + */ + public ReentrantReadWriteLock() { + this(false); + } + + /** + * 由参数 fair 指定读锁写锁是公平的还是非公平的 + */ + public ReentrantReadWriteLock(boolean fair) { + sync = fair ? new FairSync() : new NonfairSync(); + readerLock = new ReadLock(this); + writerLock = new WriteLock(this); + } + + /** + * 获取写锁 + * 获取读锁 + */ + public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } + public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } + + abstract static class Sync extends AbstractQueuedSynchronizer { + + protected final boolean tryRelease(int releases) { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + int nextc = getState() - releases; + boolean free = exclusiveCount(nextc) == 0; + if (free) + setExclusiveOwnerThread(null); + setState(nextc); + return free; + } + + protected final boolean tryAcquire(int acquires) { + /* + * Walkthrough: + * 1. If read count nonzero or write count nonzero + * and owner is a different thread, fail. + * 2. If count would saturate, fail. (This can only + * happen if count is already nonzero.) + * 3. Otherwise, this thread is eligible for lock if + * it is either a reentrant acquire or + * queue policy allows it. If so, update state + * and set owner. + */ + Thread current = Thread.currentThread(); + int c = getState(); + int w = exclusiveCount(c); + if (c != 0) { + // (Note: if c != 0 and w == 0 then shared count != 0) + if (w == 0 || current != getExclusiveOwnerThread()) + return false; + if (w + exclusiveCount(acquires) > MAX_COUNT) + throw new Error("Maximum lock count exceeded"); + // Reentrant acquire + setState(c + acquires); + return true; + } + if (writerShouldBlock() || + !compareAndSetState(c, c + acquires)) + return false; + setExclusiveOwnerThread(current); + return true; + } + + protected final boolean tryReleaseShared(int unused) { + Thread current = Thread.currentThread(); + if (firstReader == current) { + // assert firstReaderHoldCount > 0; + if (firstReaderHoldCount == 1) + firstReader = null; + else + firstReaderHoldCount--; + } else { + HoldCounter rh = cachedHoldCounter; + if (rh == null || rh.tid != getThreadId(current)) + rh = readHolds.get(); + int count = rh.count; + if (count <= 1) { + readHolds.remove(); + if (count <= 0) + throw unmatchedUnlockException(); + } + --rh.count; + } + for (;;) { + int c = getState(); + int nextc = c - SHARED_UNIT; + if (compareAndSetState(c, nextc)) + // Releasing the read lock has no effect on readers, + // but it may allow waiting writers to proceed if + // both read and write locks are now free. + return nextc == 0; + } + } + + protected final int tryAcquireShared(int unused) { + /* + * Walkthrough: + * 1. If write lock held by another thread, fail. + * 2. Otherwise, this thread is eligible for + * lock wrt state, so ask if it should block + * because of queue policy. If not, try + * to grant by CASing state and updating count. + * Note that step does not check for reentrant + * acquires, which is postponed to full version + * to avoid having to check hold count in + * the more typical non-reentrant case. + * 3. If step 2 fails either because thread + * apparently not eligible or CAS fails or count + * saturated, chain to version with full retry loop. + */ + Thread current = Thread.currentThread(); + int c = getState(); + if (exclusiveCount(c) != 0 && + getExclusiveOwnerThread() != current) + return -1; + int r = sharedCount(c); + if (!readerShouldBlock() && + r < MAX_COUNT && + compareAndSetState(c, c + SHARED_UNIT)) { + if (r == 0) { + firstReader = current; + firstReaderHoldCount = 1; + } else if (firstReader == current) { + firstReaderHoldCount++; + } else { + HoldCounter rh = cachedHoldCounter; + if (rh == null || rh.tid != getThreadId(current)) + cachedHoldCounter = rh = readHolds.get(); + else if (rh.count == 0) + readHolds.set(rh); + rh.count++; + } + return 1; + } + return fullTryAcquireShared(current); + } + + /** + * Performs tryLock for write, enabling barging in both modes. + * This is identical in effect to tryAcquire except for lack + * of calls to writerShouldBlock. + */ + final boolean tryWriteLock() { + Thread current = Thread.currentThread(); + int c = getState(); + if (c != 0) { + int w = exclusiveCount(c); + if (w == 0 || current != getExclusiveOwnerThread()) + return false; + if (w == MAX_COUNT) + throw new Error("Maximum lock count exceeded"); + } + if (!compareAndSetState(c, c + 1)) + return false; + setExclusiveOwnerThread(current); + return true; + } + + /** + * Performs tryLock for read, enabling barging in both modes. + * This is identical in effect to tryAcquireShared except for + * lack of calls to readerShouldBlock. + */ + final boolean tryReadLock() { + Thread current = Thread.currentThread(); + for (;;) { + int c = getState(); + if (exclusiveCount(c) != 0 && + getExclusiveOwnerThread() != current) + return false; + int r = sharedCount(c); + if (r == MAX_COUNT) + throw new Error("Maximum lock count exceeded"); + if (compareAndSetState(c, c + SHARED_UNIT)) { + if (r == 0) { + firstReader = current; + firstReaderHoldCount = 1; + } else if (firstReader == current) { + firstReaderHoldCount++; + } else { + HoldCounter rh = cachedHoldCounter; + if (rh == null || rh.tid != getThreadId(current)) + cachedHoldCounter = rh = readHolds.get(); + else if (rh.count == 0) + readHolds.set(rh); + rh.count++; + } + return true; + } + } + } + + final boolean isWriteLocked() { + return exclusiveCount(getState()) != 0; + } + } + + /** + * 非公平锁 + */ + static final class NonfairSync extends Sync { + + final boolean writerShouldBlock() { + return false; // writers can always barge + } + final boolean readerShouldBlock() { + /* As a heuristic to avoid indefinite writer starvation, + * block if the thread that momentarily appears to be head + * of queue, if one exists, is a waiting writer. This is + * only a probabilistic effect since a new reader will not + * block if there is a waiting writer behind other enabled + * readers that have not yet drained from the queue. + */ + return apparentlyFirstQueuedIsExclusive(); + } + } + + /** + * 公平锁 + */ + static final class FairSync extends Sync { + + final boolean writerShouldBlock() { + return hasQueuedPredecessors(); + } + final boolean readerShouldBlock() { + return hasQueuedPredecessors(); + } + } + + /** + * 读锁 + */ + public static class ReadLock implements Lock, java.io.Serializable { + + private final Sync sync; + + protected ReadLock(ReentrantReadWriteLock lock) { + sync = lock.sync; + } + + public void lock() { + sync.acquireShared(1); + } + + public void lockInterruptibly() throws InterruptedException { + sync.acquireSharedInterruptibly(1); + } + + public boolean tryLock() { + return sync.tryReadLock(); + } + + public boolean tryLock(long timeout, TimeUnit unit) + throws InterruptedException { + return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); + } + + public void unlock() { + sync.releaseShared(1); + } + } + + /** + * 写锁 + */ + public static class WriteLock implements Lock, java.io.Serializable { + + private final Sync sync; + + protected WriteLock(ReentrantReadWriteLock lock) { + sync = lock.sync; + } + + public void lock() { + sync.acquire(1); + } + + public void lockInterruptibly() throws InterruptedException { + sync.acquireInterruptibly(1); + } + + public boolean tryLock( ) { + return sync.tryWriteLock(); + } + + public boolean tryLock(long timeout, TimeUnit unit) + throws InterruptedException { + return sync.tryAcquireNanos(1, unit.toNanos(timeout)); + } + + public void unlock() { + sync.release(1); + } + } + + public final boolean isFair() { + return sync instanceof FairSync; + } + + public boolean isWriteLocked() { + return sync.isWriteLocked(); + } +} +``` + +## AbstractQueuedSynchronizer +最后看一下抽象类 AbstractQueuedSynchronizer,在同步组件的实现中,AQS是核心部分,同步组件的实现者通过使用 AQS 提供的模板方法实现同步组件语义,AQS 则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等等一些底层的实现处理。AQS 的核心包括:同步队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现,而这些实际上则是AQS提供出来的模板方法。源码如下。 +```java +public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer + implements java.io.Serializable { + + /** + * 当共享资源被某个线程占有,其他请求该资源的线程将会阻塞,从而进入同步队列。 + * 就数据结构而言,队列的实现方式无外乎两者一是通过数组的形式,另外一种则是链表的形式。 + * AQS中的同步队列则是通过链式方式进行实现,下面的内部类Node便是其实现的载体 + */ + static final class Node { + + /** Marker to indicate a node is waiting in shared mode */ + static final Node SHARED = new Node(); + /** Marker to indicate a node is waiting in exclusive mode */ + static final Node EXCLUSIVE = null; + + // 节点从同步队列中取消 + static final int CANCELLED = 1; + // 后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点, + // 使得后继节点的线程能够运行; + static final int SIGNAL = -1; + // 当前节点进入等待队列中 + static final int CONDITION = -2; + // 表示下一次共享式同步状态获取将会无条件传播下去 + static final int PROPAGATE = -3; + + // 节点状态 + volatile int waitStatus; + + // 当前节点/线程的前驱节点 + volatile Node prev; + + // 当前节点/线程的后驱节点 + volatile Node next; + + // 加入同步队列的线程引用 + volatile Thread thread; + + // 等待队列中的下一个节点 + Node nextWaiter; + + final boolean isShared() { + return nextWaiter == SHARED; + } + + final Node predecessor() throws NullPointerException { + Node p = prev; + if (p == null) + throw new NullPointerException(); + else + return p; + } + + Node() { // Used to establish initial head or SHARED marker + } + + Node(Thread thread, Node mode) { // Used by addWaiter + this.nextWaiter = mode; + this.thread = thread; + } + + Node(Thread thread, int waitStatus) { // Used by Condition + this.waitStatus = waitStatus; + this.thread = thread; + } + } + + /** + * AQS实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队, + * 释放锁时对同步队列中的线程进行通知等核心方法。 + */ + private transient volatile Node head; + private transient volatile Node tail; + + /** + * 获取独占式锁 + */ + public final void acquire(int arg) { + if (!tryAcquire(arg) && + acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) + selfInterrupt(); + } + + /** + * 释放独占式锁 + */ + public final boolean release(int arg) { + if (tryRelease(arg)) { + Node h = head; + if (h != null && h.waitStatus != 0) + unparkSuccessor(h); + return true; + } + return false; + } + + /** + * 获取可中断式锁 + */ + public final void acquireInterruptibly(int arg) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + if (!tryAcquire(arg)) + doAcquireInterruptibly(arg); + } + + /** + * 获取共享锁 + */ + public final void acquireShared(int arg) { + if (tryAcquireShared(arg) < 0) + doAcquireShared(arg); + } + + /** + * 释放共享锁 + */ + public final boolean releaseShared(int arg) { + if (tryReleaseShared(arg)) { + doReleaseShared(); + return true; + } + return false; + } +} +``` \ No newline at end of file diff --git a/docs/Mybatis/基础支持层/2、DataSource及Transaction模块.md b/docs/Mybatis/基础支持层/2、DataSource及Transaction模块.md index a14c52e..133ce0a 100644 --- a/docs/Mybatis/基础支持层/2、DataSource及Transaction模块.md +++ b/docs/Mybatis/基础支持层/2、DataSource及Transaction模块.md @@ -168,7 +168,7 @@ public class UnpooledDataSource implements DataSource { 数据库连接池的设计思路一般为: 1. 连接池初始化时创建一定数量的连接,并添加到连接池中备用; 2. 当程序需要使用数据库连接时,从连接池中请求,用完后会将其返还给连接池,而不是直接关闭; -3. 连接池会控制总连接上限及空闲连接上线,如果连接池中的连接总数已达上限,且都被占用,后续的连接请求会进入阻塞队列等待,直到有连接可用; +3. 连接池会控制总连接上限及空闲连接上线,如果连接池中的连接总数已达上限,且都被占用,后续的连接请求会短暂阻塞后重新尝试获取连接,如此循环,直到有连接可用; 4. 如果连接池中空闲连接较多,已达到空闲连接上限,则返回的连接会被关闭掉,以降低系统开销。 PooledDataSource 实现了简易的数据库连接池功能,其创建数据库连接的功能依赖了上面的 UnpooledDataSource。 diff --git a/docs/Spring/SpringTransaction/Spring事务管理器的设计与实现.md b/docs/Spring/SpringTransaction/Spring事务管理器的设计与实现.md index 9537a32..93d6daa 100644 --- a/docs/Spring/SpringTransaction/Spring事务管理器的设计与实现.md +++ b/docs/Spring/SpringTransaction/Spring事务管理器的设计与实现.md @@ -12,129 +12,129 @@ 上面介绍了使用 DataSourceTransactionManager 实现事务创建、提交和回滚的过程,基本上与单独使用 Connection 实现事务处理是一样的,也是通过设置 autoCommit属性,调用 Connection 的 commit() 和 rollback()方法 来完成的。而我们在声明式事务处理中看到的那些事务处理属性,并不在 DataSourceTransactionManager 中完成,这和我们在前面分析中看到的是一致的。 -![avatar](/images/springTransaction/PlatformTransactionManager组件的设计.png) +![avatar](/images/springTransaction/实现DataSourceTransactionManager的时序图.png) ```java public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean { - /** 持有 javax.sql.DataSource对象 */ - private DataSource dataSource; - - /** - * 这里是产生 Transaction对象 的地方,为 Transaction 的创建提供服务,对数据库而言, - * 事务工作是由 Connection 来完成的。这里把数据库的 Connection对象 放到了 ConnectionHolder 中, - * 然后封装到一个 DataSourceTransactionObject对象 中,在这个封装过程中增加了许多为事务处理服务的 - * 控制数据 - */ - @Override - protected Object doGetTransaction() { - DataSourceTransactionObject txObject = new DataSourceTransactionObject(); - txObject.setSavepointAllowed(isNestedTransactionAllowed()); - // 获取与当前线程绑定的 数据库Connection,这个 Connection 在第一个事务开始 - // 的地方与线程绑定 - ConnectionHolder conHolder = - (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource); - txObject.setConnectionHolder(conHolder, false); - return txObject; - } - - /** - * 判断是否存在活跃的事务,由 ConnectionHolder 的 transactionActive属性 来控制 - */ - @Override - protected boolean isExistingTransaction(Object transaction) { - DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; - return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()); - } - - /** - * 这里是处理事务开始的地方,在这里设置隔离级别,但忽略超时 - */ - @Override - protected void doBegin(Object transaction, TransactionDefinition definition) { - DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; - Connection con = null; - - try { - if (txObject.getConnectionHolder() == null || - txObject.getConnectionHolder().isSynchronizedWithTransaction()) { - Connection newCon = this.dataSource.getConnection(); - if (logger.isDebugEnabled()) { - logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); - } - txObject.setConnectionHolder(new ConnectionHolder(newCon), true); - } - - txObject.getConnectionHolder().setSynchronizedWithTransaction(true); - con = txObject.getConnectionHolder().getConnection(); - - Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); - txObject.setPreviousIsolationLevel(previousIsolationLevel); - - // 这里是 数据库Connection 完成事务处理的重要配置,需要把 autoCommit属性 关掉 - if (con.getAutoCommit()) { - txObject.setMustRestoreAutoCommit(true); - if (logger.isDebugEnabled()) { - logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); - } - con.setAutoCommit(false); - } - txObject.getConnectionHolder().setTransactionActive(true); - - int timeout = determineTimeout(definition); - if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { - txObject.getConnectionHolder().setTimeoutInSeconds(timeout); - } - - // 把当前的 数据库Connection 与线程绑定 - if (txObject.isNewConnectionHolder()) { - TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); - } - } - - catch (Throwable ex) { - DataSourceUtils.releaseConnection(con, this.dataSource); - throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); - } - } - - /** - * 事务提交的具体实现 - */ - @Override - protected void doCommit(DefaultTransactionStatus status) { - // 取得 Connection 以后,通过Connection 进行提交 - DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); - Connection con = txObject.getConnectionHolder().getConnection(); - if (status.isDebug()) { - logger.debug("Committing JDBC transaction on Connection [" + con + "]"); - } - try { - con.commit(); - } - catch (SQLException ex) { - throw new TransactionSystemException("Could not commit JDBC transaction", ex); - } - } - - /** - * 事务提交的具体实现,通过 Connection对象 的 rollback()方法 实现 - */ - @Override - protected void doRollback(DefaultTransactionStatus status) { - DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); - Connection con = txObject.getConnectionHolder().getConnection(); - if (status.isDebug()) { - logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); - } - try { - con.rollback(); - } - catch (SQLException ex) { - throw new TransactionSystemException("Could not roll back JDBC transaction", ex); - } - } + /** 持有 javax.sql.DataSource对象 */ + private DataSource dataSource; + + /** + * 这里是产生 Transaction对象 的地方,为 Transaction 的创建提供服务,对数据库而言, + * 事务工作是由 Connection 来完成的。这里把数据库的 Connection对象 放到了 ConnectionHolder 中, + * 然后封装到一个 DataSourceTransactionObject对象 中,在这个封装过程中增加了许多为事务处理服务的 + * 控制数据 + */ + @Override + protected Object doGetTransaction() { + DataSourceTransactionObject txObject = new DataSourceTransactionObject(); + txObject.setSavepointAllowed(isNestedTransactionAllowed()); + // 获取与当前线程绑定的 数据库Connection,这个 Connection 在第一个事务开始 + // 的地方与线程绑定 + ConnectionHolder conHolder = + (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource); + txObject.setConnectionHolder(conHolder, false); + return txObject; + } + + /** + * 判断是否存在活跃的事务,由 ConnectionHolder 的 transactionActive属性 来控制 + */ + @Override + protected boolean isExistingTransaction(Object transaction) { + DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; + return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()); + } + + /** + * 这里是处理事务开始的地方,在这里设置隔离级别,但忽略超时 + */ + @Override + protected void doBegin(Object transaction, TransactionDefinition definition) { + DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; + Connection con = null; + + try { + if (txObject.getConnectionHolder() == null || + txObject.getConnectionHolder().isSynchronizedWithTransaction()) { + Connection newCon = this.dataSource.getConnection(); + if (logger.isDebugEnabled()) { + logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); + } + txObject.setConnectionHolder(new ConnectionHolder(newCon), true); + } + + txObject.getConnectionHolder().setSynchronizedWithTransaction(true); + con = txObject.getConnectionHolder().getConnection(); + + Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); + txObject.setPreviousIsolationLevel(previousIsolationLevel); + + // 这里是 数据库Connection 完成事务处理的重要配置,需要把 autoCommit属性 关掉 + if (con.getAutoCommit()) { + txObject.setMustRestoreAutoCommit(true); + if (logger.isDebugEnabled()) { + logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); + } + con.setAutoCommit(false); + } + txObject.getConnectionHolder().setTransactionActive(true); + + int timeout = determineTimeout(definition); + if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { + txObject.getConnectionHolder().setTimeoutInSeconds(timeout); + } + + // 把当前的 数据库Connection 与线程绑定 + if (txObject.isNewConnectionHolder()) { + TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); + } + } + + catch (Throwable ex) { + DataSourceUtils.releaseConnection(con, this.dataSource); + throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); + } + } + + /** + * 事务提交的具体实现 + */ + @Override + protected void doCommit(DefaultTransactionStatus status) { + // 取得 Connection 以后,通过Connection 进行提交 + DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); + Connection con = txObject.getConnectionHolder().getConnection(); + if (status.isDebug()) { + logger.debug("Committing JDBC transaction on Connection [" + con + "]"); + } + try { + con.commit(); + } + catch (SQLException ex) { + throw new TransactionSystemException("Could not commit JDBC transaction", ex); + } + } + + /** + * 事务提交的具体实现,通过 Connection对象 的 rollback()方法 实现 + */ + @Override + protected void doRollback(DefaultTransactionStatus status) { + DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); + Connection con = txObject.getConnectionHolder().getConnection(); + if (status.isDebug()) { + logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); + } + try { + con.rollback(); + } + catch (SQLException ex) { + throw new TransactionSystemException("Could not roll back JDBC transaction", ex); + } + } } ``` 上面介绍了使用 DataSourceTransactionManager 实现事务创建、提交和回滚的过程,基本上与单独使用 Connection 实现事务处理是一样的,也是通过设置 autoCommit属性,调用 Connection 的 commit() 和 rollback()方法 来完成的。看到这里,大家一定会觉得非常的熟悉。而我们在声明式事务处理中看到的那些事务处理属性,并不在 DataSourceTransactionManager 中完成,这和我们在前面分析中看到的是一致的。 diff --git a/images/JDK1.8/JUC的locks包.png b/images/JDK1.8/JUC的locks包.png new file mode 100644 index 0000000..6b095c3 Binary files /dev/null and b/images/JDK1.8/JUC的locks包.png differ diff --git a/images/JDK1.8/JUC锁组件类图.png b/images/JDK1.8/JUC锁组件类图.png new file mode 100644 index 0000000..2a38b6f Binary files /dev/null and b/images/JDK1.8/JUC锁组件类图.png differ diff --git a/images/JDK1.8/线程池组件类图.png b/images/JDK1.8/线程池组件类图.png new file mode 100644 index 0000000..dc83ba5 Binary files /dev/null and b/images/JDK1.8/线程池组件类图.png differ diff --git a/images/springmessage/image-20200304085303580.png b/images/springmessage/image-20200304085303580.png index 81d0df9..3ef4b9b 100644 Binary files a/images/springmessage/image-20200304085303580.png and b/images/springmessage/image-20200304085303580.png differ diff --git a/images/springmessage/image-20200304092154712.png b/images/springmessage/image-20200304092154712.png index 1318ee6..526d07e 100644 Binary files a/images/springmessage/image-20200304092154712.png and b/images/springmessage/image-20200304092154712.png differ diff --git a/images/springmessage/image-20200305085013723.png b/images/springmessage/image-20200305085013723.png index d1955f1..acaf0b7 100644 Binary files a/images/springmessage/image-20200305085013723.png and b/images/springmessage/image-20200305085013723.png differ diff --git a/images/springmessage/image-20200305085845017.png b/images/springmessage/image-20200305085845017.png index 4a0c83c..a2795c0 100644 Binary files a/images/springmessage/image-20200305085845017.png and b/images/springmessage/image-20200305085845017.png differ diff --git a/images/springmessage/image-20200305090846313.png b/images/springmessage/image-20200305090846313.png index e11a46d..cbea7c1 100644 Binary files a/images/springmessage/image-20200305090846313.png and b/images/springmessage/image-20200305090846313.png differ