【并发编程】 图文深入解析Java显式锁底层源码 —— condition 实现线程调度

原创
2019/06/06 19:43
阅读数 609

一、回顾 AQS 资源的锁定与释放

上篇文章(文章中有详细的源码解读) 说到,AQStryRelease 失败后,资源的锁定与释放,正向流程大体可以分为以下6个阶段。

1-2:当其他线程占据了锁定的资源,另一个线程进行获取时,会进入 FIFO 队列,如果队列未初始化,则进入初始化。

3:进入了 FIFO 队列之后,开始自旋,并不断尝试将前一个节点的 waitStatus 替换成 -1 == SIGNAL

4:最后一次自旋,前一个节点的 waitStatus 已经是 -1 == SIGNAL,则进入阻塞模式`

5:当持有资源锁定的线程调用了 releasestate 从 1 改为 0之后,本线程被唤醒,继续自旋

6:如果本线程节点的 prev 节点为 HEAD,则有机会进行一次资源获取,如果获取成功(将 state 由 0 改为 1 ),则将自己置为头结点,自旋结束。


注意:

为了防止混淆,本文将 AQS 的队列(上篇文章,也是上图说到的队列)称为 On Sync Queue(蓝底+浅蓝色Node),本文中伴随 Condition 对象出现的队列称为 Condition Queue(蓝底+黑色Node


二、AQS 之 Condition 简单 Demo

上篇文章,我们只说到了 waitStatus 的初始状态 0 以及 SIGNAL = -1,表明锁定资源持有者在释放资源时需要通知 next 节点进行 unPark,即唤醒 next 节点。

而接下来这一小节将着重分析 CONDITION = -2 的情况,-2 的意思是代表当前这个节点在 Condition Queue 中排队,等待通知 (signal)。

Condition 的创建十分简单,在原来的显式锁上调用 newCondition() 即可,使用方法和我们熟知的 waitnotify 类似,condition 为我们提供了 awaitsignal 方法,但是它可以做更加细粒度的控制,我们看看下面这个简单的 Demo。

/**
 * Created by Anur IjuoKaruKas on 2019/6/4
 */
public class Condition {

    private ReentrantLock reentrantLock = new ReentrantLock();

    private java.util.concurrent.locks.Condition meetWaiter = reentrantLock.newCondition();

    private java.util.concurrent.locks.Condition fruitWaiter = reentrantLock.newCondition();

    private void buyMeet() throws InterruptedException {
        try {
            reentrantLock.lock();
            print("前去买肉发现没货");
            meetWaiter.await();
            print("被通知:肉进货了~");
        } finally {
            reentrantLock.unlock();
        }
    }

    private void buyFruit() throws InterruptedException {
        try {
            reentrantLock.lock();
            print("前去水果发现没货");
            fruitWaiter.await();
            print("被通知:水果进货了~");
        } finally {
            reentrantLock.unlock();
        }
    }

    private void meetIn() {
        try {
            reentrantLock.lock();
            print("通知:肉进货了~");
            meetWaiter.signal();
        } finally {
            reentrantLock.unlock();
        }
    }

    private void fruitIn() {
        try {
            reentrantLock.lock();
            print("通知:水果进货了~");
            fruitWaiter.signal();
        } finally {
            reentrantLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Condition condition = new Condition();

        new Thread(() -> {
            try {
                condition.buyFruit();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                condition.buyFruit();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                condition.buyMeet();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        Thread.sleep(1000);
        condition.fruitIn();

        Thread.sleep(1000);
        condition.meetIn();
    }

    public static void print(String print) {
        System.out.println(String.format("时间 - %s\t\t%s\t\t%s", new Date(), Thread.currentThread(), print));
    }
}

====================================================
时间 - Wed Jun 05 10:22:14 CST 2019		Thread[Thread-0,5,main]		前去水果发现没货
时间 - Wed Jun 05 10:22:14 CST 2019		Thread[Thread-2,5,main]		前去买肉发现没货
时间 - Wed Jun 05 10:22:14 CST 2019		Thread[Thread-1,5,main]		前去水果发现没货
时间 - Wed Jun 05 10:22:15 CST 2019		Thread[main,5,main]		通知:水果进货了~
时间 - Wed Jun 05 10:22:15 CST 2019		Thread[Thread-0,5,main]		被通知:水果进货了~
时间 - Wed Jun 05 10:22:16 CST 2019		Thread[main,5,main]		通知:肉进货了~
时间 - Wed Jun 05 10:22:16 CST 2019		Thread[Thread-2,5,main]		被通知:肉进货了~

例子虽然举的比较粗俗...... 但是核心就是线程的调度,我们可以在某些条件下使得显示锁阻塞,且通过某些条件被唤醒。

可以看到,我们可以分别为 fruitWaiter 或者 meetWaiter 进行细粒度的唤醒 signal(其实还有个 signalAll)。至于 condition 的使用我们这里不做过多赘述。

三、AQS 之 Condition 正向流程源码解析

我们先纵览一下 await 方法:(不考虑线程被 interrupt 的情况)

  1. 调用 addConditionWaiter();,这一步实际上和我们前面说的 FIFO 队列很像,操作的是 Condition Queue
  2. 调用 int savedState = fullyRelease(node);,如果对前面说的 release 有印象的话,那么这个就很好理解了,一般我们一次 release,正常实现都是使得 state --,对应 acquire 使得 state ++。而这个 fullyRelease 则是一次性释放掉所有 state,直接让 state 归零,并保存 state 状态。
  3. isOnSyncQueue 则是进行一系列判断、阻塞与自旋,它是控制 condition 阻塞的核心代码(实际上很简单)。

  1. 被其他持锁线程 signal 进行通知弹出 Condition Queue ,且进入 On Sync Queue

  1. 回到我们上篇文章说的 tryAcquire 自旋(本文第一章阶段3-6),实际上到这一步,condition 阻塞已经完毕了,接下来回归我们的正常流程,可以理解为,此时被某个线程通知唤醒,但是一唤醒我们不能并不能立刻获得资源,正常的流程还是要走的。
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();// 1、进入 condition 队列
            int savedState = fullyRelease(node);// 2、释放资源,并记录 state
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {// 3、condition 队列阻塞,直到被其他持锁线程 signal(或者被 interrupt)才会停止自旋。
                LockSupport.park(this); // 阻塞
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;// 被 interrupt
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)// /5、重新自旋,开始申请锁定资源
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // 6、如果有必要的话,修改 condition 队列
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

下面我们逐一对其进行源码分析:


1、addConditionWaiter 另一个 FIFO 队列!

代码十分简单:

  1. unlinkCancelledWaiters 迭代清理所有 waitState 不为 Node.CONDITION 的节点,并重新设置尾节点。
  2. 新建一个 Node,并将其塞到尾部。Node 对象上篇文章已经讲过,其实它就是一个搭载了一些状态,以及当前线程的一个实例。
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

2、灵活的 state 应用:fullyRelease

代码更加简单,如果你还记得上篇文章所说的,正常实现都是使得 state --,对应 acquire 使得 state ++的话。这里实际上就是一夜回到解放前,release 所有 state

release的实现就不多说了,上篇文章里已经说得很清楚了。

   final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

3、阻塞、且等待被唤醒:isOnSyncQueue 相关

先说说这个 while 循环,先不看 isOnSyncQueue 的实现,外面逻辑很清晰,只要 isOnSyncQueue 为假,线程就会阻塞(park)。后续则是进行是否被 interrupt 的判断,如果被 interrupt,则跳出循环,否则在 isOnSyncQueue 为真之前,线程会不停的被阻塞、唤醒、阻塞、唤醒。

            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

接着再看看 isOnSyncQueue 是如何实现的。通过前面我们可以知道,结果为真,就可以跳出 while 循环。使得结果为真的条件只有两个:

  1. 如果 node.next != null ,则结果为真。
  2. 本线程的 Node 已经位于 On Sync Queue了:findNodeFromTail 方法是一个简单的查找方法,但它是从 On Sync Queuetail 节点,不断往前寻找,如果找到了本 Node,则结果为真。
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }

Condition Queue 的解析中,到目前为止只出现了 FirstWaiterLastWaiter、以及 Node 本身的成员变量 nextWaiter。为什么会出现在上篇文章 On Sync Queue 中涉及的 nextprev 等 "指针" 作为判断条件呢?(参考本文第一章那几张图)

4、通知可以离开 Condition Queue 了,但实际上并不唤醒:signal 实现分析

带着上面的疑问,我们来到了 signal() 方法

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

signal 方法的门面比较简单,isHeldExclusively 需要自己实现,ReentrantLock 内部的 Sync 实现是判断当前线程是否持有锁定资源,也就是判断 getExclusiveOwnerThread() == Thread.currentThread(); 锁的线程持有者和当前是否相等。

isHeldExclusively 的设计十分灵活,如果必要的话,我们可以实现一个不需要持有锁线程便可进行 signalAQS 实现,即:不做任何判断直接返回 true 即可。注意,要使用 condition 必须实现此方法!!


signal 方法的核心是 doSignal(first);,我们重点看看在这里做了什么:

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
  • do 阶段主要是将 Condition Queue 队头节点的 nextWaiter 变成新的队头,并同时将 nextWaiter 引用擦除(情况1),如果没有 nextWaiter,则将队列清空(情况2)。

  • while 阶段则是两个常规判断,(first = firstWaiter) != null 很好理解,类似递归调用,不做赘述。关键看看 !transferForSignal(first) 。总结一下就是当 transferForSignal 为真或者队列已经空了,则跳出 while 循环。


transferForSignal 主要做了如下操作:

  1. 将当前 Node 的状态由 CONDITION == -2 改为 0,失败则返回 flase
  2. enq(node);,这个其实就是上篇文章说道的 addWaiter 的核心操作,就是将当前 Node 塞进 On Sync Queue
  3. 优化操作,如果上个节点刚好 cancel (ws > 0) 了,或者 CAS 失败,则将当前节点直接唤醒( 其实就是给了 condition 一个优先去竞争原子 state 的机会 )。
    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

整理一下 signal 的逻辑,可以用下图表示,用一句话简单总结就是将 Condition Queue 的头部取出,拿到 On Sync Queue 的尾部。

5、老生常谈:tryAcquire

这一步不多说,略过,上篇文章已经解释的很清楚了。

四、AQS 之 Condition 总览

AQSOn Sync QueueCondition Queue 的关系可以如下表示:

  1. 每个 Condition 都有自己的 Condition Queue,且多个 Condition Queue 之间的 await()signal() 方法相互不影响。
  2. 当某个持有锁的线程调用了某个 Conditionawait() 方法以后,会释放掉锁,且进入该 Condition 所对应的 Condition Queue 的队尾。
  3. 当有某个线程调用了某个 Conditionsignal() 方法后,该 Condition 所对应的 Condition Queue 队头出列,紧接着进入到 On Sync Queue 队尾。注意,该节点并不会被直接唤醒,只是进了 On Sync Queue 队尾。


文章皆是基于源码一步步分析,没有参考过多资料,如有错误,请指出!!


另外欢迎来 Q 群讨论技术相关(目前基本没人)[左二维码]~

如果觉得写得好还可以关注一波订阅号哟 ~ 博客和订阅号同步更新 [右二维码]~


参考资料:

JDK12 源码

另外小伙伴可以思考一下:

  1. 在节点从 Condition Queue 出队时,如果上个节点刚好 cancel (ws > 0) 了,或者 CAS 失败,则将当前节点直接唤醒,这个优化是为什么?
  2. 本文没有提到 ConditionsignalAll() 方法,调用这个方法后,会发生什么?
展开阅读全文
打赏
0
4 收藏
分享
加载中
更多评论
打赏
0 评论
4 收藏
0
分享
返回顶部
顶部