Java并发-AQS(1)

原创
2021/02/05 00:30
阅读数 26

AQS即AbstractQueuedSynchronizer(抽象队列同步器),用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量来表示同步状态,通过内置的FIFO队列构成等待队列。

1 AQS简介

同步器本身是个抽象类,一般是通过继承的方式使用,子类通过继承同步器并实现它的抽象方法来管理同步状态。同步器提供了3个方法来修改同步状态(即state变量):getState()setState(int newState)compareAndSetState(int expect, int update)

子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用。同步器既支持独占方式获取同步状态,也支持共享方式获取同步状态。

同步器是实现锁(同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。面向使用者,定义了使用者与锁的交互接口,隐藏了实现细节;同步器面向锁的实现者,简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。

我们常用的ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等皆是基于AQS实现。

2 AQS接口

同步器的设计基于模板方法模式。使用时需要继承同步器并重写指定方法,然后将同步器组合在自定义同步组件中,并调用同步器提供的模板方法,而这些模板方法将会调用重写的方法。

重写同步器方法时,需要使用同步器提供的如下3个方法来访问或者修改同步状态。

  • getState():获取当前同步器状态。
  • setState(int newState):设置当前同步状态。
  • compareAndSetState(int expect, int update):使用CAS设置当前状态(保证状态设置原子性)。

同步器可实现的方法如下:

方法名称 描述
protected boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
protected boolean tryRelease(int arg) 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
protected int tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之则失败
protected boolean tryReleaseShared(int arg) 共享式释放同步状态
protected boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般该表示是否被当前线程独占

实现自定义同步组件时,将会调用同步器提供的模板方法。如下:

方法名称 描述
void acquire(int arg) 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回;否则会进入同步队列等待,该方法会调用重写的tryAcquire(int arg)方法
void acquireInterruptibly(int arg) 与acquire(int arg)相同,但该方法可响应中断,当前线程未获取到同步状态而进入同步队列中,若当前线程被中断,则抛出InterruptedException异常
boolean tryAcquireNanos(int arg, long nanosTimeout) 在acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,返回false;获取到则返回true
void acquireShared(int arg) 共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态
void acquireSharedInterruptibly(int arg) 与acquireShared(int arg)相同,该方法响应中断
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 在acquireSharedInterruptibly(int arg)基础上增加了超时限制
boolean release(int arg) 独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中的第一个节点包含的线程唤醒
boolean releaseShared(int arg) 共享式释放同步状态
Collection<Thread> getQueuedThreads() 获取等待在同步队列上的线程集合

同步器提供的方法基本上分为3类:①独占式获取与释放同步状态;②共享式获取与释放同步状态;③查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。

独占锁示例

class Mutex implements Lock {
  // 静态内部类,自定义同步器
  private static class Sync extends AbstractQueuedSynchronizer {
		
    // 当状态为0时 获取锁
    @Override
    protected boolean tryAcquire(int arg) {
      if (compareAndSetState(0, arg)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }

    // 释放锁 将状态设置为0
    @Override
    protected boolean tryRelease(int arg) {
      if (getState() == 0) {
        throw new IllegalMonitorStateException();
      }
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
    }

    // 是否处于占用状态
    @Override
    protected boolean isHeldExclusively() {
      return getState() == 1;
    }
    
    // 返回一个Condition 每个condition都包含一个condition队列
    Condition newCondition() {
      return new ConditionObject();
    }
  }
  
  // 仅需要将所有的操作代理到Sync上即可
  private final Sync sync = new Sync();
  
  @Override
  public void lock() {
		sync.acquire(1);
  }
  
  @Override
  public boolean tryLock() {
    return sync.tryAcquire(1);
  }
  
  @Override
  public void unlock() {
    sync.release(1);
  }
  
  @Override
  public Condition newCondition() {
    return sync.newCondition();
  }
  
  @Override
  public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
  }
  
  @Override
  public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(time));
  }
}

