一、回顾基本的读写锁
我们知道读写锁 #java.util.concurrent.locks.ReentrantReadWriteLock
是一个 “读写互斥,写写互斥,读读共享” 的锁。
读写锁的使用非常简单,那就是:
我们只需要保证读锁和写锁来自同一个 ReentrantReadWriteLock
即可,我们知道基于 AQS
实现的锁都是使用一个 原子state
来进行资源控制,那么读写锁是如何去控制这个 原子state
的呢?
注:对重入锁 ReentrantLock
或者 AQS
源码不熟悉的读者往下阅读会有一定的困难,请阅读 图文深入解析Java显式锁底层源码 —— 加解锁是如何实现的 !
二、读写锁概述
如果阅读过前面文章或者说对普通重入锁 #java.util.concurrent.locks.ReentrantLock
有一定了解的小伙伴应该知道,重入锁的实现,就是
1、使用 CAS
对 原子state
进行操作,再根据操作的结果来进行资源控制,且当获取资源失败后,
2、使用 On Sync Queue
来进行阻塞等待排队,并等待唤醒以便进行再次对 原子state
进行 CAS
操作来尝试获取资源
的这么一个反复循环的过程。
这里有一个好消息,读写锁中写锁的资源获取acquire
与释放release
,和重入锁及其类似。读锁在流程上也是分类上面说的两步,但是逻辑则出入较大,不过有了前面的基础,看这篇文章应该不会太吃力。读锁和写锁共用同一个 原子state
和 On Sync Queue
来进行资源控制,那么接下来我们来看看这是如何实现的吧。
三、读锁中对于 原子state 的操作
由于写锁和重入锁基本上是一样的,所以我们先讲读锁。上面说到,读锁的实现也可拆为两个阶段,我们先说说第一个阶段:读锁中对于 原子state
的操作。
读锁中对 原子state
的操作,也就是 tryAcquireShared
方法,我们结合源码和源码中的文档,得出如下三步:
- step1:如果写锁持锁,直接获取资源
acquire
失败(返回 -1),持有写锁的是本线程除外。 - step2:写锁不持锁,则首先根据
queue policy
(公平锁或非公平锁) 判断一下要不要阻塞。不需要阻塞则有其次,通过修改 原子state
来尝试获取资源,成功则要修改一下重入计数。 - step3:上面的都失败了,则进入到
fullTryAcquireShared
中。
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
另外,源码中的文档注释虽然说了不可重入,但实际上是可以重入的,这里简单的说下,如果对写锁(或者说重入锁Reentrantlock
)还有印象的小伙伴应该知道,写锁的重入实际上是对 原子state
进行++操作。而读写则是使用一个 HoldCounter
对象,它的功能很简单,就是负责重入的计数。
但有一个特例,那就是读锁套写锁会死锁,这实际上是读写锁设计上的一个 “缺陷” ,疑问先放在这里,后面我们会娓娓道来。
3.1、原子state 操作之 step1 解析:判断写锁是否已经持锁
这部分逻辑极其简单,但是有一个特殊的设计需要特别关注。
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
其一,getExclusiveOwnerThread() != current
, exclusiveOwnerThread
如字面意思,就是是否为独占线程拥有者,也就是判断当前线程是否持有写锁。
其二,就是刚才提到的特殊设计,c & EXCLUSIVE_MASK != 0
,c
是我们的 原子state
,如果 c
和 EXCLUSIVE_MASK
按位与后不为零,代表无权获取资源,即已经有线程持有了写锁。
static final int SHARED_SHIFT = 16;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
...
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
其实很好理解,EXCLUSIVE_MASK
,也就是 01111111111111111
,16 个 1。我们知道 原子state
是一个 32 位 int
,要满足同其与后为 0,就必须满足低位都为 0 才可。
那么问题就来了,为什么高位不需要做判断呢? 答案其实呼之欲出,高位是供读锁使用的。读写锁共用一个 原子state
,但读锁控制高 16 位,写锁控制低 16 位。 对于写锁来说,低 16 位都为 0 代表没有持锁,只要有一个 1 ,则代表某线程已经持有写锁。
3.2、原子state 操作之 step2 解析:公平策略与尝试 CAS 操作
第二部分代码看起来看多,但实际上关键的就是几行,可分为三个判断。
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT))
3.2.1、CAS 尝试获取资源
老生常谈。我们知道,重入锁的重入每次使得 原子state
++。读锁也是如此,读锁被设计为,当有新的线程共享了资源,则 原子state
++,当然,是高位进行 ++。比如 3.1 那张图,高位为 3 (0000000000000011),代表有三个线程共享了该锁。compareAndSetState(c, c + SHARED_UNIT)
这个操作就很好理解了。
SHARED_UNIT
就是高位的 1 ( 1 << 16
),CAS
操作尝试将 原子state
由原值修改为原值高位++。
3.2.2、r < MAX_COUNT 避免原子 state 溢出
前面我们说到,读锁的高位用于记录当前有多少个线程共享了此读锁,但是读锁只有 16 位,所以说,我们最多只能表示 65535 ( 16 个 1 ) 个线程共享此读锁。一旦超过,那就要进入 step 3 了。
/** Returns the number of shared holds represented in count. */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
3.2.3、!readerShouldBlock() 公平锁与非公平锁策略
公平锁和非公平锁相信大家都知道,这里不多说。最大的区别就是,公平锁保证一定 “先到先得”,非公平锁则不保证。
我们这里只简单说说 非公平锁的 readerShouldBlock()
是怎么实现的。
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
满足如上的逻辑,则直接进入 step3,跳过 step2。
看到 head
,就应该联想起之前文章说过的 On Sync Queue
,四个判断连在一起其实就是汇成一句话,当前 On Sync Queue
没有正在等待的线程,或者正在等待的第一个 Node
是 SHARED
模式,代表可进入 step2。
3.2.4、step2 中做了什么
代码看着很多,但其实很简单,它的核心就是使用一个 HoldCounter
来保存重入状态,以便知道这个线程加了几层锁,道理和重入锁的 原子state
++ 是一样的。
至于为什么第一个持有读锁的线程使用本地变量 firstReaderHoldCount
++,而其他线程使用 HoldCounter
++,个人猜测避免共享读很少的情况(比如大多数情况就只有一个线程持有读锁),反复创建 HoldCounter
对象。
if (r == 0) { // 为 0 代表没有任何线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
3.3、原子state 操作之 step3 解析:fullTryAcquireShared
fullTryAcquireShared
代码看起来冗长无比,但实际上和前面说的大体是一样的,但它是一个死循环,这里不过多赘述,就是 step 1 + step 2 的一个自旋版。
- 判断是否有写锁持锁
- 判断
onSyncQueue
有无等待者之类 - 高位不能超过 65535
- 进行 CAS 操作
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) { // 和 step1 一模一样,通过低位判断是否持锁
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) { // 判断 onSyncQueue 里有没有线程在等待,如果有,代表要进入 onSyncQueue 中阻塞了
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT) // 高位不能超过 65535
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // CAS 操作 + HoldCounter
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
四、读锁中对于 onSyncQueue 的操作
大体过一遍代码,会发现 doAcquireShared
与 acquireQueued
无比的类似!
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
doAcquireShared
与 acquireQueued
都有如下特征:
- 进行
addWaiter
将自己塞入On Sync Queue
尾部 - 如果于
On Sync Queue
中,且前节点为头节点,则有机会对 原子state
进行操作 - 如果前节点的
waitStatus
不为-1(SIGNAL)
且不大于0(CANCEL)
则将其改为-1(SIGNAL)
- 如果前节点的
waitStatus
是-1(SIGNAL)
,则阻塞等待唤醒。
那前面四点这里就不多赘述,前面的文章讲到 acquireQueued
时,花了大量笔墨和图文对其进行了讲解。我们这里主要重点讲讲 setHeadAndPropagate(node, r);
4.1、读锁中对于 onSyncQueue 的操作之 setHeadAndPropagate
前面我们说到 acquireQueued
(也就是重入锁中对于 onSyncQueue
的操作的实现 ) 线程被唤醒后,会将自己设为 On Sync Queue
头部,取代其成为新的头部,这里贴一张来自之前文章的截图:
我们再看看,doAcquireShared
中有了哪些变化,它除了将自己设置为新的 HEAD
,还做了一个额外的操作:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
前两步其实大同小异,就是使得自己取代原先 on Sync Queue
的 HEAD
节点。关键是后面的步骤,我们可以看到,当 propagate > 0
或者队列未空,或者头节点状态为 -1(SIGNAL)
或者新的头节点状态为 -1(SIGNAL)
时(其实已经涵盖了大多数的情况),关键是后面的两个判断
s == null || s.isShared()
这代表 on Sync Queue
为空,或者 on Sync Queue
第一个等待的节点 Node
是一个共享锁节点。
满足这些条件后来到我们的共享锁(读锁)核心代码:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
我们知道第一个等待的节点 Node
是一个共享锁节点。此时进入 doReleaseShared()
,如果其前节点 waitStatus
为 -1(SIGNAL)
,则直接将其唤醒(unparkSuccessor(h);
其实就是 release
的核心操作,即将其唤醒)。
如果说其前节点 waitStatus
为 0(-)
,则尝试将其改为 -3(PROPAGATE)
,并继续。
如果说后续的 Node
都是 SHARE
节点,那么将会被层层唤醒。
4.3、读锁中对于 onSyncQueue 的操作图解
如果说你已经懵了,那么这个小章节来帮你缓解一下,我们通过实操来讲解一下这个过程。我们使用了六个线程来进行模拟,代码很好理解:
为了更好的模拟整个步骤,我们使用 Thread.sleep(500);
来保证整个过程按照我们预想的来进行。
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock l = new ReentrantReadWriteLock();
ReadLock readLock = l.readLock();
WriteLock writeLock = l.writeLock();
writeLock.lock();
Thread t = new Thread(() -> { readLock.lock();System.out.println("A"); });
t.setName("A");
t.start();
Thread.sleep(500);
Thread t1 = new Thread(() -> { readLock.lock();System.out.println("B"); });
t1.setName("B");
t1.start();
Thread.sleep(500);
Thread t2 = new Thread(() -> { readLock.lock();System.out.println("C"); });
t2.setName("C");
t2.start();
Thread.sleep(500);
Thread tx = new Thread(() -> { writeLock.lock();System.out.println("另一个写锁线程"); });
tx.setName("另一个写锁线程");
tx.start();
Thread.sleep(500);
Thread t3 = new Thread(() -> { readLock.lock();System.out.println("D"); });
t3.setName("D");
t3.start();
Thread.sleep(500);
writeLock.unlock();
}
- 主线程首先调用
writeLock.lock();
,也就是将 原子state
由0
替换到1
,因为没有其他线程和它竞争,这个肯定是成功的。 - 后续的线程,无论读锁还是写锁,修改 原子
state
必定失败,因为 原子state
&EXCLUSIVE_MASK
不为0
,因为低位目前是1
,按位与后得1
,然后进入队列尾部进行阻塞,且标记自己为SHARE
节点。 - 主线程调用
writeLock.unlock();
唤醒队列中第二个节点。
4.3.1、setHeadAndPropagate 进行状态传播前的一些列状态图解
当一个共享节点被唤醒后,就来到了我们的核心方法:setHeadAndPropagate
,我们说过,读锁中对于 On Sync Queue
的操作,大体的流程和重入锁是一样的,这里就不赘述,我们来看看直到 setHeadAndPropagate
被调用之前, On Sync Queue
中发生了什么:
首先可以预想到的是,后续的 5 个线程,都将在 On Sync Queue
中等待唤醒,如上图所示。图中将队列第一个节点标记为写锁线程是为了更好的理解,实际上队列中第一个节点是一个空节点。我们断点看看具体是怎么样的表现形式:
上图很明显可以看出此时 HEAD
的 thread
为空,next
指向 Thread A
,层层指入,直到队尾的 Thread D
。
且我们可以知道,被唤醒的线程就是 Thread A
,也就是 HEAD.next
。
4.3.2、setHeadAndPropagate 图解!
先回想一下前面说的,setHeadAndPropagate
,也就是被唤醒 且 CAS
操作 原子state
(高位+1) 成功之后,第一步和之前文章说的一样,把旧的队列头删去,并将自己设为头节点,如下图所示:
后续如果发现队列未空,或者下一个等待的节点为 SHARE
(也就是 Thread B
) 节点,则调用 doReleaseShared();
进行解锁状态的传播,实际上就是唤醒 Thread B
去操作 原子state
(高位+1),直到发现队列头节点的 next
节点不是 SHARE
节点,如下图所示:
我们队列中第一个等待的节点变成了 另一个写锁线程
,并且有三个读线程 A、B、C
获取了锁,此时 原子state
高位为 3
,低位为 0
,我们断点看下是不是这样:
我们看到最后一次传递唤醒,HEAD
的 next
为 另一个写锁线程
,且如下图所示,s.isShared()
为 false
,因为 另一个写锁线程
并不是一个共享节点,是一个独占节点,所以不再执行 doReleaseShared();
唤醒下一个线程(其实唤醒了也没用,因为此时写线程去 CAS
必定失败)。
此时 原子state
的值为 196608
,换算成二进制,则为 0000000000000011 0000000000000000
,符合我们的预期,就是高位为 3
,低位为 0
。
收起所有的调试断点,我们发现刚才的代码打印为:
五、读锁中如何释放资源?
前面说到读锁获取资源 acquire
的流程,那么读锁如何释放资源 release
呢?
回想一下重入锁,重入锁每次释放资源使得 原子state
--,当 原子state
为零时,则唤醒队列中下一个等待的线程。
读锁实际上也是如此,但要注意读锁是有两个状态来维护锁的层级和持有共享锁的线程数量的,一个是 HoldCounter
负责重入计数,一个是 原子state
高位,负责记录有多少个线程持有了共享锁。也就是说,我们释放资源release
时,需要首先使得 HoldCounter--
,直到其为零,再去操作 原子state
高位。
这些其实猜都能猜的出来,直到 原子state
高位归零,就会去唤醒下一个等待的线程,而源码中也确实如此:
代码很简单,这里就不啰嗦了。
六、写锁与重入锁的区别
前面花了很多笔墨写读锁,因为写锁和之前的重入锁基本一致,所以这里不花太多篇幅去写写锁,写锁只有一个地方和重入锁有区别,那就是重入锁整个 原子state
都是为锁服务的,而对于写锁来说,只有低位服务于自己,且如果高位不为 0
(代表有读锁已经持有了资源),也无法 acquire
成功,需要入队列中进行等待。
七、小彩蛋:为什么读锁套写锁会死锁?
这里再次啰嗦一次,我们知道对于写锁来说,可以对 原子state
低位 进行 ++ 来实现重入。对于读锁来说呢,可以对 HoldCounter
进行 ++ 来实现重入,讲道理互相套不应该死锁,因为他们互不影响。
那么为什么读锁套死锁会死锁呢?
然而这样却不会死锁:
其实原因很简单,如果要实现读锁套写锁,其实十分简单,只需要在入写锁 acquire
时判断本线程是否已经有 HoldCounter
即可。但是这样会带来一个问题:
- 假设有十个线程正在共享读锁,此时其中一个读线程重入了写锁,这将导致写锁的 独享形同虚设,正常应该是此线程进入等待队列,等待所有线程都释放了读锁后,才能实现获得独占锁。
然而这个持有读锁的线程已经在执行逻辑了,它无法进行阻塞,如果进入阻塞状态,便会有一个问题:那就是这个阻塞的读线程将 永远无法释放读锁!
这就是读锁套写锁会死锁的原因,但我个人认为这种情况,JDK应该将异常抛出才对,之前的项目中便是碰到了这个情况,找了很久才发现是这个问题,这也算是个小 tips
吧。请大家要注意这个问题 ~ 项目中尽量避免读写锁互相套。
7.1、写锁套读锁却不会死锁
因为持有写锁的线程已经拥有了独占锁,此时再去获取读锁,这个对于整个读写锁来说无关紧要,我们只要保证其他的线程无法获取到读锁即可。
参考资料:
JAVA12源码
欢迎讨论~ 如果觉得写得不错 请点赞哟~ 最后推销一波自己的 Q群,现在基本没人... 期待大牛与萌新进来一起讨论 JAVA 相关技术 ~ 群 《我萌的征途是架构狮!》 欢迎您 ~