文档章节

Java并发编程之Condition源码分析

什么都会一点
 什么都会一点
发布于 2018/12/31 19:41
字数 2704
阅读 169
收藏 5

Condition介绍

上篇文章讲了ReentrantLock的加锁和释放锁的使用,这篇文章是对ReentrantLock的补充。ReentrantLock#newCondition()可以创建Condition,在ReentrantLock加锁过程中可以利用Condition阻塞当前线程并临时释放锁,待另外线程获取到锁并在逻辑后通知阻塞线程"激活"。Condition常用在基于异步通信的同步机制实现中,比如dubbo中的请求和获取应答结果的实现。

常用方法

Condition中主要的方法有2个

  • (1)await()方法可以阻塞当前线程,并释放锁。
  • (2)在获取锁后可以调用signal()通知被await()阻塞的线程"激活"。

这里的await(),signal()必须在ReentrantLock#lock()和ReentrantLock#unlock()之间调用。

Condition实现分析

Condition的实现也是利用AbstractQueuedSynchronizer队列来实现,await()在被调用后先将当前线程加入到等待队列中,然后释放锁,最后阻塞当前线程。signal()在被调用后会先获取等待队列中第一个节点,并将这个节点转化成ReentrantLock中的节点并加入到同步阻塞队列的结尾,这样此节点的上个节点线程释放锁后会激活此节点线程取来获取锁。

await()方法源码分析