3 AQS实现

3.1 同步队列

同步器依赖内部的同步队列(一个FIFO的双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中线程唤醒,使其再次尝试获取同步状态。

同步队列中的节点(Node)用来保存:获取同步状态失败的线程引用、等待状态以及前驱和后继节点,节点的属性类型与名称预计描述。如下:

属性类型及名称 描述
int waitStatus 等待状态<br/>①CANCELED,值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态后不会再变化<br/>②SINGLE,值为-1,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行<br/>③CONDITION,值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中<br/>④PROPAGATE,值为-3,表示下一次共享式同步状态获取将会无条件的被传递下去<br/>⑤INITAL,值为0,初始状态<br/>
Node prev 前驱节点,当节点加入同步队列时被设置(尾部添加)
Node next 后继节点
Node nextWaiter 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量,也就是节点类型(独占和共享)和等待队列中的后继节点共用同一个字段
Thread thread 获取同步状态的线程

节点是构成同步队列的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部。

通过Node可以实现两个队列。一是通过prev和next实现CLH队列(线程同步队列、双向队列),二是nextWaiter实现Condition条件上的等待线程队列(单向队列),Condition主要用在ReentrantLock类中。

3.2 独占式同步状态获取及释放

通过同步器的acquire(int arg)方法可获取同步状态,该方法不会响应中断。也就是线程获取同步状态失败后进入同步队列中,后续对线程执行中断操作时,线程不会从同步队列中移出。

同步器acquire代码

public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
      // acquireQueued 返回线程是否被中断
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    // 中断当前线程
    selfInterrupt();
}

static void selfInterrupt() {
  Thread.currentThread().interrupt();
}

主要逻辑:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node.EXCLUSIVE)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node, int arg)方法,使该节点以“死循环”的方式获取同步状态。若果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

节点构造及加入同步队列代码如下

同步器的addWaiter及enq代码

private Node addWaiter(Node mode) {
  // 将当前线程包装成Node
  Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  // 尝试快速在尾部添加
  Node pred = tail;
  if (pred != null) {
    // 设置前驱节点
    node.prev = pred; 
    // 修改尾指针
    if (compareAndSetTail(pred, node)) {
      // 将前驱节点的next设置为自己
      pred.next = node;
      return node;
    }
  }
  //
  enq(node);
  return node;
}

