【26】同步器

原创
2018/01/22 22:31
阅读数 154

虽然很多同步器(如:锁/信号量/阻塞队列等等)在功能上有所差别,但其实在内部逻辑上它们其实并没有什么太大的差别。 换句话说,它们内部的基础逻辑都是相似的。 了解这部分基础逻辑,对于使用/设计同步器会很有帮助。 所以,本文旨在剖析同步器的内部逻辑。

注意:

本文的部分内容源自,原作者Jakob Jenkov以及Toke Johansen,Lars Bjørn等人在哥本哈根大学硕士期间的研究项目内容。 在这个项目期间也咨询过Doug Lea 。 而有趣的是,Doug Lea在开发Java 5 并发工具包时,也得出了类似的结论。 Doug Lea的相关工作,可以参见: 《Java Concurrency in Practice》这本书(中文:《Java并发编程实战》)。 这本书中也包含了有关同步器的分析。

同步器主要是用于保护某块代码(临界区)在并发访问时的安全问题。 要达到这个目的,需要同步器来处理以下几个问题:

状态(State)

同步器中的状态,是决定线程是否可以访问的条件。 在锁(Lock)中,状态是一个boolean,这个状态决定了Lock是否被锁定了。 而在有界信号量中,内部状态是一个int型的计数器以及一个上界值,这些状态表示着当前任务数量以及最大任务数。 到了阻塞队列中,状态则是一个List以及一个队列最大容量。

下面这两个代码片段,就是LockBoundedSemaphore的部分代码。 请注意,被标记出来的状态:

public class Lock{

  //state is kept here
  //状态
  private boolean isLocked = false; 

  public synchronized void lock()
  throws InterruptedException{
    while(isLocked){
      wait();
    }
    isLocked = true;
  }

  ...
}
public class BoundedSemaphore {

    //state is kept here
    //状态
    private int signals = 0;
    private int bound   = 0;
      
  public BoundedSemaphore(int upperBound){
    this.bound = upperBound;
  }

  public synchronized void take() throws InterruptedException{
    while(this.signals == bound) wait();
    this.signal++;
    this.notify();
  }
  ...
}

访问条件

访问条件是用于决定线程是否允许修改数据状态的控制条件。 而线程一般需要调用一个“检测-更新”状态的过程。 而访问条件主要还是基于同步器中的状态(State)。 通常,访问条件会放到一个循环中来检测,这样可以防止伪唤醒。 访问条件一般是一个Boolean表达式。

锁(Lock)中,访问条件是简单的检测isLocked变量。 在有界信号量中,访问条件是由信号量的获取和释放两个操作来完成的: 在获取信号量时,会检查signals是否达到信号量的上限; 在释放信号量时,会检查signals是否为0。

下面这两个代码片段,就是LockBoundedSemaphore的部分代码。 请注意,条件的检测都是放在while循环之中:

public class Lock{

  private boolean isLocked = false;

  public synchronized void lock()
  throws InterruptedException{
    //access condition
    while(isLocked){
      wait();
    }
    isLocked = true;
  }

  ...
}
public class BoundedSemaphore {
  private int signals = 0;
  private int bound   = 0;

  public BoundedSemaphore(int upperBound){
    this.bound = upperBound;
  }

  public synchronized void take() throws InterruptedException{
    //access condition
    while(this.signals == bound) wait();
    this.signals++;
    this.notify();
  }

  public synchronized void release() throws InterruptedException{
    //access condition
    while(this.signals == 0) wait();
    this.signals--;
    this.notify();
  }
}

状态变更

一旦某个线程获得了临界区的访问权之后,就需要修改同步器的状态,以阻塞其他线程进入。 也就是说,状态需要反应出一个情况:临界区正在被其他线程执行。 这样,其他线程就会在访问条件上被阻塞。

锁(Lock)中,由isLocked = true进行状态更新。 在信号量中,则是以signals--signals++完成状态更新。

来看看下面两个代码:

public class Lock{

  private boolean isLocked = false;

  public synchronized void lock()
  throws InterruptedException{
    while(isLocked){
      wait();
    }
    //state change
    isLocked = true;
  }

  public synchronized void unlock(){
    //state change
    isLocked = false;
    notify();
  }
}
public class BoundedSemaphore {
  private int signals = 0;
  private int bound   = 0;

  public BoundedSemaphore(int upperBound){
    this.bound = upperBound;
  }

  public synchronized void take() throws InterruptedException{
    while(this.signals == bound) wait();
    //state change
    this.signals++;
    this.notify();
  }

