文档章节

AQS-condition阻塞队列 [自定义同步器框架]

重城重楼
 重城重楼
发布于 05/08 14:28
字数 2127
阅读 5
收藏 1

1、AQS框架有同步队列和阻塞队列
    同步队列前文已有叙述,阻塞队列,本次将基于CyclicBarrier梳理讲解,会比较易读

2、CyclicBarrier有四个关键操作lock.lock;trip.await;trip.signalAll;lock.unlock
    据此条线看一下condition阻塞队列是如何发生作用的
    2-1、lock.lock条线   
    2-1-1、lock是直接使用的ReentrantLock,借其源码一读
        public void lock() {
            sync.lock();
        }
    2-1-2、sync是ReentrantLock的静态内部类,属装饰模式下进行的包装。
         具体区分NonfairSync和FairSync公平和非公平锁。
        public ReentrantLock() {//从构造器来看,默认采用的是非公平锁
            sync = new NonfairSync();
        }
    2-1-3、抓住lock线索,进入nonfairSync看一下其实现
    static final class NonfairSync extends Sync {
            final void lock() {
                if (compareAndSetState(0, 1))//第一次成功,后续走acquire排队获取
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);//会先行调用tryAcquire方法,其余逻辑同AQS自定义同步框架所述
            }

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);//实现在父类Sync中
            }
        }
2-1-4、Sync的nonfairTryAcquire方法,查看如何获取资源
    abstract static class Sync extends AbstractQueuedSynchronizer {
            abstract void lock();

            /**Performs non-fair tryLock.  tryAcquire isimplemented in subclasses*/
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();//持锁线程
                int c = getState();
                if (c == 0) {//有资源
                    if (compareAndSetState(0, acquires)) {//更新state->排它锁
                    setExclusiveOwnerThread(current);//设置自己为持锁线程
                    return true;
                    }
                } else if (current == getExclusiveOwnerThread()) {
                        //允许重入锁,如CyclicBarrier的isBroken()方法
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;//非首次非重入,将排队等待获取资源
            }

        //lock.unlock是将会进入此节点,标记为:ysma-unlock
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;//累减,兼容可重入锁情况
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {//完全退出,释放资源
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);//可重入锁资源释放
                return free;
            }

            final ConditionObject newCondition() {//阻塞队列需实现此方法
                return new ConditionObject();
            }
        }
小结:CyclicBarrier中lock.lock()的实现是AQS同步sync队列争抢资源
        那么将会是第一个进入的线程Thread1抢到资源并持锁进入,其余的线程ThreadX将会阻塞等待


