- 原文地址:Anatomy of a Synchronizer
- 作者: Jakob Jenkov
虽然很多同步器(如:锁/信号量/阻塞队列等等)在功能上有所差别,但其实在内部逻辑上它们其实并没有什么太大的差别。 换句话说,它们内部的基础逻辑都是相似的。 了解这部分基础逻辑,对于使用/设计同步器会很有帮助。 所以,本文旨在剖析同步器的内部逻辑。
注意:
本文的部分内容源自,原作者Jakob Jenkov以及Toke Johansen,Lars Bjørn等人在哥本哈根大学硕士期间的研究项目内容。 在这个项目期间也咨询过Doug Lea 。 而有趣的是,Doug Lea在开发Java 5 并发工具包时,也得出了类似的结论。 Doug Lea的相关工作,可以参见: 《Java Concurrency in Practice》这本书(中文:《Java并发编程实战》)。 这本书中也包含了有关同步器的分析。
同步器主要是用于保护某块代码(临界区)在并发访问时的安全问题。 要达到这个目的,需要同步器来处理以下几个问题:
状态(State)
同步器中的状态,是决定线程是否可以访问的条件。 在锁(Lock)中,状态是一个
boolean
,这个状态决定了Lock
是否被锁定了。 而在有界信号量中,内部状态是一个int型的计数器以及一个上界值,这些状态表示着当前任务数量以及最大任务数。 到了阻塞队列中,状态则是一个List以及一个队列最大容量。
下面这两个代码片段,就是
Lock
和BoundedSemaphore
的部分代码。 请注意,被标记出来的状态:
public class Lock{
//state is kept here
//状态
private boolean isLocked = false;
public synchronized void lock()
throws InterruptedException{
while(isLocked){
wait();
}
isLocked = true;
}
...
}
public class BoundedSemaphore {
//state is kept here
//状态
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException{
while(this.signals == bound) wait();
this.signal++;
this.notify();
}
...
}
访问条件
访问条件是用于决定线程是否允许修改数据状态的控制条件。 而线程一般需要调用一个“检测-更新”状态的过程。 而访问条件主要还是基于同步器中的状态(State)。 通常,访问条件会放到一个循环中来检测,这样可以防止伪唤醒。 访问条件一般是一个Boolean表达式。
在锁(Lock)中,访问条件是简单的检测
isLocked
变量。 在有界信号量中,访问条件是由信号量的获取和释放两个操作来完成的: 在获取信号量时,会检查signals
是否达到信号量的上限; 在释放信号量时,会检查signals
是否为0。
下面这两个代码片段,就是
Lock
和BoundedSemaphore
的部分代码。 请注意,条件的检测都是放在while循环之中:
public class Lock{
private boolean isLocked = false;
public synchronized void lock()
throws InterruptedException{
//access condition
while(isLocked){
wait();
}
isLocked = true;
}
...
}
public class BoundedSemaphore {
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException{
//access condition
while(this.signals == bound) wait();
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
//access condition
while(this.signals == 0) wait();
this.signals--;
this.notify();
}
}
状态变更
一旦某个线程获得了临界区的访问权之后,就需要修改同步器的状态,以阻塞其他线程进入。 也就是说,状态需要反应出一个情况:临界区正在被其他线程执行。 这样,其他线程就会在访问条件上被阻塞。
在锁(Lock)中,由
isLocked = true
进行状态更新。 在信号量中,则是以signals--
和signals++
完成状态更新。
来看看下面两个代码:
public class Lock{
private boolean isLocked = false;
public synchronized void lock()
throws InterruptedException{
while(isLocked){
wait();
}
//state change
isLocked = true;
}
public synchronized void unlock(){
//state change
isLocked = false;
notify();
}
}
public class BoundedSemaphore {
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException{
while(this.signals == bound) wait();
//state change
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(this.signals == 0) wait();
//state change
this.signals--;
this.notify();
}
}
通知策略
当线程更新同步器状态时,通常需要通知其他等待的线程。 如果状态更新可以让其他线程能够使得访问条件变为true,那通知就更不能少。
常见的通知策略有以下三种:
- 通知所有等待线程
- 从N个等待线程中,随机通知其中1个线程
- 从N个等待线程中,指定通知1个线程
随机通知一个等待线程,是最简单的一种。 只需要通过调用等待对象(
wait()
)的notify()
方法即可。notify()
方法,就是一种随机通知的实现逻辑。
而有的时候,需要通知某个指定的等待线程。 例如,需要保证等待线程被有序的通知到。这个顺序一般是线程进入同步器的顺序,以及按优先级顺序。 为了达到这个目的,就需要让等待线程调用各自等待对象上的
wait()
方法,而不能是所有线程都在同一个对象上等待。 通知时,需要找个指定的那个线程,并调用这个线程对应的等待对象上的notify()
方法。 这个,可以在《并发中的饥饿问题以及公平性》中,找到例子。
下面这个代码片段,就是一个随机通知的例子:
public class Lock{
private boolean isLocked = false;
public synchronized void lock()
throws InterruptedException{
while(isLocked){
//wait strategy - related to notification strategy
wait();
}
isLocked = true;
}
public synchronized void unlock(){
isLocked = false;
notify(); //notification strategy
}
}
检测并设值
同步器做的最多的两种操作,其一就是检测并设值:test-and-set。 检测并设值,就是线程去检测同步器的状态,来判断是否达到访问条件。 如果满足了访问条件,则线程会将状态设置为一个新值,以拒绝其他线程的访问。
状态更新,通常会导致其他线程判定访问条件为false(当然这也不是绝对的)。 例如:在《Java中的读写锁》中,线程获得读权限后,会更新读写锁的状态,但是其他线程请求还是可以继续获得读权限。
检测并设值必须是一个原子化操作过程。 这就意味着,在检测 和 设值两个动作之间,不会有其他线程干扰而出现并发问题。
检测并设值一般的处理逻辑如下:
- 必要时,在检测动作之前设值
- 检测状态,判定是否满足访问条件
- 如果不满足访问条件,则等待
- 如果满足访问条件,则更新状态,如有必要,还需要通知其他等待线程
在读写锁ReadWriteLock的
lockWrite()
方法中,就有一个测试并设值的例子。 当线程调用lockWrite()
方法时,会先更新状态(writeRequests++
),然后再检测访问条件(canGrantWriteAccess()
),是否满足。 如果满足访问条件,则会在退出之前,再次更新状态。 另外,要注意,这个方法并没有通知等待线程。
public class ReadWriteLock{
private Map<Thread, Integer> readingThreads =
new HashMap<Thread, Integer>();
private int writeAccesses = 0;
private int writeRequests = 0;
private Thread writingThread = null;
...
//注意,这个方法中的 检测并设值 动作
public synchronized void lockWrite() throws InterruptedException{
writeRequests++;
Thread callingThread = Thread.currentThread();
while(! canGrantWriteAccess(callingThread)){
wait();
}
writeRequests--;
writeAccesses++;
writingThread = callingThread;
}
...
}
而在,有界信号量BoundeSemaphore中,
take()
和release()
两个方法中都有检测并设值动作:
public class BoundedSemaphore {
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException{
while(this.signals == bound) wait();
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(this.signals == 0) wait();
this.signals--;
this.notify();
}
}
设值
同步器中做的最多的操作中,除了上面说的测试并设值以外,就是单纯的设值操作。
单纯的设值操作,就不会检测同步器的内部状态,而直接对状态进行设值。 一个典型的例子就是锁Lock中的
unlock()
方法。 当线程已经持有了锁对象时,总是可以对其进行解锁操作的。 所以,此时无需再次检测状态。
设值操作一般有以下两个逻辑:
- 对内部状态设值
- 通知等待线程
来看看
unlock()
方法的实现:
public class Lock{
private boolean isLocked = false;
public synchronized void unlock(){
isLocked = false;
notify();
}
}