#0 系列目录#
- 并发系列
- 聊聊并发(一)深入分析Volatile的实现原理
- 聊聊并发(二)Java SE1.6中的Synchronized
- 聊聊并发(三)Java线程池的分析和使用
- 聊聊并发(四)深入分析ConcurrentHashMap
- 聊聊并发(五)原子操作的实现原理
- 聊聊并发(六)ConcurrentLinkedQueue的实现原理分析
- 聊聊并发(七)Java中的阻塞队列
- 聊聊并发(八)Fork/Join框架介绍
- 聊聊并发(九)Java中的Copy-On-Write容器
- 聊聊并发(十)生产者消费者模式
- [聊聊并发(十一)—CAS原理]
- 聊聊并发(十二)—AQS分析
- [聊聊并发(十三)—Atomic类]
- [聊聊并发(十四)—MVCC多版本并发控制]
#1 引言# JAVA内置的锁(使用同步方法和同步块
)一直以来备受关注,其优势是可以花最小的空间开销创建锁
(因为每个JAVA对象或者类都可以作为锁使用)和最少的时间开销获得锁
(单线程可以在最短时间内获得锁)。线程同步越来越多地被用在多处理器上,特别是在高并发的情况下,然而,JVM内置锁表现一般,而且不支持任何公平策略
。从JAVA 5开始在java.util.concurrent包中引入了有别于Synchronized的同步框架。
下面谈谈它的设计思路:
设计一个同步器至少应该具以下有两种操作:一个获取方法
,如果当前状态不允许,将一直阻塞这个线程;一个释放方法
,修改状态,让其他线程有运行的机会。并发包中并没有为同步器提供一个统一的API,获取和释放方法在不同的类中的名称不同,比如获取方法有:Lock.lock,Semaphore.acquire, CountDownLatch.await和FutureTask.get
.这些方法一般都重载有多种版本:阻塞与非阻塞版本、支持超时、支持中断
。
java.util.concurrent包中有很多同步类,比如互斥锁、读写锁、信号量
等,这些同步类几乎都可以用不同方式来实现,但是如果这样做,那么这样的项目充其量只能算一个二流工程。JSR166并没有生搬硬套,而是建立了一个同步中心类AbstractQueuedSynchronizer(简称:AQS)的框架
,其中提供了大量的同步操作,而且用户还可以在此类的基础上自定义自己的同步类。其设计目标主要有两点:
- 提高可扩展性,用户可以自定义自己的同步类;
- 最大限度地提高吞吐量,提供自定义公平策略;
#2 设计和实现# 同步器的设计比较直接,前面提到包含获取和释放两个操作
:
**获取操作过程如下:**首先判断当前状态是否允许获取锁,如果是就获取锁,否则就阻塞操作或者获取失败,也就是说如果是独占锁就可能阻塞,如果是共享锁就可能失败
。另外如果是阻塞线程,那么线程就需要进入阻塞队列。当状态位允许获取锁时就修改状态,并且如果进了队列就从队列中移除。
while (synchronization state does not allow acquire) {
enqueue current thread if not already queued;
possibly block current thread;
}
dequeue current thread if it was queued;
**释放操作:**这个过程就是修改状态位,如果有线程因为状态位阻塞的话就唤醒队列中的一个或者更多线程
。
update synchronization state;
if (state may permit a blocked thread to acquire)
unblock one or more queued threads;
要满足以上两个操作,需要以下3点来支持:
- 原子操作同步状态;
- 阻塞或者唤醒一个线程;
- 内部应该维护一个队列;
##2.1 同步状态## AQS用的是一个32位的整型来表示同步状态的
,可以通过以下几个方法来设置和修改这个状态字段:getState(),setState(),compareAndSetState()
.这些方法都需要java.util.concurrent.atomic包的支持,采用CAS操作
.将state设置为32位整型是一个务实的决定,虽然JSR166提供了64位版本的原子操作,但它还是使用对象内部锁来实现的
,如果采用64位的state会导致同步器表现不良好。32位同步器满足大部分应用,如果确实需要64位的状态,可以使用AbstractQueuedLongSynchronizer类
.AQS是一个抽象类,如果它的实现类想要想要拥有对获取和释放的控制权,那它必须实现tryAcquire和tryRelease
两个方法。
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
##2.2 阻塞/唤醒## JSR166以前还没有好的阻塞和解除阻塞线程的API可以使用!只有Thread.suspend 和 Thread.resume,但这两个方法已经被废弃了,原因是有可能导致死锁。如果一个线程拥有监视器然后调用 Thread.suspend 使自已阻塞,另一个线程试图调用Thread.resume去唤醒它,那么这个线程去获取监视器时即出现死锁
。直到后来出现的LockSupport解决了这个问题
,LockSupport.park可以阻塞一个线程,LockSupport.unpack可以解除阻塞,调用一次park,然后调用多次unpack只会唤醒一个线程,阻塞针对线程而不是针对同步器
。特别的,如果一个线程在一个新的同步器上调用pack方法有可能立即返回,因为可能有剩余的unpack存在。虽然调用多次unpack是想彻底清除阻塞状态,但这显得很笨拙,而且不划算,更有效的做法是在多次park的时候才多次unpark
.
##2.3 同步队列## 同步框架最重要的是要有一个同步队列,在这里被严格限制为FIFO队列
,因此这个同步框架不支持基于优先级的同步策略。同步队列采用非阻塞队列毋庸置疑,当时非阻塞队列只有两个可供选择CLH队列锁和MCS队列锁
.原始的CLH Lock仅仅使用自旋锁
,但是相对于MSC Lock它更容易处理cancel和timeout,所以选择了CLH Lock。
CLH队列锁的优点是:进出队快,无锁,畅通无阻
(即使在有竞争的情况下,总有一个线程总是能够很快插入到队尾);检查是否有线程在等待也是很容易的
(只需要检查头尾指针是否相同)。最后设计出来的变种CLH Lock和原始的CLH Lock有较大的差别:
为了可以处理timeout和cancel操作,每个node维护一个指向前驱的指针
。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段。第二个变动是在每个node里使用一个状态字段去控制阻塞,而不是自旋
。一个排队的线程调用acquire,只有在通过了子类实现的tryAcquire才能返回,确保只有队头线程才允许调用tryAcquire
。- 另外还有一些微小的改动:
head结点使用的是傀儡结点
。
变种的CLH队列如下图所示:
##2.4 条件队列## 同步框架提供了一个ConditionObject,一般和Lock接口配合来支持互斥模型,它提供类似JVM同步器的操作
。条件对象可以和其他同步器有效的整合,它修复了JVM内置同步器的不足:一个锁可以有多个条件
。条件结点内部也有一个状态字段,条件结点是通过nextWaiter指针串起来的一个独立的队列。条件队列中的线程在获取锁之前,必须先被transfer到同步队列中去。transfer先断开条件队列的第一个结点
,然后插入到同步队列中,这个新插入到同步队列中的结点和同步队列中的结点一起排队等待获取锁
。
#3 用法# AbstractQueuedSynchronizer是一个采用模板方法模式实现的同步器基类
,子类只需要实现获取和释放方法。子类一般不直接用于同步控制,而是采用代理模式
。因为获取和释放方法一般是私有的
,实现细节不必暴露出来,所以常用委派的方法来使用同步器类:在一个类的内部申请一个私有的AQS的子类,委派它的所有同步方法
。
class Mutex {
class Sync extends AbstractQueuedSynchronizer {
public boolean tryAcquire(int ignore) {
return compareAndSetState(0, 1);
}
public boolean tryRelease(int ignore) {
setState(0);
return true;
}
}
private final Sync sync = new Sync();
public void lock() {
sync.acquire(0);
}
public void unlock() {
sync.release(0);
}
}
java.util.concurrent包中的所有同步工具类都依赖于AQS,其类型程序结构图如下:
AbstractQueuedSynchronizer类还提供了其他一些同步控制方法,包括超时和中断版的获取方法,还集成了独占模式的同步器
,如acquireShared,tryReleaseShared等方法。
##3.1 控制公平## 虽然这个队列被设计为FIFO,但并不意味着这个同步器一定是公平的,前面谈到,在tryAcquire检查之后再排队。因此,新线程完全可以偷偷排在第一个线程前面
。之所以不采用FIFO,有时候是想获得更高的吞吐量,为了减少等待时间,新到的线程与队列头部的线程一起公平竞争
,如果新来的线程比队头的线程快,那么这个新来的线程就获取锁。队头线程失去竞争会再次阻塞,它的继任也将会被阻塞,但这样能避免饥饿。
如果需要绝对公平,那很简单,只需要在tryAcquire方法,不在队头返回false即可
。检查是否在队头可以使用getFirstQueuedThread方法
。有一情况是,队列是空的,同时有多个线程一拥而入,谁先抢到锁就谁运行,这其实与公平并不冲突,是对公平的补充。
##3.2 同步器## JAVA并发框架是如何使用AQS的:
-
ReentrantLock类使用同步状态来代表持有锁的数量
,当一个锁被获得,会记录获取该锁的线程身份,如果一个非当前线程试图释放锁是不合法的。该类也使用了ConditionObject类,和一些监视和检查方法。该类支持公平与非公平两种模式,是通过AQS的两个子类来实现的。 -
ReentrantReadWriteLock类将32位的state分成高位和低位
,16位用于写锁计数,其余16位用于读锁计数。 -
Semaphore类使用同步状态保持当前计数
,acquireShared减少计数,tryRelease的增加计数,如果state是正数就唤醒线程。 -
CountDownLatch类使用同步状态代表计数
。所有线程都获得锁时,状态为0,就唤醒。 -
当然用户可以定义自己的应用程序同步器
。例如:事件,集中管理的锁,基于树的障碍等。
#4 源码分析# ##4.1 线程状态## AQS采用的是CLH队列,CLH队列是由一个一个结点构成的,前面提到结点中有一个状态位,这个状态位与线程状态密切相关,这个状态位(waitStatus)是一个32位的整型常量
,它的取值如下:
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
下面解释一下每个值的含义:
CANCELLED:因为超时或者中断,结点会被设置为取消状态
,被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列,被GC回收;
SIGNAL:表示这个结点的继任结点被阻塞了
,到时需要通知它;
CONDITION
:表示这个结点在条件队列中,因为等待某个条件而被阻塞
;
PROPAGATE:使用在共享模式头结点有可能处于这种状态
,表示锁的下一次获取可以无条件传播;
0:None of the above,新结点会处于这种状态
;
##4.2 获取## AQS中比较重要的两个操作是获取和释放,以下是各种获取操作:
public final void acquire(int arg);
public final void acquireInterruptibly(int arg);
public final void acquireShared(int arg);
public final void acquireSharedInterruptibly(int arg);
protected boolean tryAcquire(int arg);
protected int tryAcquireShared(int arg);
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException;
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;
获取操作的流程图如下:
-
如果尝试获取锁成功整个获取操作就结束,尝试获取锁是
通过方法tryAcquire来实现的
,AQS中并没有该方法的具体实现,只是简单地抛出一个不支持操作异常,只需要知道如果获取锁成功该方法返回true即可
; -
如果获取锁失败,那么就
创建一个代表当前线程的结点加入到等待队列的尾部
,是通过addWaiter方法实现的,来看该方法的具体实现:
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
该方法创建了一个独占式结点
,然后判断队列中是否有元素,如果有(pred!=null)就设置当前结点为队尾结点
,返回;如果没有元素(pred==null),表示队列为空,走的是入队操作
;
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法采用的是变种CLH算法
,先看头结点是否为空,如果为空就创建一个傀儡结点,头尾指针都指向这个傀儡结点,这一步只会在队列初始化时会执行
;
如果头结点非空,就采用CAS操作
将当前结点插入到头结点后面,如果在插入的时候尾结点有变化,就将尾结点向后移动直到移动到最后一个结点为止,然后再把当前结点插入到尾结点后面,尾指针指向当前结点,入队成功。
- 将新加入的结点放入队列之后,这个结点有两种状态,
要么获取锁,要么就挂起
,如果这个结点不是头结点,就看看这个结点是否应该挂起,如果应该挂起,就挂起当前结点,是否应该挂起是通过shouldParkAfterFailedAcquire方法来判断的
。
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
该方法首先检查前趋结点的waitStatus位,如果为SIGNAL,表示前趋结点会通知它,那么它可以放心大胆地挂起了
;
如果前趋结点是一个被取消的结点怎么办呢?那么就向前遍历跳过被取消的结点,直到找到一个没有被取消的结点为止,将找到的这个结点作为它的前趋结点,将找到的这个结点的waitStatus位设置为SIGNAL
,返回false表示线程不应该被挂起。
上面谈的不是头结点的情况决定是否应该挂起,是头结点的情况呢?
是头结点的情况,当前线程就调用tryAcquire尝试获取锁,如果获取成功就将头结点设置为当前结点,返回;如果获取失败就循环尝试获取锁,直到获取成功为止
。整个acquire过程就分析完了。
##4.3 释放## 释放操作有以下方法:
public final boolean release(int arg);
protected boolean tryRelease(int arg);
protected boolean tryReleaseShared(int arg);
下面看看release方法的实现过程:
- release过程比acquire要简单,
首先调用tryRelease释放锁
,如果释放失败,直接返回; 释放锁成功后需要唤醒继任结点
,是通过方法unparkSuccessor实现的:
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
(1)node参数传进来的是头结点,首先检查头结点的waitStatus位,如果为负,表示头结点还需要通知后继结点
,这里不需要头结点去通知后继,因此将该标志位清0。
(2)然后查看头结点的下一个结点,如果下一个结点不为空且它的waitStatus<=0,表示后继结点没有被取消,是一个可以唤醒的结点
,于是唤醒后继结点返回;如果后继结点为空或者被取消了怎么办?寻找下一个可唤醒的结点,然后唤醒它返回。
(3)这里并没有从头向尾寻找,而是相反的方向寻找,为什么呢?因为在CLH队列中的结点随时有可能被中断,被中断的结点的waitStatus设置为CANCEL,而且它会被踢出CLH队列
,如何个踢出法,就是它的前趋结点的next并不会指向它,而是指向下一个非CANCEL的结点,而它自己的next指针指向它自己。一旦这种情况发生,如何从头向尾方向寻找继任结点会出现问题,因为一个CANCEL结点的next为自己,那么就找不到正确的继任接点
。
(4)有的人又会问了,CANCEL结点的next指针为什么要指向它自己,为什么不指向真正的next结点?为什么不为NULL?第一个问题的答案是这种被CANCEL的结点最终会被GC回收,如果指向next结点,GC无法回收
。对于第二个问题的回答,JDK中有这么一句话: The next field of cancelled nodes is set to point to the node itself instead of null, to make life easier for isOnSyncQueue.大至意思是为了使isOnSyncQueue方法更新简单
。isOnSyncQueue方法判断一个结点是否在同步队列,实现如下:
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
如果一个结点next不为空,那么它在同步队列中,如果CANCEL结点的后继为空那么CANCEL结点不在同步队列中
,这与事实相矛盾。 因此将CANCEL结点的后继指向它自己是合理的选择。
##4.4 共享模式与独占模式## AQL的内部队列采用的是CLH队列锁模型,CLH队列是由一个一个结点(Node)构成的
。Node类中有两个常量SHARE和EXCLUSIVE,顾名思义这两个常量用于表示这个结点支持共享模式还是独占模式,共享模式指的是允许多个线程获取同一个锁而且可能获取成功,独占模式指的是一个锁如果被一个线程持有,其他线程必须等待
。多个线程读取一个文件可以采用共享模式,而当有一个线程在写文件时不会允许另一个线程写这个文件,这就是独占模式的应用场景。
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
final boolean isShared() {
return nextWaiter == SHARED;
}
以上代码是两种模式的定义,可以通过方法isShared来判断一个结点处于何种模式。
##4.5 共享模式下获取锁## 共享模式下获取锁是通过tryAcquireShared方法来实现的
,其流程大至如下:
AQS类方法中方法名不含shared的默认是独占模式
,前面提到子类需要重写tryAcquire方法,这是在独占模式下。如果子类想支持共享模式,同样必须重写tryAcquireShared方法,线程首先通过tryAcquireShared方法在共享模式下获取锁,如果获取成功就直接返回
,否则执行以下步骤:
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 创建一个新结点(共享模式),加入到队尾,这个过程和独占模式一样,不再重复;
- 判断新结点的前趋结点是否为头结点,如果不是头结点,就将前趋结点的状态标志位设置为SIGNAL,当前线程可以安全地挂起,整个过程结束;
- 如果它的前趋是头结点,就让前趋在共享模式下获取锁,如果获取成功,把当前结点设置为头结点;
- 设置为头结点之后,满足释放锁条件就阻塞等待释放锁。
满足释放锁的条件为:允许传播或者需要通知继任结点,或者继任结点是共享模式的结点:
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
##4.6 共享模式下释放锁## 这是通过方法releaseShared来实现的,整个流程如下:
- 调用子类的tryReleaseShared尝试获取锁,如果失败,直接返回;
- 如果成功调用doReleaseShared方法做后续处理,doReleaseShared方法如下:
/**
* Release action for shared mode -- signal successor and ensure
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
这个方法就一个目的,就是把当前结点设置为SIGNAL或者PROPAGATE
,如果当前结点不是头结点也不是尾结点,先判断当前结点的状态位是否为SIGNAL,如果是就设置为0,因为共享模式下更多使用PROPAGATE来传播,SIGNAL会被经过两步改为PROPAGATE:
compareAndSetWaitStatus(h, Node.SIGNAL, 0)
compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
为什么要经过两步呢?原因在unparkSuccessor方法:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
......
}
如果直接从SIGNAL到PROPAGATE,那么到unparkSuccessor方法里面又被设置为0:SIGNAL--PROPAGATE---0----PROPAGATE 对头结点相当于多做了一次compareAndSet操作,其实性能也殊途同归啦!
##4.7 闭锁(CountDownLatch)## 闭锁是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。闭锁有几个重要的方法:
public void await() throws InterruptedException;
public void countDown();
其中await方法使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断,如果锁存器为0方法立即返回,一开始锁存器不会为0,当调用countDown方法之后锁存器会减少,当锁存器减少到0时,await方法就会返回。现在看看await方法的实现:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
不出所料,闭锁的await方法正是使用的共享模式的AQS,acquireSharedInterruptibly和acquireShared方法类似,只不过会先响应中断。也就是当有多个线程调用await方法时,这些线程都被阻塞到了doAcquireShared方法的以下地方:
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
前面看到doAcquireShared里面有一个for循环,退出for循环的唯一方式是要tryAcquireShared方法返回值大于0,下面看看tryAcquireShared的方法在闭锁中的实现:
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
......
}
private final Sync sync;
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
......
}
count代表是的线程数,在创建闭锁的同步器时这个count值被赋给了state,因此state肯定不为0,所以tryAcquireShared方法肯定返回-1,也就是这些线程调用await方法时tryAcquireShared都返回-1,这些线程都会阻塞在doAcquireShared的for循环里。然后这些线程依次调用countDown方法,直到最后一个线程调用完后这些线程才会退出for循环继续执行。下面看看countDown方法的实现过程:
public void countDown() {
sync.releaseShared(1);
}
//sync.releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
仍然不出所料,countDown方法正是调用的releaseShared方法,前面提到releaseShared会先调用tryReleaseShared方法,这是由闭锁实现的:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
该方法会递减state的值,直到变为0返回false.
现在整个闭锁的执行流程很明确了:N个线程调用await阻塞在for循环里面,然后N个线程依次调用countDown,每调用一次state减1,直接state为0,这些线程退出for循环(解除阻塞)!
退出for循环时,由于头结点状态标志位为PROPAGATE,而且这些结点都是共享模式,由头结点一传播,这些结点都获取锁,于是齐头并进执行了......
共享与独占在读写锁里面也有用到,后面再分析。
##4.7 中断## JAVA中并没有好的中断线程的方式,早期引入的Thead.stop()和Thread.resume()容易导致死锁,已经不推荐使用。 JAVA线程提供了协作式中断,何为协作是中断,是相对抢占式中断而言的,简单来讲就是设置一个中断标志位,不停地检查这个标志位的状态,如果检查到线程中断,就中断线程。JVM线程内部维护着一个中断标志,程序员不能直接操作这个中断标志位,只能通过线程的以下几个方法设置中断位:
public void interrupt();
public static boolean interrupted();
private native boolean isInterrupted(boolean ClearInterrupted);
public boolean isInterrupted();
AQS中提供了支持中断的方法:
private void doAcquireInterruptibly(int arg) throws InterruptedException;
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException;
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;
这几个方法都抛出了InterruptedException,这些方法都会先抛出中断异常,处理的代码如下:
if (Thread.interrupted())
throw new InterruptedException();
我们还看到有些方法并没有申请抛出InterruptedException,当它被中断时,设置了线程的中断位。
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}
##4.8 超时## AQS与JVM内置锁的一个不同点在于AQS中提供了超时机制,即线程在等待一定时间后会立即返回。下面以doAcquireNanos为例来分析:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 首先取得当前系统时间,在循环等待的过程中,如果剩余时间<=0立即返回;
- 如果剩余时间>0,就用总时间减去一次循环耗费的时间,继续阻塞;
- 如果在这期间线程被中断,就抛出中断异常,如果有其他异常产生,就取消这次获取。
##4.9 取消## 取消获取的逻辑比较复杂,下面来分析一下:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
- 首先取得当前结点的前趋结点,如果前趋结点也被取消直接跳过,继续向前找非取消的结点;
- 将当前结点设置为取消状态;
- 如果当前结点是队尾结点,则将当前结点从队尾移除;否则执行4;
- 找到当前结点的继任结点,前趋的next指针指向继任结点(pred->next=current->next);
- 当前结点的next指针指向自己,前面提到这一方面为了回收,一方面为了使isOnSyncQueue方法简单。
##4.10 其他## AQS还提供了一些线程监控的方法:
//获取哪些线程在等待
protected final Collection<Thread> getWaitingThreads();
//获取等待队列的长度
protected final int getWaitQueueLength();
//是否有线程在等待
protected final boolean hasWaiters()
//是否拥有同步器
final boolean isOwnedBy(AbstractQueuedSynchronizer sync)
//是否在同步队列中
final boolean isOnSyncQueue(Node node)
//支持共享模式的线程
public final Collection<Thread> getSharedQueuedThreads()
//支持独占模式的线程
public final Collection<Thread> getExclusiveQueuedThreads();