2-2、trip.await条线 
    2-2-1、如上lock方法仅Thread1持锁进入,执行trip.await方法后阻塞,
        ThreadX等线程将会阻塞在lock处,排队等待获取资源
            由CyclicBarrier得知Condition trip = lock.newCondition();
            Condition由AbstractQueuedSynchronizer#ConditionObject实现,部分相关源码如下
        public class ConditionObject implements Condition, java.io.Serializable {
   
            /** First node of condition queue. */
            private transient Node firstWaiter;
            /** Last node of condition queue. */
            private transient Node lastWaiter;

            /**Adds a new waiter to wait queue. 注意:阻塞队列没有前驱节点的设置*/
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;//初始化-简单粗暴
                else
                    t.nextWaiter = node;//挂到lastWaiter后面
                lastWaiter = node;
                return node;//waitStatus为CONDITION状态的节点
                }

            /**Removes and transfers all nodes*/
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                //ysma-transferForSignal 标记此处:将节点从阻塞队列移往sync队列
                    first = next;
                } while (first != null);
            }

            /**变换condition阻塞为sync阻塞*/
            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }

            /** Mode meaning to reinterrupt on exit from wait */
            private static final int REINTERRUPT =  1;
            /** Mode meaning to throw InterruptedException on exit from wait */
            private static final int THROW_IE    = -1;

            /**为本次重点-这个要看的*/
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();//创建CONDITION塞节点
            //savedState结合acquireQueued方法可解读为将要获取的资源数
                int savedState = fullyRelease(node);//标记ysma-fullyRelease
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {//不在sync队列中则阻塞,标记ysma-isOnSyncQueue
                    LockSupport.park(this);//park 阻塞到这里了
                /**标记ysma-checkInterruptWhileWaiting =>检查等待过程中是否发生了异常
                *  发生了异常要转移node到sync队列
                */

                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                }//退出while循环时node已在sync队列中了,详见checkInterruptWhileWaiting和isOnSyncQueue
            
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                /**这里sync队列是忽略中断的,所以上文被唤醒后执行checkInterruptWhileWaiting发现
                *线程是已经被中断了的,by:acquireQueued返回线程是否中断
                *但是中断了也得给老子锁上,
                *已在sync队列就是THROW_IE,还需要排队等待获取资源就是REINTERRUPT
                *REINTERRUPT时是需要再次入sync队列的,最后在加入sync队列后
                *在reportInterruptAfterWait(interruptMode)再次抛出一个异常
                *来修改sync队列中对应node的状态
                *
comment1
                */
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);//标记ysma-reportInterruptAfterWait
            }
        }
    2-2-2、await方法执行LockSupport.park(this)阻塞前先行执行了fullyRelease    和isOnSyncQueue
        下面我们进入fullyRelease一观,看看阻塞前的操作流程
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();//state=>资源数
                if (release(savedState)) {//释放资源
                    failed = false;
                    return savedState;//返回savedState[释放的资源数]
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)//呼应unlinkCancelledWaiters();
                    node.waitStatus = Node.CANCELLED;
            }
        }
        紧接着看一下release释放什么状态
        public final boolean release(int arg) {
            if (tryRelease(arg)) {//释放资源,参见Sync源码标记处:ysma-unlock
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);//唤醒sync队列头节点的后继节点
                return true;
            }
            return false;
        }
        先不急,让我们在看一眼isOnSyncQueue做了什么在做个小结
        final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;//CONDITION状态的节点肯定不在sync队列中
            if (node.next != null) // If has successor, it must be on queue
                return true;
            return findNodeFromTail(node);//见下
        }
        private boolean findNodeFromTail(Node node) {
            Node t = tail;
            for (;;) {//sync队列中自尾到头找询node
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }
        小结:A:由上可见Thread1持锁进入,ThreadX阻塞等待;
                B:然后加入Thread1到阻塞队列,释放资源给ThreadX,使得ThreadX可以获得资源,移除sync队列;
                C:Thread1直到再次从阻塞队列进入sync同步队列前持续阻塞。
                C:ABC循环直到将所有线程从sync队里转移到阻塞队列。CyclicBarrier到达临界点


2-3、trip.signalAll条线
        当所有线程都从sync队列转移到condition阻塞队列后,不能一直阻塞,这里以signalAll为例进行梳理
        详见2-2-1、ConditionObject#signalAll,进而由doSignalAll从firstWaiter到lastWaiter
            依次把CONDITION节点从condition阻塞队列移除并挂到sync队列中
        这里我们关注下标记:ysma-transferForSignal 看一下node是如何转挂到sync队列上的
        AbstractQueuedSynchronizer#transferForSignal
        final boolean transferForSignal(Node node) {
            /*If cannot change waitStatus, the node has been cancelled.*/
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;

            /*入sync队列,自旋到成功为止详见前文AQS自定义同步框架*/
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                //入队列后,线程取消或者更新状态失败,唤醒对应的线程,进行处理
                LockSupport.unpark(node.thread);
            return true;
        }
        小结:signal就是只处理firstWaiter,signalAll就是从firstWaiter到lastWaiter挨个处理一遍
            基本步骤就是从condition队列中把节点移除,然后把节点运输到sync队列上


2-4、lock.unlock条线
        sync同步队列自行运转,会依次释放信号量唤醒线程,那么就剩下最后的事情了,释放锁/资源
        public void unlock() {
            sync.release(1);//AbstractQueuedSynchronizer#release 如下
        }
        public final boolean release(int arg) {
            if (tryRelease(arg)) {//详见上文Sync#tryRelease
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
        总结:从CyclicBarrier可以看出,整个condition的运用可分为三部分
            1、利用sync队列控制资源
            2、将线程节点从sync队列转移到condition队列,释放资源但是仍阻塞持锁人/线程
            3、直到条件合适,将线程节点移回sync队列,正常竞争资源直到退出
            4、in sync=>out sync=>in condition=>out condition=>in sync=>out sync


        PS:
        标记备注1:ysma-checkInterruptWhileWaiting
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                /**线程在等待期间中断了,transferAfterCancelledWait
                *成功就是THROW_IE否则就是REINTERRUPT 作用见:
comment1*/
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
        final boolean transferAfterCancelledWait(Node node) {
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                /**修改waitStatus变更condition节点为sync节点*/
                enq(node);//入sync队列
                return true;
            }
            /*If we lost out to a signal(), then we can't proceed
             * until it finishes its enq().  Cancelling during an
             * incomplete transfer is both rare and transient, so just spin.
             * 如果CAS更新失败,那么一定有别个已经在操作了,此时只需要耐心自旋,等待node挂到sync队列上即可
             */

            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }
        标记备注2:ysma-reportInterruptAfterWait
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
            /**condition队列里面的中断导致其提前进入sync队列,
            *需要再次发送中断信号,以触发sync同步流程的中断处理机制
            */

        }


 

© 著作权归作者所有

重城重楼
粉丝 4
博文 59
码字总数 45991
作品 0
南京
程序员
私信 提问
Java并发编程-看懂AQS的前世今生

  在具备了volatile、CAS和模板方法设计模式的知识之后,我们可以来深入学习下AbstractQueuedSynchronizer(AQS),本文主要想从AQS的产生背景、设计和结构、源代码实现及AQS应用这4个方面...

ling
2018/08/12
0
0
多线程-AQS-CyclicBarrier

1、CyclicBarrier和CountDownLatch的区别 CountDownLatch是闭锁,只能使用一次,而CyclicBarrier的计数器会重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景; CyclicBarrier还提...

重城重楼
05/08
26
0
JUC源码分析—AQS

1. 概述 AbstractQueuedSynchronizer,简称AQS。是java里的一个抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的、、等。 AQS支持独...

泰迪的bagwell
2017/12/09
0
0
Java并发同步器AQS(AbstractQueuedSynchronizer)学习笔记(1)

Java中的并发包,是在Java代码中并发程序的热门话题。如果我们去读concurrent包的源码时,会发现其真正的核心是 AbstractQueuedSynchronizer , 简称 AQS 框架 , 而 Doug Lea 大神正是此包的...

zavakid
2012/10/24
3.2K
0
AQS(AbstractQueuedSynchronizer)同步器的底层解析

AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch等等是用来构建锁或者其他同步组件的基础框架,它使用了一个vol...

上帝爱吃苹果-Soochow
2018/08/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

聊聊nacos的LocalConfigInfoProcessor

序 本文主要研究一下nacos的LocalConfigInfoProcessor LocalConfigInfoProcessor nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/config/impl/LocalConfigInfoProcessor.java p......

go4it
昨天
5
0
前端技术之:webpack热模块替换(HMR)

第一步:安装HMR中间件: npm install --save-dev webpack-hot-middleware 第二步:webpack配置中引入webpack对象 const webpack = require('webpack’); 第三步:增加devServer配置项: ho......

popgis
昨天
5
0
死磕 java线程系列之线程池深入解析——体系结构

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本。 简介 Java的线程池是块硬骨头,对线程池的源码做深入研究不仅能提高对Java整个并发编程的理解,也能提高自己...

彤哥读源码
昨天
6
0
虚函数表 图解

虚函数表 图解 p504

天王盖地虎626
昨天
5
0
java反射

学习目标  什么是反射  反射运行原理  了解反射机制的相关类  获取 class 对象的 3 种方式  通过反射获取构造方法并使用  通过反射获取成员变量并调用  通过反射获取成员方法并...

流川偑
昨天
5
2

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部