  public synchronized void release() throws InterruptedException{
    while(this.signals == 0) wait();
    //state change
    this.signals--;
    this.notify();
  }
}

通知策略

当线程更新同步器状态时,通常需要通知其他等待的线程。 如果状态更新可以让其他线程能够使得访问条件变为true,那通知就更不能少。

常见的通知策略有以下三种:

  1. 通知所有等待线程
  2. 从N个等待线程中,随机通知其中1个线程
  3. 从N个等待线程中,指定通知1个线程

随机通知一个等待线程,是最简单的一种。 只需要通过调用等待对象(wait())的notify()方法即可。 notify()方法,就是一种随机通知的实现逻辑。

而有的时候,需要通知某个指定的等待线程。 例如,需要保证等待线程被有序的通知到。这个顺序一般是线程进入同步器的顺序,以及按优先级顺序。 为了达到这个目的,就需要让等待线程调用各自等待对象上的wait()方法,而不能是所有线程都在同一个对象上等待。 通知时,需要找个指定的那个线程,并调用这个线程对应的等待对象上的notify()方法。 这个,可以在《并发中的饥饿问题以及公平性》中,找到例子。

下面这个代码片段,就是一个随机通知的例子:

public class Lock{

  private boolean isLocked = false;

  public synchronized void lock()
  throws InterruptedException{
    while(isLocked){
      //wait strategy - related to notification strategy
      wait();
    }
    isLocked = true;
  }

  public synchronized void unlock(){
    isLocked = false;
    notify(); //notification strategy
  }
}

检测并设值

同步器做的最多的两种操作,其一就是检测并设值:test-and-set。 检测并设值,就是线程去检测同步器的状态,来判断是否达到访问条件。 如果满足了访问条件,则线程会将状态设置为一个新值,以拒绝其他线程的访问。

状态更新,通常会导致其他线程判定访问条件为false(当然这也不是绝对的)。 例如:在《Java中的读写锁》中,线程获得读权限后,会更新读写锁的状态,但是其他线程请求还是可以继续获得读权限。

检测并设值必须是一个原子化操作过程。 这就意味着,在检测 和 设值两个动作之间,不会有其他线程干扰而出现并发问题。

检测并设值一般的处理逻辑如下:

  1. 必要时,在检测动作之前设值
  2. 检测状态,判定是否满足访问条件
  3. 如果不满足访问条件,则等待
  4. 如果满足访问条件,则更新状态,如有必要,还需要通知其他等待线程

读写锁ReadWriteLocklockWrite()方法中,就有一个测试并设值的例子。 当线程调用lockWrite()方法时,会先更新状态(writeRequests++),然后再检测访问条件(canGrantWriteAccess()),是否满足。 如果满足访问条件,则会在退出之前,再次更新状态。 另外,要注意,这个方法并没有通知等待线程。

public class ReadWriteLock{
    private Map<Thread, Integer> readingThreads =
        new HashMap<Thread, Integer>();

    private int writeAccesses    = 0;
    private int writeRequests    = 0;
    private Thread writingThread = null;

    ...


    //注意,这个方法中的 检测并设值 动作
    public synchronized void lockWrite() throws InterruptedException{
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while(! canGrantWriteAccess(callingThread)){
            wait();
        }
        writeRequests--;
        writeAccesses++;
        writingThread = callingThread;
    }
    

    ...
}

而在,有界信号量BoundeSemaphore中,take()release()两个方法中都有检测并设值动作:

public class BoundedSemaphore {
  private int signals = 0;
  private int bound   = 0;

  public BoundedSemaphore(int upperBound){
    this.bound = upperBound;
  }

  
  public synchronized void take() throws InterruptedException{
      while(this.signals == bound) wait();
      this.signals++;
      this.notify();
  }

  public synchronized void release() throws InterruptedException{
      while(this.signals == 0) wait();
      this.signals--;
      this.notify();
  }
  
}

设值

同步器中做的最多的操作中,除了上面说的测试并设值以外,就是单纯的设值操作。

单纯的设值操作,就不会检测同步器的内部状态,而直接对状态进行设值。 一个典型的例子就是锁Lock中的unlock()方法。 当线程已经持有了锁对象时,总是可以对其进行解锁操作的。 所以,此时无需再次检测状态。

设值操作一般有以下两个逻辑:

  1. 对内部状态设值
  2. 通知等待线程

来看看unlock()方法的实现:

public class Lock{

  private boolean isLocked = false;
  
  public synchronized void unlock(){
      isLocked = false;
      notify();
  }
  
}
展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部