AQS 原理剖析

原创
09/04 09:17
阅读数 821

AQSAbstractQueuedSynchronizer类称作队列同步器,是构建其他同步器的一个重要的基础框架,同步器自身是没有实现任何同步接口。它是通过控制一个int类型的state变量来表示同步状态,使用一个内置的FIFO(先进先出)队列来构建工作队列操作。

同步器定义有两种资源共享方式:Exclusive(独占式)和Share(共享式)的获取同步状态。

独占式:一个时间点只能执行一个线程。 共享式:一个时间点可多个线程同时执行。

使用方式

同步器的设计采用模板模式,要实现一个同步组件得先继承AbstractQueuedSynchronizer类,通过调用同步器提供的方法和重写同步器的方法来实现。

调用同步器中的方法就是调用前面提到的通过state变量值的操作来表示同步操作,state是被volatile修饰来保证线程可见性。

方法名 描述
getState() 获取当前线程同步状态值。
setState(int newState) 设置当前同步状态值。
compareAndSetState(int expect, int update) 通过CAS设置state的值。

为了避免被重写,以上方法都被final修饰了。

实现同步组件,需要自己根据自己定制化的需求进行处理,所以需要自己重写同步器提供的方法,要重写的方法主要是独占式获取与释放同步状态、共享式获取与释放同步状态。

tryAcquire(int arg) 独占式获取同步状态,返回值为boolean类型,获取成返回true,获取失败返回false

tryRelease(int arg) 独占式释放同步状态,返回值为boolean类型,释放成返回true,释放失败返回false

tryAcquireShared(int arg) 共享式获取同步状态,返回值为int类型,获取成功返回大于 0 的值。

tryReleaseShared(int arg) 共享式释放同步状态,返回值为boolean类型,释放成返回true,释放失败返回false

isHeldExclusively() 独占模式下是否被当前前程独占,返回值为boolean类型,已被当前线程所独占返回true,反之为false

同步器队列

一个同步器里面拥有一个同步队列多个等待队列

同步队列

AbstractQueuedSynchronizer类中,有一个内部类Node,通过该类构造一个内部的同步队列,这是一个FIFO 双向队列。 当前运行线程回去同步状态时,如果获取失败,则将当前线程信息创建一个Node追加到同步队列尾部,然后阻塞当前线程,直到队列的上一个节点的同步状态释放,再唤醒当前线程尝试重新获取同步状态。这个重新获取同步状态操作的节点,一定要是同步队列中第一节点。

Node 源码如下:

static final class Node {
    // 共享模式下等待的标记
    static final Node SHARED = new Node();
    // 独占模式下等待的标记
    static final Node EXCLUSIVE = null;

    /*
    * 等待状态常量值,以下四个常量都是
    */
    static final int CANCELLED =  1;
    
    static final int SIGNAL    = -1;
    
    static final int CONDITION = -2;
    
    static final int PROPAGATE = -3;

    // 等待状态
    volatile int waitStatus;

    // 当前节点的前驱节点
    volatile Node prev;

    // 当前节点的后继节点
    volatile Node next;

    // 获取同步状态的线程(引用)
    volatile Thread thread;

    // 等待队列中的后继节点
    Node nextWaiter;

    // 是否共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 获取前驱节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

通过以上代码,可以看到节点中保存了节点模式、等待状态、线程引用、前驱和后继节点,构造节点。

同步队列中被阻塞的线程的等待状态包含有四个常量值:CANCELLED、SIGNAL、CONDITION、PROPAGATE ,它们对应的被阻塞原因如下:

  • CANCELLED 同步队列中当前节点的线程等待超时或被中断,需要从同步队列中取消等待。
  • SIGNAL 当前节点释放同步状态或被取消后,通知后继节点的线程运行。
  • CONDITION 当前节点在 Condition 上等待,当其他线程对 Condition 调用了 signal() 方法后,该节点将添加到同步队列中。
  • PROPAGATE 该状态存在共享模式的首节点中,当前节点唤醒后将传播唤醒其他节点。

同步器中持有同步队列的首节点和尾节点的引用,在AbstractQueuedSynchronizer中分别对应headtail字段。

所以同步队列的基本结构如图:

等待队列

AbstractQueuedSynchronizer类中包含一个内部类ConditionObject,该类实现了Condition的接口。一个Condition对象包含一个等待队列,同时Condition对象可以实现等待/通知功能。

Condition持有等待队列中的首节点(firstWaiter)和尾节点(lastWaiter),如下图代码所示:

如果当前线程调用Condition.await()时,会将当前线程信息构建一个 Node 节点,因为Condition持有等待队列中的首尾节点,所以将当前等待队列中的尾节点的nextWaiter指向当前线程构建的节点,同时更新lastWaiter的引用节点。

上述过程中的节点、队列的操作,是获取到锁的线程来调用Condition.await()的,所以整个执行过程在没有基于 CAS 的情况下,也是线程安全的。

通过以上的描述,可以知道一个同步器中同步队列、等待队列构成的示意图:

当调用Condition.await()时,同步队列中的首节点,也就是当前线程所创建的节点,会加入到等待队列中的尾部,释放当前线程的同步状态并且唤醒同步队列的后继节点,当前线程也就进入等待状态,这个先后顺序不能颠倒。这个过程相当于同步队列的首节点的线程构造新的节点加入到等待队列的尾部。

当调用Condition.signal()方法时,会先将等待队列中首节点转移到同步队列尾部,然后唤醒该同步队列中的线程,该线程从Condition.await()中自旋退出,接着在在同步器的acquireQueued()中自旋获取同步状态。

当调用Condition.wait()方法,同步队列首节点转移到等待队列方法:

public final void await() throws InterruptedException {
    // 如果线程已中断,则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加节点到等待队列中
    Node node = addConditionWaiter();
    // 修改 state 来达到释放同步状态,避免死锁
    int savedState = fullyRelease(node);
    
    int interruptMode = 0;
    // 判断当前节点是否在同步队列中
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 继续获取同步状态竞争
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // 清除已取消的节点
        unlinkCancelledWaiters();
    if (interruptMode != 0) // 被中断时的处理
        reportInterruptAfterWait(interruptMode);
}

上面addc方法是向等待队列中添加一个新的节点。

private Node addConditionWaiter() {
    // 获取等待队列中尾节点
    Node t = lastWaiter;
    // 如果最后一个节点已取消,则清除取消节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 利用当前线程信息创建等待队列的节点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    
    if (t == null) // 如果最后尾节点为空,当前节点则为等待队列的首节点
        firstWaiter = node;
    else // 否则将当前尾节点的下一个节点指向当前线程信息所构造的节点
        t.nextWaiter = node;
    lastWaiter = node;  // 更新 Condition 的尾节点引用
    return node;
}

当调用Condition.signal()方法,等待队列首节点转移到同步队列方法:

public final void signal() {
    // 是否被当前线程所独占
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 获取等待队列中首节点
    Node first = firstWaiter;
    if (first != null)
        // 转移到同步队列,然后唤醒该节点
        doSignal(first);
}

转移同步队列首节点到同步队列,并唤醒该节点方法doSignal()

private void doSignal(Node first) {
    do {
      if ( (firstWaiter = first.nextWaiter) == null)
          lastWaiter = null;
          // 去除首节点
          first.nextWaiter = null;
    } while (!transferForSignal(first) && // 从等待队列中转移到同步队列
              (first = firstWaiter) != null);
}

转移等待队列到同步队列方法transferForSignal(Node node)

final boolean transferForSignal(Node node) {
    // 验证节点是否被取消
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 转移节点至同步队列
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

等待队列中的头结点线程安全移动到同步队列方法enq(final Node node)

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 同步队列中如果为空,则初始化同步器
        if (t == null) { 
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 否则新节点的前驱节点为当前同步队列的尾节点
            node.prev = t;
            // 设置当前新节点为同步队列的尾节点,并更新先前同步队列的尾节点的后继节点指向当前新节点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

独占式同步状态

独占式同步状态获取和释放是线程安全的操作,一个时间点确保只有一个线程获取到同步状态。

独占式同步状态获取

acquire(int arg)方法是获取独占式同步状态的方法,当线程获取同步失败时,会加入到同步队列中。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上述代码中,当执行tryAcquire(int arg)方法获取同步状态失败时,接着通过addWaiter(Node.EXCLUSIVE)构造当前线程信息的节点,随后将新构造的节点通过acquireQueued(final Node node, int arg)方法加入到同步队列中,节点在同步队列中自旋等待获取同步状态。

tryAcquire(int arg)是自定义同步器实现的,实现该方法需要保证线程安全获取同步状态,前面讲到AQS提供的compareAndSetState(int expect, int update)方法通过CAS设置state值来保证线程安全。

上面获取独占式同步状态时,主要分析acquireQueued(final Node node, int arg)方法,节点加入队列并自旋等待。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 是否中断标识
        boolean interrupted = false;
        for (;;) {
            // 获取当前节点的前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点是首节点,并且当前节点获取到同步状态
            if (p == head && tryAcquire(arg)) {
                // 将当前节点设置为首节点
                setHead(node);
                // 将原首节点(即当前节点的前驱节点)引用清空,利于 GC 回收
                p.next = null;
                // 成功获取到同步状态标志
                failed = false;
                return interrupted;
            }
            // 判断前驱节点是否超时或取消,以及当前线程是否被中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 如果被中断,则节点出队
        if (failed)
            cancelAcquire(node);
    }
}

在首节点释放同步状态后,同时唤醒后继节点。后继节点通过自旋的方式(这里利用死循环方式)也会检查自己的前驱节点是否为首节点,如果是前驱节点则会尝试获取同步状态。获取成功则返回,否则判断是否被中断或者继续自旋上述获取同步状态操作。

独占式同步状态释放

release(int arg)方法是释放同步状态,当释放同步状态后会唤醒后继节点。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease(int arg)方法同样也是自定义同步器实现。当首节点不为空且处于等待状态时,那么调用unparkSuccessor(Node node)方法唤醒后继节点。

private void unparkSuccessor(Node node) {
    // CAS 设置等待状态为初始状态
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    // 如果当前释放同步状态的节点不存在后继节点或后继节点超时/被中断
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾节点中开始寻找等待状态的节点作为新首节点,这里已排除当前节点(t != node)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

释放同步状态的整个过程就是:释放同步状态,唤醒后继节点。这个后继节点必须满足,非空、非当前节点、等待状态小于或等于 0 ,即SIGNALCONDITIONPROPAGATE和初始化状态。

独占式资源共享方式除了上面的同步状态获取,还有独占式超时获取使用的方法是doAcquireNanos(int arg, long nanosTimeout)独占式可中断获取使用的方法是acquireInterruptibly(int arg)

共享式同步状态

共享式同步状态同一时间点可以有多个线程获取到同步状态。

共享式同步状态获取

acquireShared(int arg)方法是共享式同步状态获取的方法。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        // 获取同步状态失败后调用的方法
        doAcquireShared(arg);
}

tryAcquireShared(int arg)方法是自定义同步器实现的,返回大于或等于 0 时,表示获取成功。如果小于 0 时,获取同步状态失败后会调用doAcquireShared(int arg)方法进行再次尝试获取。

private void doAcquireShared(int arg) {、
    // 构造一个当前线程信息的新节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋式获取同步状态
        for (;;) {
            final Node p = node.predecessor();
            // 判断新节点的前驱节点是否为首节点
            if (p == head) {
                // 再次尝试获取同步状态
                int r = tryAcquireShared(arg);
                // 获取成功后退出自旋
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上面代码中,当获取同步状态失败后,则创建一个共享模式类型的节点,然后自旋式获取同步状态,如果前驱节点为首节点时则尝试再次获取同步状态,获取同步状态成功后退出当前自旋。

共享式释放同步状态

releaseShared(int arg)方法来释放共享式同步状态。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 同步状态释放成功后,唤醒后面等待状态的节点
        doReleaseShared();
        return true;
    }
    return false;
}

上面tryReleaseShared(int arg)释放同步状态方法必须保证线程安全,因为它多个线程获取到同步状态时会引发并发操作,可以通过循环操作和 CAS 来确保安前行。

doReleaseShared()方法唤醒后续等待状态的节点。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 验证后继节点的线程处于等待状态
            if (ws == Node.SIGNAL) {
                // 再次检查后继节点的线程是否处于等待状态
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;   
                // 唤醒后继节点,这时每唤醒一次就更新一次首节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

共享同步状态释放后,自旋式依次唤醒队列中节点。

总结

从 AQS 中可以借鉴它利用循环和 CAS 来确保并发的安全性的思路,同时它采用模板设计模式定义一个处理逻辑,将具体的特定处理逻辑交由子类自定义实现。在 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 以及 Tomncat 的 LimitLatch 都有用其作为同步器。

推荐阅读

《Java 线程基础,从这篇开始》

《你必须会的 JDK 动态代理和 CGLIB 动态代理》

《synchronized 原理知多少》

《你必须会的 JDK 动态代理和 CGLIB 动态代理》

《Netty 中粘包/拆包处理》

《ElasticSearch之映射常用操作》

展开阅读全文
打赏
0
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
0
分享
OSCHINA
登录后可查看更多优质内容
返回顶部
顶部