await()源码如下

        public final void await() throws InterruptedException {
		//判断是否当前线程是否被中断中断则抛出中断异常
            if (Thread.interrupted())
                throw new InterruptedException();
		//加入等待队列
            Node node = addConditionWaiter();
		//释放当前线程锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
		//判断是否在同步阻塞队列,如果不在一直循环到被加入
            while (!isOnSyncQueue(node)) {
		//阻塞当前线程
                LockSupport.park(this);
		//判断是否被中断
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
		//获取锁,如果获取中被中断则设置中断状态
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
		//清除等待队列中被"激活"的节点
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
		//如果当前线程被中断,处理中断逻辑
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

主要分以下几步

  • (1)先判断是否当前线程是否被中断中断则抛出中断异常如果未中断调用addConditionWaiter()加入等待队列

  • (2)调用fullyRelease(node)释放锁使同步阻塞队列的下个节点线程能获取锁。

  • (3)调用isOnSyncQueue(node)判断是否在同步阻塞队列,这里的加入同步阻塞队列操作是在另一个线程调用signal()后加入,如果不在同步阻塞队列会进行阻塞直到被激活。

  • (4)如果被激活然后调用checkInterruptWhileWaiting(node)判断是否被中断并获取中断模式。

  • (5)继续调用isOnSyncQueue(node)判断是否在同步阻塞队列。

  • (6)是则调用acquireQueued(node, savedState) 获取锁,这里如果获取不到也会被阻塞,获取不到原因是在第一次调用isOnSyncQueue(node)前,可能另一个线程已经调用signal()后加入到同步阻塞队列,然后调用acquireQueued(node, savedState) 获取不到锁并阻塞。acquireQueued(node, savedState)也会返回当前线程是否被中断,如果被中断设置中断模式。

  • (7)在激活后调用unlinkCancelledWaiters()清理等待队列的已经被激活的节点。

  • (8)最后判断当前线程是否被中断,如果被中断则对中断线程做处理。

下面来看下addConditionWaiter()实现

 private Node addConditionWaiter() {
 		//获取等待队列尾部节点
            Node t = lastWaiter;
            //如果尾部状态不为CONDITION,如果已经被"激活",清理之,然后重新获取尾部节点
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
		//创建以当前线程为基础的节点,并将节点模式设置成CONDITION
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
		//如果尾节点不存在,说明队列为空,将头节点设置成当前节点
            if (t == null)
                firstWaiter = node;
		//如果尾节点存在,将此节点设置成尾节点的下个节点
            else
                t.nextWaiter = node;
		//将尾节点设置成当前节点
            lastWaiter = node;
            return node;
        }

addConditionWaiter()的逻辑很简单,就是创建以当前线程为基础的节点并把节点加入等待队列的尾部待其他线程处理。

下面来看下fullyRelease(Node node)实现

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
		//获取阻塞队列中当前线程节点的锁状态值
            int savedState = getState();
		//释放当前线程节点锁
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
		//释放失败讲节点等待状态设置成关闭
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

调用getState()先获取阻塞队列中当前线程节点的锁状态值,这个值可能大于1表示多次重入,然后调用release(savedState)释放所有锁,如果释放成功返回锁状态值。

下面来看下isOnSyncQueue(Node node)实现

final boolean isOnSyncQueue(Node node) {
		//判断当前节点是否是CONDITION或者前置节点是否为空如果为空直接返回false
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
		//如果下个节点存在,则在同步阻塞队列中返回true
        if (node.next != null) // If has successor, it must be on queue
            return true;
		//遍历查找当前节点是否在同步阻塞队列中
        return findNodeFromTail(node);
    }
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

此方法的功能是查找当前节点是否在同步阻塞队列中,方法先是快速判断,判断不了再进行遍历查找。

  • (1)第一步先判断次节点是否CONDITION状态或者前置节点是否存在,如果是表明不在队列中返回false,阻塞队列中的状态一般是0或者SIGNAL状态而且如果当前如果当前节点在队列阻塞中且未被激活前置节点一定不为空。
  • (2)第二步判断节点的下个节点是否存在,如果存在则表明当前当前节点已加入到阻塞队列中。
  • (3)如果以上2点都没法判断,也有可能刚刚加入到同步阻塞队列中,所以调用findNodeFromTail(Node node)做最后的遍历查找。查找从队列尾部开始查,从尾部开始查的原因是可能刚刚加入到同步阻塞队列中,从尾部能快速定位。

下面看下checkInterruptWhileWaiting(Node node)实现

private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

此方法在线程被激活后被调用,主要功能就是判断被激活的线程是否被中断。此方法会返回2种中断状态THROW_IE和REINTERRUPT,THROW_IE是调用signal()前被中断返回,REINTERRUPT在调用signal()后被中断返回。 此方法先判断是否被标记中断,是的话再调用transferAfterCancelledWait(node)取判断是那种中断状态,transferAfterCancelledWait(node)方法分2步

  • (1)用CAS方式将节点状态改错等待状态改成CONDITION,并加入到同步阻塞队列中返回true
  • (2)如果不能加入到同步阻塞队列就自旋一直等待加入

如果使用await()方法上面2步其实是没什么作用其最后一定会返回false,因为await()被激活只能调用 signal()方法,而signal()方法肯定已经将节点加入到同步阻塞队列中。所以以上逻辑是给await(long time, TimeUnit unit)等带超时激活方法用的。

acquireQueued(node, savedState)方法再上一章节已经讲过这边就不重复了,下面分析下unlinkCancelledWaiters()方法

private void unlinkCancelledWaiters() {
		//获取等待队列头节点
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
		//获取下个节点
                Node next = t.nextWaiter;
		//如果状态不为CONDITION说明已经加入阻塞队列需要清理掉
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
			//获取下个节点
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

此方法就是从头开始查找状态不为CONDITION的节点并清理,状态不为CONDITION节点说明此节点已经加入到阻塞队列,已经不需要维护。

下面来看下reportInterruptAfterWait(interruptMode)方法

private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
		//如果是THROW_IE模式直接抛出异常
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
		//如果是REINTERRUPT模式标记线程中断由上层处理中断
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

此方法处理中断逻辑。如果是THROW_IE模式直接抛出异常,如果是REINTERRUPT模式标记线程中断由上层处理中断。

signal()方法源码分析

signal()源码如下

public final void signal() {
		//是否当前线程持有锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
		//通知"激活"头节点线程
            if (first != null)
                doSignal(first);
        }

先调用isHeldExclusively()判断锁是否被当前线程持有,然后检查等待队列是否为空,不为空就是可以取第一个节点调用doSignal(first)去"激活",这里激活不是真正的激活而只是将节点加入到同步阻塞队列尾部,所以上下文中带""的激活都是这种解释。

下面看下isHeldExclusively()实现

 protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

实现就是比较下当前线程和持有锁的线程是否同一个

下面看下doSignal(first)的实现

        private void doSignal(Node first) {
            do {
		//头指头后移一位,如果后面的节点为空,则将尾指头也指向空,说明队列为空了
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
		//清空头节点的下个节点
                first.nextWaiter = null;
		//如果"激活"失败者取下个继续,直到成功或者遍历完
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

此方法就是取当前头节点一直去尝试"激活",直到成功或者遍历完。

下面来看下transferForSignal(first)方法

final boolean transferForSignal(Node node) {
		//将CONDITION状态设置成0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
		//加入到同步阻塞队列
        Node p = enq(node);
        int ws = p.waitStatus;
		//状态异常直接激活
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
  • (1)此方法先先将CONDITION状态设置成0,因为如果是CONDITION状态加入到同步阻塞队列,激活的时候是不识别的。
  • (2)加入到同步阻塞队列的尾部。所以同步阻塞队列中前面如果有多个在排队,调用unlock()不会马上激活此节点。
  • (3)状态异常直接调用unpark激活,这边按理说如果状态异常情况下激活,await()在调用unlock()被激活后会进行相应的异常处理,但看await()代码没有处理则是正常执行。

这个方法主要就是把节点加入到同步阻塞队列的,真正的激活则是调用unlock()去处理。

© 著作权归作者所有

共有 人打赏支持
什么都会一点

什么都会一点

粉丝 4
博文 9
码字总数 13222
作品 3
杭州
程序员
私信 提问
Java 并发编程源码解析汇总篇

java并发编程,内存模型 java并发编程,volatile内存实现和原理 Java并发编程,并发基础 Java 并发编程,线程池(ThreadPoolExecutor)源码解析 Java并发编程,Executor 框架介绍 Java并发编...

郑加威
2018/12/23
0
0
CountDownLatch源码解析

CountDownLatch 相比ReentranceLock,CountDownLatch的流程还是相对比较简单的,CountDownLatch也是基于AQS,它是AQS的共享功能的一个实现。 下面从源代码的实现上详解CountDownLatch。 1、C...

maxam0128
2018/01/23
0
0
Disruptor 3.3.11 和 3.4.2 发布,并发编程框架

Disruptor 是一个 Java 的并发编程框架,大大的简化了并发程序开发的难度,在性能上也比 Java 本身提供的一些并发包要好。本次更新主要是 bug 修复,具体如下: 3.4.2 Fix race condition i...

淡漠悠然
2018/03/23
962
7
Java程序员该如何突破瓶颈!

一、源码分析 源码分析是一种临界知识,掌握了这种临界知识,能不变应万变,源码分析对于很多人来说很枯燥,生涩难懂。 源码阅读,我觉得最核心有三点:技术基础+强烈的求知欲+耐心。 我认为...

JAVA高级架构开发
2018/08/31
0
0
java中高级大公司多线程面试题

1)在Java中Lock接口比synchronized块的优势是什么?你需要实现一个高效的缓存,它允许多个用户读,但只允许一个用户写,以此来保持它的完整性,你会怎样去实现它? lock接口在多线程和并发编...

java成功之路
2018/10/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

看过上百部片子的这个人教你视频标签算法解析

本文由云+社区发表 随着内容时代的来临,多媒体信息,特别是视频信息的分析和理解需求,如图像分类、图像打标签、视频处理等等,变得越发迫切。目前图像分类已经发展了多年,在一定条件下已经...

腾讯云加社区
15分钟前
0
0
2. 红黑树

定义:红黑树(Red-Black Tree,简称R-B Tree),它一种特殊的二叉查找树(Binary Search Tree)。 要理解红黑树,先要了解什么是二叉查找树。在上一章中,我们学习了什么是二叉树,以及二叉树...

火拳-艾斯
16分钟前
0
0
input的button类型,点击页面跳转

一、input type=button 不做任何操作 例如: <input type="button" class="btn btn-primary" style="width: 30%" value="返回" onclick="window.location.href='/users/list'"></input> onc......

Sunki
22分钟前
0
0
踩坑:js 小数运算出现精度问题

背景 在学习小程序商城源码时发现了这个问题,单价可能出现小数,小数之间运算结果会莫名其妙多出一大串数字,比如下面这样👇。 在此之前我是知道 js 中著名的 0.1 + 0.2 != 0.3 的问题的,...

dkvirus
27分钟前
0
0
zookeeper和HBASE总结

zookeeper快速上手 zookeeper的基本功能和应用场景 zookeeper的整体运行机制 zookeeper的数据存储机制 数据存储形式 zookeeper中对用户的数据采用kv形式存储 只是zk有点特别: key:是以路径...

瑞查德-Jack
50分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部