Reentrantlock源码--深入理解AQS

原创
2020/03/17 15:10
阅读数 843

前言:本文主要想解决的问题

  1. 为什么要用Reentrantlock?
  2. 了解Reentrantlock源码(重点)
  3. 通过Reentranctlock了解AQS原理,并且能够自定义锁
  4. 通过Reentranctlock源码,帮助理解一些概念,比如 公平锁/非公平锁  可重入锁  自旋锁

1 为什么要用Reentrantlock?

在这之前,需要了解一下,什么是自旋锁,上一段伪代码

很简单的一个自旋锁,通过CAS无锁化机制实现的,只要获取不到资源,不停的循环,直到T1线程释放那么自旋的缺点是什么呢? 耗费cpu资源,没有竞争到锁的资源一直空转,如何解决呢?

  • 自旋+yield()

如果获取不到资源,让掉当前线程 CPU 的时间片 ,但是,让出并不代表不竞争,只是暂时放掉了,对于cpu的消耗依然不小,显然不是一个好方法

自旋+sleep()

看似好像可以解决问题,但是sleep的时间无法精准的控制,假设T1仅0.1秒就执行完成,T2 sleep了10秒,这就浪费了9.9s,显然这种方法也不好

那么有没有什么方法能够精准的控制时间呢,比如T1执行完成,T2就能立马的获取这个锁,这样既避免了cpu的消耗,又不至于sleep导致的时间浪费,如下代码

  • park+自旋(注意:演示的是伪代码,只要会意即可,不要太过纠结)

  1. 如何理解park呢,我觉得可以理解成sleep,只是park可以由unpark()来唤醒,注意 park 和 unpark是 Unsafe后门类的重要方法,实现Reentrantlock必须的方法
  2. 如果加锁失败--执行park方法,把当前线程加入阻塞队列,同时释放cpu
  3. 解锁过程,从队列头唤醒,然后再次执行循环,竞争(唤醒的都是头
  4. 公平锁/非公平锁:从阻塞队列头开始唤醒,就是公平锁,随机唤醒的就是非公平锁

想下,如果按照park+自旋的方式,自旋所带来的问题能够彻底解决掉吗,其实可以,没有竞争到的线程被放入了阻塞队列,等待T1线程执行完成后唤醒,而sleep是一个被动的过程,没有人去唤醒,这就是最大的区别,Reentrantlock的逻辑与这种方式类似,先把这种思路和大家讲清楚,那么Reentrantlock的思路就很清晰了,下面开始分析Reentrantlock源码

2 Reentrantlock源码解析

先来看下Reentrantlock的继承结构,以及内部维护的元素,Reentrantlock,实现了lock,Reentrantlock内部维护了一个Sync,Sync就是AbstractQueuedSynchronizer(传说中的AQS)

AQS,用prev next 来表示前后节点,用head tail来维护队列头 队列尾,可以得到AQS的模型,下图

主要是从lock unlock 这两个方法来看ReentrantLock的加锁,解锁过程

ReentrantLock加锁过程就是在调用AQS的acquire(),整体来看下来这段逻辑,十分精炼,主要分为3段逻辑

  1.  尝试加锁
  2. 加锁失败,就要进入队列
  3. 入队之后,就要看是否能够唤起unpark,竞争锁

假设有三个线程 T1 T2 T3 ,T1最早 T2次之 T3最迟

  • 1 尝试加锁

 

 public final void acquire(int arg) {
        //尝试获取锁
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }    

 protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //c==0  表示当锁无人占用
            if (c == 0) {
                //判断是否需要排队,如果不需要排队,那么占有锁
                //这里可以体现公平锁和非公平锁,这是公平锁的逻辑,所以要排对,而非公平锁的逻辑是,只要当前c==0,就尝试拿这把锁,无需排队,第一个线程T1会进入
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    //设置node节点中的线程为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //这个判断其实是 重入锁,nextc又加了一遍
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    //排队逻辑,只有返回false,不需要排队,才能占有锁,说白了就是看前面有没有node
    public final boolean hasQueuedPredecessors() {      
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        //当第一个线程来的时候,肯定不需要排队,此时队列都没有,线程直接获取锁了,返回,无需初始画队列
        //当第二个线程来的时候,依然不用排队,此时也没有队列,获取锁失败之后,会初始化队列
        //当第三个线程来的时候,此时有了队列
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

总结:

  1. T1直接获取锁,无需入队,此时没有队列的概念,直到T2,才会去构建队列

  2. 理解重入锁的概念,被加了两次锁,c被加了2次,释放锁的时候,同样会减掉2次

  3. 理解公平锁,锁无人占用的时候,不是直接去获取锁,而是先看要不要排队,所以公平,如果无人占用,直接获取锁,那么队排队等待的节点就不公平

  • 2 创建队列,维护好AQS队列

 

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    // 走到这个方法,说明,当前没有获取到锁,开始排队
    // 1 如果当前没有初始化队列,走enq(final Node node)方法,因为是死循环,第一次会初始化一个null虚拟节点出来 图一,第二次会把当前Node,挂载到虚拟节点后面 图二
    // 2 如果已经初始化了,就把当前节点加到队尾 
    private Node addWaiter(Node mode) {
        //先把当前线程包装成为Node
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        //这里很奇怪,为什么一定要用pred来判断呢,因为追加的时候是 pred.next = node; 这样就加到队尾了
        if (pred != null) {
            node.prev = pred;
            //return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }

        //初始化逻辑
        enq(node);
        return node;
    }

   //初始化逻辑,构建虚拟节点
   private Node enq(final Node node) {
        for (;;) {
            //t 一定是null,还没有初始化,tail是null
            Node t = tail;
            if (t == null) { // Must initialize
                //这里初始化的是队列头
                // return unsafe.compareAndSwapObject(this, headOffset, null, update);

                if (compareAndSetHead(new Node()))
                    // 一个元素,队头也是队尾
                    tail = head;
            } else {
                node.prev = t;

                // return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

   

总结

  1. 维护了一个aqs队列,上面两张图,很重要
  2. 构建队列之后,返回当前的Node,只返回一个Node
  3. 但是,为什么要设置一个虚拟节点呢???后面会解答

3 判断当前是否能够竞争锁

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
 }

 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 这是一个死循环,如果没有获取到锁的话,竞争一次,再次失败,就被park了
            for (;;) {
                final Node p = node.predecessor();
                //判断上一个node是不是head,即自己是不是第二个,因为第一个始终是null的Node
                if (p == head && tryAcquire(arg)) {
                    //如果获取到锁,把当前自己的node作为头,把上一个给移除掉,看下面的三张图
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判断是否需要park,如果需要的话,就park
                //这里有两种情况 1 不在第二个 2 在第二个,但是没有获取到锁
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

 
    //每一个node节点都会有一个ws状态
    //该方法语意,如果当前线程不在第二个节点 或者 没有获取到锁(两种情况都会进入该方法),就判断该线程是否需要park,false表示不需要park,可以在死循环中再判断一次,true直接park
    //不在第二个节点的情况:park
    //在第二个节点,但是没有获取到锁的情况:ws=0--》ws被改成-1》返回false--》表示不需要park,还可以在竞争一次--》再次失败,再进入该方法,判断是否要park--》ws=-1,返回true,直接park掉
    //上面做了三件事 1 park掉了当前节点  2 把上一个节点ws改成-1  3 自己竞争了2次
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
       
       int ws = pred.waitStatus;
        
       //static final int SIGNAL    = -1;
       if (ws == Node.SIGNAL)
           
            return true;
        if (ws > 0) {
            
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            
            // 满足期待值,就把他改为-1
            //unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

总结:

  • ws的流转过程

加锁过程:取上一个节点的ws(一般为0),最终被下一个节点改成-1 ,下一个节点的ws=0  pred.ws=-1     now.ws=0

解锁过程:头节点不为null(有AQS队列) 且  头结点的ws!=0(头结点有下一个节点,因为ws=0表示没有下一个修改过它),那么就唤起下一个(unpark),第一个永远是null节点

  • 为什么要把上一个节点的ws改成-1呢?
 Node h = head
 if (h != null && h.waitStatus != 0)

因为需要唤起第二个,而我怎么知道有第二个呢?头结点不为0即可,为0表示没有下一个修改过它,所以通过这套规则就可以找到需要被唤醒的第二个

  • 为什么始终有个null节点在头部(如上三幅图)
  1. 在初始化的时候,会构造一个null节点放在头部
  2. 当有线程占有锁的时候,会把上一个为null的节点移除,同时将自己置为null,作为新的头,而此时,自己已经上位了,获取到了锁
  3. 所以始终会有一个null节点放在队列头
  • 竞争次数的问题

  1. 不在第二个,进到shouldParkAfterFailedAcquire方法,ws改为-1 返回false,在循环一次,还不满足条件,在进shouldParkAfterFailedAcquire,返回true,被park,一共2次
  2. 在第二个,但是没有获取到锁,进到shouldParkAfterFailedAcquire方法,ws改为-1 返回false,在竞争一次,还是竞争失败,也会被park,也是2次
  3. 当占有锁的线程执行完成,会来唤醒第2中已经park了的Node,从park的地方唤醒,会再次竞争锁,此时锁无人占用,竞争到了锁,所以,从头到尾,最多竞争3次

4 解锁过程

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            
            // 释放锁成功,就要唤醒队列中的第二个
            // 这个判断很有趣 
            // h!=null 表示有AQS队列,如果没有的话,啥也不用干了,也没人需要占有锁了
            // h.waitStatus != 0 这个点在上一个总结中已经讲的很清楚了,就是为了找到队列中的第二个,因为公平锁,唤醒的肯定是队列中的第二个
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    // 尝试解锁,成功返回true,表示当前锁自由了
    protected final boolean tryRelease(int releases) {
            //减掉releases
            int c = getState() - releases;
            //一般不会发生,占有锁的线程不是当前线程,除非自己乱搞,用另外一个reentrantlock去解锁
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //只有c==0,表示锁可以释放
            //但是要注意,如果是重入锁的话,虽然减掉了releases,依然不为0,关键:加几次就要减几次
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
    }

   // 真正的唤醒逻辑
    private void unparkSuccessor(Node node) {
        
        //此时的ws是之前占有锁的node的ws
        int ws = node.waitStatus;
        if (ws < 0)
            //在把它变为0
            compareAndSetWaitStatus(node, ws, 0);

        // 取下一个node
        Node s = node.next;

        //这里是取消状态 线程被取消 ws>0 会从队尾开始找,找到一个可以唤起的节点(如果取消的下一个没有问题,就是下一个,如果是下下个没问题,那就是下下个),把下一个赋值给t,最后唤起满足条件的
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 下一个node不为空,unpark唤醒起来
        // 这里需要思考下,unpark之后,第二个节点会从哪里开始运行呢,从哪里park,就从哪里unpark
        // 唤醒之后,再次进入之前的死循环,最后在竞争锁,最后竞争成功
        if (s != null)
            LockSupport.unpark(s.thread);
    }


    //回溯到加锁失败park的地方
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 唤醒之后,再次竞争,所以最多竞争三次
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 正常来说不会取消排队,只有真正发生异常了,才会进入,这里可以不用关注
            if (failed)
                cancelAcquire(node);
        }
    }

   private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        
        //这里的返回取决与是否调用过interrupt方法,掉了,表示中断过,返回true,没有调用过,表示没有中断过,返回false,不过要注意,interrupted() 很坑,判断是否被中断过,但是判断完成之后,会立马把这个值变成false
        //比如:线程有被打断过Thread.interrupted()返回true,然后这个线程就会立马设置为false,拿到true之后,立马又变成了false,所以selfInterrupt()方法对于打断过的方法,要再次打断下
        return Thread.interrupted();
    }

    // 最后回到加锁的主方法上,前两个判断已经执行了,之前被park的线程也已经被唤醒了,去执行最后的selfInterrupt方法
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

   static void selfInterrupt() {
        // 自己打断
        Thread.currentThread().interrupt();
    }

  • 总结

  1. interrupt   其作用是中断此线程,其实不是真正中断线程,就是把线程中的标志位改为true,线程并不会被中断

  2. interrupted 测试当前线程是否中断,清除标识位,这个方法很坑,慎用,比如线程被中断之后,该方法会返回true,但是会立马清除标志位,本来是true,清除之后,就是boolean的默认值,变成了false,真的要想看线程是否中断过,用isInterrupted
  3. isInterrupted 测试当前线程是否中断,不会清除标识位
  • 5 反思

  1. 我们可以纵观ReentrantLock的加锁解锁过程,可以发现,锁释放之后,唤起队列中的第二个节点,这和1中 自旋+park是一致的
  2. park 和 unpark 都是 nusafe提供的,这是ReentrantLock中很关键的两个方法,若没有这两个方法,是无法实现ReentrantLock的

  3. 以上的所有源码,其实都是AQS提供的,ReentrantLock其实是在调用AQS的方法

  4. 其实我觉得AQS已经定义的非常完美了,不需要覆盖AQS的方法重写自己的锁了,但是为了演示下,可以看我另一篇博文,通过AQS自定义了锁   https://my.oschina.net/u/3523594/blog/2236923

如有错误,请指正,发帖画图不易,如需转载,请标明出处   best wish ......

 

 

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部