private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    // 队列为空,延迟初始化
    if (t == null) { // Must initialize
      // 初始化时 新增一个dummy节点
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

addWaiter中使用compareAndSetTail来安全的添加节点。在enq中,使用“死循环”方式来保证节点能被正确的添加,在“死循环”中只有通过CAS将节点设置成尾节点后,当前线程才能从该节点中返回。否则,当前线程会不断尝试设置。enq(final Node node)方法将并发添加节点的请求通过CAS变得“串行化”了。

节点进入队列后,就进入了一个自旋的过程,每个节点都在自省的观察,条件满足,获取到了同步状态就可以从自旋过程中退出,否则依旧留在自旋过程中(并会阻塞节点的线程)。

同步器acquireQueued代码

final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();
      // 当前节点的前驱节点为head时,再次尝试获取锁
      if (p == head && tryAcquire(arg)) {
        // head向后挪一位,并将head设置为dummy节点
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      // 获取锁失败时,判断是否需要将当前线程挂起 && 挂起并检查线程中断状态
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        // 线程被中断
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

private void setHead(Node node) {
  head = node;
  node.thread = null;
  node.prev = null;
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  int ws = pred.waitStatus;
  if (ws == Node.SIGNAL)
    /*
     * This node has already set status asking a release
     * to signal it, so it can safely park.
     */
    // 这个节点已经设置状态,释放时发出信号,可以被安全的挂起
    return true;
  if (ws > 0) {
    /*
     * Predecessor was cancelled. Skip over predecessors and
     * indicate retry.
     */
    // 前驱节点已被取消(超时/中断)。跳过这些节点,直接排在等待锁的节点后面
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    /*
     * waitStatus must be 0 or PROPAGATE.  Indicate that we
     * need a signal, but don't park yet.  Caller will need to
     * retry to make sure it cannot acquire before parking.
     */
    // 前驱既不是CANCELED也不是SIGNAL。使用CAS将前驱节点设置为SIGNAL状态。
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  }
  return false;
}

private final boolean parkAndCheckInterrupt() {
  // 将线程挂起
  LockSupport.park(this);
  // 被唤醒时,检查线程的中断状态
  return Thread.interrupted();
}

acquireQueued中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态。原因如下:

  1. 头节点是成功获取同步状态的节点,而头节点的线程释放了同步状态后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。
  2. 维护同步队列的FIFO原则。

当前线程获取了同步状态并执行了相应的逻辑后,就需要释放同步状态,使得后继节点能够继续获取同步状态。通过调用同步器的release方法可以释放同步状态,该方法释放同步状态后,会唤醒后继节点(使后继节点重新尝试获取同步状态)。

同步器release代码

public final boolean release(int arg) {
  if (tryRelease(arg)) {
    Node h = head;
    // 是否有后继节点需要唤醒。
    // head为懒初始化,没有排队的线程则为null
    // 节点状态默认为0,当有需要被唤醒的节点时,会将其前驱节点状态设置为SIGNAL(-1)
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}

private void unparkSuccessor(Node node) {
  /*
   * If status is negative (i.e., possibly needing signal) try
   * to clear in anticipation of signalling.  It is OK if this
   * fails or if status is changed by waiting thread.
   */
  int ws = node.waitStatus;
  // head节点状态<0,则置为0
  if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

  /*
   * Thread to unpark is held in successor, which is normally
   * just the next node.  But if cancelled or apparently null,
   * traverse backwards from tail to find the actual
   * non-cancelled successor.
   */
  Node s = node.next;
  // 后继节点为空,或者节点状态>0(超时/中断取消)时,按FIFO找到最近需要唤醒的节点
  if (s == null || s.waitStatus > 0) {
    s = null;
    // 从队列尾部向前遍历至head,找到距离head最近且节点状态<=0的节点
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  // 存在需要被唤醒的节点
  if (s != null)
    LockSupport.unpark(s.thread);
}

该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor使用LockSupport来唤醒处于等待状态的线程。

在查找距离head最近的状态<=0的节点时,从后向前遍历的原因主要是:在多线程条件下,一个节点的next为null,并不能保证它就是尾节点。即addWaiter方法中,优先设置该节点的前驱节点为原尾节点,再修改尾指针指向,再将原尾节点的next设置为新插入节点(当前尾节点)。

在获取同步状态时,同步器维护一个同步队列,获取失败的线程都会被加入到队列中并在队列中自旋;移出队列(停止自旋)的条件是前驱节点是头结点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease方法释放同步状态,然后唤醒头结点的后继节点。

3.3 共享式同步状态获取及释放

共享式获取与独占式获取最主要的区别在于同一时刻是否有多个线程同时获取到同步状态。

通过调用acquireShared方法可以共享式的获取同步状态。

同步器acquireShared和doAcquireShared代码

public final void acquireShared(int arg) {
  if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
        // 获取共享锁状态
        int r = tryAcquireShared(arg);
        // 返回值小于0,获取共享锁失败
        // 返回值等于0,获取共享锁成功,后续获取会失败
        // 返回值大于0,获取共享锁成功,后续获取很可能成功
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          if (interrupted)
            selfInterrupt();
          failed = false;
          return;
        }
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head; // Record old head for check below
  setHead(node);
  /*
   * Try to signal next queued node if:
   *   Propagation was indicated by caller,
   *     or was recorded (as h.waitStatus either before
   *     or after setHead) by a previous operation
   *     (note: this uses sign-check of waitStatus because
   *      PROPAGATE status may transition to SIGNAL.)
   * and
   *   The next node is waiting in shared mode,
   *     or we don't know, because it appears null
   *
   * The conservatism in both of these checks may cause
   * unnecessary wake-ups, but only when there are multiple
   * racing acquires/releases, so most need signals now or soon
   * anyway.
   */
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
      doReleaseShared();
  }
}

acquireShared中,同步器调用tryAcquireShared方法尝试获取同步状态,tryAcquireShared返回值是int类型,当返回值大于等于0时,表示能够获取到同步状态。共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件是tryAcquireShared方法返回值大于等于0。在doAcquireShared方法自旋过程中,如果当前节点的前驱是头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并且从自旋过程中退出。

与独占方式相同,共享式获取也需要释放同步状态,通过调用releaseShared方法可以释放该状态。

同步器releaseShared代码

public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
  }
  return false;
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        AbstractQueuedSynchronizer.Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
          	// 头节点ws==SIGNAL,说明后继节点需要被唤醒
            if (ws == AbstractQueuedSynchronizer.Node.SIGNAL) {
              	// 此处CAS保证下面的unparkSuccessor(h)只会被执行一次
                if (!compareAndSetWaitStatus(h, AbstractQueuedSynchronizer.Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
          	// ws为0的情况:①上面的if CAS操作;②节点新增时默认值(尾节点);
          	// CAS操作失败,说明ws被修改,即有新节点加入。再循环一次,即可直接唤醒。
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, AbstractQueuedSynchronizer.Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

该方法在释放同步状态后,将会唤醒后继处于等待状态的节点。对于能后支持多个线程同时访问的并发组件,它和独占式的主要区别在于tryReleaseShared方法必须确保同步状态(或资源数)线程安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程。

doReleaseShared方法被调用有两处,一个是acquireShared,线程成功获取到共享锁时,在一定条件下调用;一个是releaseShared,当线程释放共享锁时。它的目的是唤醒head节点的下一个节点。当头节点被改变时,会继续唤醒头结点的下一个节点。

3.4 独占式超时获取同步状态

通过调用同步器的doAcquireNanos方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则返回false。该方法提供了传统的Java同步(比如synchronized关键字)所不具备的特性。

同步器doAcquireNanos代码

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
  if (nanosTimeout <= 0L)
    return false;
  // 超时时间
  final long deadline = System.nanoTime() + nanosTimeout;
  final Node node = addWaiter(Node.EXCLUSIVE);
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return true;
      }
      // 超时时间减去当前时间,小于等于0则表示到期/超时
      nanosTimeout = deadline - System.nanoTime();
      if (nanosTimeout <= 0L)
        return false;
      if (shouldParkAfterFailedAcquire(p, node) &&
          nanosTimeout > spinForTimeoutThreshold)
        LockSupport.parkNanos(this, nanosTimeout);
      if (Thread.interrupted())
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

该方法在自旋的过程中,当节点的前驱节点为头节点时尝试获取同步状态,如果获取成功则从该方法返回,这个过程和独占式同步获取的过程类似,但在同步获取失败的处理上有所不同。若果当前线程获取同步状态失败时,则判断是否超时,超时则直接返回false;若没有超时,则使当前线程等待nanosTimeout纳秒(当已到设置的超时时间,该线程会从LockSupport.parkNanos(this, nanosTimeout)方法返回)。

如果nanosTimeout小于等于spinForTimeoutThreshold(1000L纳秒)时,将不会使该线程进行超时等待,而是进入快速自旋过程。原因在于,非常短的超时等待无法做到十分精确。在超时时间非常短的场景下,同步器会进入无条件的快速自旋。

4 总结

AQS一个没有抽象方法的抽象类,因为抽象类的抽象方法必须被子类全部重写才能够使用。而某些锁/同步组件只需要实现独占/共享中的一种,不使用抽象方法避免了不必要的代码重写。后续还有Condition,以及锁与同步器部分。

5 参考资料

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部