文档章节

难以理解的AQS(下)

o
 osc_w9s1w4o0
发布于 2019/04/01 09:44
字数 2220
阅读 0
收藏 0

精选30+云产品,助力企业轻松上云!>>>

在上一篇博客,简单的说下了AQS的基本概念,核心源码解析,但是还有一部分内容没有涉及到,就是AQS对条件变量的支持,这篇博客将着重介绍这方面的内容。

条件变量

基本应用

我们先通过模拟一个消费者/生产者模型来看下条件变量的基本应用:

  • 当有数据的时候,生产者停止生产数据,通知消费者消费数据;
  • 当没有数据的时候,消费者停止消费数据,通知生产者生产数据;
public class CommonResource {
    private boolean isHaveData = false;

    Lock lock = new ReentrantLock();

    Condition producer_con = lock.newCondition();
    Condition consumer_con = lock.newCondition();

    public void product() {
        lock.lock();
        try {
            while (isHaveData) {
                try {
                    System.out.println("还有数据,等待消费数据");
                    producer_con.await();
                } catch (InterruptedException e) {
                }
            }
            System.out.println("生产者生产数据了");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            isHaveData = true;
            consumer_con.signal();
        } finally {
            lock.unlock();
        }
    }

    public void consume() {
        lock.lock();
        try {
            while (!isHaveData) {
                try {
                    System.out.println("没有数据了,等待生产者消费数据");
                    consumer_con.await();
                } catch (InterruptedException e) {
                }
            }
            System.out.println("消费者消费数据");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            isHaveData = false;
            producer_con.signal();
        } finally {
            lock.unlock();
        }
    }
}
public class Main {
    public static void main(String[] args) {
        CommonResource resource = new CommonResource();
        new Thread(() -> {
            while (true) {
                resource.product();
            }
        }).start();

        new Thread(() -> {
            while (true) {
                resource.consume();
            }
        }).start();
    }
}

运行结果: image.png

这就是条件变量的应用,第一反应是不是和object中的wait/nofity很像,wait/nofity是配合synchronized工作的,而条件变量的await/signal是配合使用AQS实现的锁 来完成工作的,当然也要看用AQS实现的锁是否支持了条件变量。synchronized只能与一个共享变量进行工作,而AQS实现的锁支持多个条件变量。

我们试着分析下上面的代码:

首先创建了两个条件变量,一个条件变量用来阻塞/唤醒消费者线程,一个条件变量用来阻塞/唤醒生产者线程。

生产者,首先获取了独占锁,判断是否有数据:

  • 如果有数据,则调用条件变量producer_con的await方法,阻塞当前线程,当消费者线程再次调用该条件变量producer_con的signal方法,就会唤醒该线程。
  • 如果没有数据,则生产数据,并且调用条件变量consumer_con的signal方法,唤醒因为调用consumer_con的await方法而被阻塞的消费者线程。

最终释放锁。

消费者,首先获取了独占锁,判断是否有数据:

  • 如果没有数据,则调用条件变量consumer_con的await方法,阻塞当前线程,当生产者线程再次调用该条件变量consumer_con的signal方法,就会唤醒该线程。
  • 如果有数据,则消费数据,并且调用条件变量producer_con的signal方法,唤醒因为调用producer_con的await方法而被阻塞的生产者线程。

最终释放锁。

这里有一点需要特别注意:

  • 释放锁,一般应该放在finally里面,以防中间出现异常,锁没有被释放。

为了加深对条件变量的理解,我们再来看一个例子,两个线程交替打印奇偶数:

public class Test {
    private int num = 0;
    private Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    public void add() {
        while(num<100) {
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + ":" + num++);
                condition.signal();
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}
public class Main {
    public static void main(String[] args) {
        Test test=new Test();
        new Thread(() -> {
            test.add();
        }).start();

        new Thread(() -> {
            test.add();
        }).start();
    }
}

运行结果: image.png

翻阅网上的大多数案例是分两个线程方法交替打印,同时开两个条件变量,其中一个条件变量负责阻塞/唤醒打印奇数的线程,一个变量负责阻塞/唤醒打印偶数的线程,但是个人觉得没什么必要,两个线程共用一个线程方法,共用一个条件变量也可以。不知道各位看官是什么想的?

源码解析

当我们点开lock.newCondition,发现它有好几个实现类: image.png 我们选择ReentrantLock的实现类,实际上其他实现类也是相同的,只是为了和上面案例中的对应起来,所以先选择ReentrantLock的实现类:

   public Condition newCondition() {
        return sync.newCondition();
    }

继续往下点:

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

可以看到,当我们调用lock.newnewCondition,最终会new出一个ConditionObject对象,而ConditionObject类是AbstractQueuedSynchronizer的内部类,我们先看下ConditionObject的UML图: image.png 其中firstWaiter保存的是该条件变量下条件队列的首节点,lastWaiter保存的是该条件变量下条件队列的尾节点。这里只保存了条件队列的首节点和尾节点,中间的节点保存在哪里呢? 让我们点开await方法:

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

在这里,我们就搞清楚三个问题即可:

  • 完整的条件队列保存在哪里,以什么方式保存?
  • await方法,是如何释放锁的?
  • await方法,是如何阻塞线程的?

第一个问题在addConditionWaiter方法可以得到答案:

        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

首先是判断条件队列中的尾节点是否被取消了,如果被取消了,执行unlinkCancelledWaiters方法。我们这里肯定没有被取消,事实上,如果是第一次调用await方法,lastWaiter是为空的,所以肯定不会进入第一个if。随后,新建一个Node,这个Node类就是上一篇博客中大量介绍过的,也是AbstractQueuedSynchronizer的内部类,也就是新建了一个Node节点,其中保存了当前线程和Node的类型,这里Node的类型是CONDITION,如果t==null,则说明新建的Node是第一个节点,所以赋值给firstWaiter ,否则将尾节点的nextWaiter设置为新Node,形成一个单向链表,这个nextWaiter在哪里呢,它是通过node点出来的,也就是它也属于node类的一个字段: image.png 这说明了一个比较重要的问题: AQS的阻塞队列是以双向的链表的形式保存的,是通过prev和next建立起关系的,但是AQS中的条件队列是以单向链表的形式保存的,是通过nextWaiter建立起关系的,也就是AQS的阻塞队列和AQS中的条件队列并非同一个队列。

第一个问题解决了,我们再来看第二个问题,第二个问题答案在await的第二个方法:

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

首先调用getState方法,这个state是什么,不知大家是否还有印象,对于ReentrantLock来说,state就是重入次数,随后调用release方法,传入state。也就是不管重入了多少次,这里是一次性把锁完全释放掉。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

可以看到释放锁还是调用了tryRelease方法,这个方法正是需要被重写的。

当完成了前两个方法的调用后,就会进行一个判断isOnSyncQueue,一般来说会进入这个if,park这个线程,等待唤醒,这就解决了第三个问题。

下面我们再来看看signal方法,同样的,我们需要解决几个问题:

  • AQS的条件队列和阻塞队列既然不是同一个队列,那么是不是被await的线程永远不会进入阻塞队列?
  • signal方法是如何唤醒线程的?
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

重点在于doSignal中的transferForSignal方法:

    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;

在这个方法中,我们会调用enq方法,把条件队列的线程放入阻塞队列中,然后调用unpark方法,唤醒线程。

本篇博客到这里也结束了。

经过上下两篇博客,相信大家对AQS一定有了一个比较浅显的理解。聪明的你,可以看出来,其实这两篇博客有很多内容都没有讲透,甚至有点模棱两可,只是“蜻蜓点水”,所以这也符合了我的标题:难以理解的AQS。的确,AQS要深入研究的话,不比线程池简单多少。看,我又再给自己找理由了。希望经过今后的沉淀,我可以把这两篇博客重写下,然后换个标题“彻底理解AQS”,嘿嘿。

o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
难以理解的AQS(上)

在一篇博客中,我们看了下CopyOnWriteArrayList的源码,不是很难,里面用到了一个可重入的排他锁: ReentrantLock,这东西看上去和Synchronized差不多,但是和Synchronized是完全不同的东西。...

osc_a22drz29
2019/03/26
4
0
难以理解的AQS(下)

在上一篇博客,简单的说下了AQS的基本概念,核心源码解析,但是还有一部分内容没有涉及到,就是AQS对条件变量的支持,这篇博客将着重介绍这方面的内容。 条件变量 基本应用 我们先通过模拟一...

CodeBear
2019/04/01
0
0
Java多线程——AQS框架源码阅读

AQS,全称,是Concurrent包锁的核心,没有AQS就没有Java的Concurrent包。它到底是个什么,我们来看看源码的第一段注解是怎么说明<!-- more -->看完第一段,总结下 AQS是一个同步的基础框架,...

osc_04fmvbv4
2018/11/19
1
0
Java多线程——AQS框架源码阅读

AQS,全称AbstractQueuedSynchronizer,是Concurrent包锁的核心,没有AQS就没有Java的Concurrent包。 它到底是个什么,我们来看看源码的第一段注解是怎么说明 看完第一段,总结下 AQS是一个同...

Zack说码
2018/08/24
0
0
Java多线程——AQS框架源码阅读

AQS,全称,是Concurrent包锁的核心,没有AQS就没有Java的Concurrent包。它到底是个什么,我们来看看源码的第一段注解是怎么说明 看完第一段,总结下 AQS是一个同步的基础框架,基于一个先进...

公众号_Zack说码
2018/10/09
32
0

没有更多内容

加载失败,请刷新页面

加载更多

百度技术沙龙第67期 百度开源专场

本文作者:HelloDeveloper 具体的产品案例,分享百度开源技术最新实践经验。目前这些项目都已经在 github/baidu 上开源。 什么是 PaddlePaddle 深度学习平台? 首先做个简单的介绍,PaddleP...

百度开发者中心
2019/07/23
11
0
Hacker News 简讯 2020-07-10

更新时间: 2020-07-10 01:15 US Supreme Court deems half of Oklahoma a Native American Reservation - (reuters.com) 美国最高法院认为俄克拉荷马州的一半是印第安人保留地 得分:131 | 评...

FalconChen
今天
28
0
OSChina 周五乱弹 —— 求求你吃了我吧,不要再玩弄食物的感情了

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @巴拉迪维 :张喆的单曲《陷阱 》 这首歌已经在网易找不到原唱了,不知道被哪家买了版权。#今日歌曲推荐# 《陷阱 》- 张喆 手机党少年们想听歌...

小小编辑
今天
32
1
清华陈文光教授:AI 超算基准测试的最新探索和实践。

道翰天琼认知智能平台为您揭秘新一代人工智能。 无规矩不成方圆。放在超级计算机的研发领域,没有一个大家普遍接受的算力评测指标,便难以推动超算迅猛发展。 而现在伴随着人工智能的发展,大...

jackli2020
今天
7
0
@RequestMapping, consumes 提交简单有意思的测试

getParm @GetMapping("getParm")public Result getParm(String id){ System.out.println(); return ResultFactory.success(id);} 等同于 == bodyParm @PostMapping("bodyParm......

莫库什勒
今天
25
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部