文档章节

JDK核心源码之ReenrantLocak源码注释

lixiyuan
 lixiyuan
发布于 2019/05/15 19:08
字数 4488
阅读 22
收藏 0

 

        AQS的java的同学面试经常被问到的一个问题。很多同学被面到这个问题的时候,一脸蒙圈。但是说实话,这个AQS对于java的同学来说应该是一个比较重要的知识,因为我们很多并发的对象都是基于这个实现的,所以考察java同学的并发知识的功底,问这个AQS也是一个质量比较好的问题。

         AQS的全称是AbstractQueuedSynchronizer(文中就叫:抽象队列),说白了其实就是一个抽象类。

是我们并发包里面的基石,重入锁,读写锁,很多并发的工具都是基于它实现的。所以理解好AQS是什么东西对于掌握好并发知识是有帮助的。

     不过,我们如果分析AQS的时候,直接就读AQS的代码有点无聊,我们就分析ReentantLock的源码,因为我们平时真正使用的时候我们使用的是ReentantLock,而不是AQS。但是ReentantLock就是基于AQS实现的。

     在分析AQS之前,我先给大家解析一下AQS的原理。如下图所示:

图展示的一个ReentantLocak(基于非公平锁分析)加锁的过程。

  1. 首先是线程一过来了,state的初始值是0,当前线程为null。因为从来没有人加过锁,所以线程一很容易就加锁成功了。加锁成功了就是把state0变为1,当前线程设置为自己的名字,比如就是线程1.
  2. 接下来,线程一再次进来进行重入加锁,发现当前的state1,但是当前线程是自己,那么直接就让state1,由1变为2,这样就完成了可重入加锁。
  3. 接着线程二进来了,线程二进来发现state2,但是当前线程不是自己,所以加锁就失败了。

加锁失败了以后线程二就加入到队列里面去(AQS内部实现了一个双向队列)

  1. 再接着线程三就进来了,遇到的状况跟线程二一样,所以跟着也就到了队列里面了。

 

AQS重入加锁大概就是这么个原理。我们接下来分析一下它的底层源码。我就以ReentantLocak为例写个例子。

public class ReentrantLockDemo {
    ReentrantLock reentrantLock = new ReentrantLock();
    int sum=0;
    public void count(){
//这个是加锁的代码
//我们此次主要基于非公平锁分析源码,等到合适的时机再给大家
//解释公平锁和非公平锁的区别。
//首先我们开始分析lock的方法。
        reentrantLock.lock();
        for (int i = 0; i < 10; i++) {
            sum++;
            System.out.println(sum);
        }
//这个是我们释放锁的代码
        reentrantLock.unlock();
    }
    }
}
final void lock() {
//线程一第一次进来之间使用cas操作修改state的值
//这句代码的语义就是当前state的值是否为0,如果是0,那么就把修改为1。
    if (compareAndSetState(0, 1))
//设置当前线程为自己,其实线程一第一次进来加锁的时候,到这儿就加锁成功了!!
        setExclusiveOwnerThread(Thread.currentThread());
    else
//如果线程1第二次进来
//那么因为state不是0了,所以会cas操作失败
//所以会走这个方法 
        acquire(1);
}

接下来我们分析一下,线程一第二次进来是如何加锁的:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
我们慢慢分析这段代码
首先应该分析的是:tryAcquire(arg)方法

所以我们要想分析acquire方法,那么先分析里面的tryAcouire方法

//执行的是这个方法
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}



这个方法调用的是如下的方法:
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程,当前线程当然是线程一喽
    final Thread current = Thread.currentThread();
    //获取当前的state 那么state 是1
int c = getState();
//如果c == 0
//其实我们知道,代码之所以走到这儿就是因为前面c != 0
//但是jdk的源码为了健壮性,所以这儿还是再次判断了一下
//意思就是如果当前的state是0,那么直接加锁就可以了。
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
//如果发现上一个加锁也是自己
//那么直接进行重入加锁就可以了
    else if (current == getExclusiveOwnerThread()) {
// 1+1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //修改状态值为2,可重入加锁成功,返回true.
setState(nextc);
        return true;
    }
    return false;
}




如果上个结果返回true之后。我们再回过头来看这个方法:
public final void acquire(int arg) {
//(!true) 结果就是false
//那么代码就不继续执行了
//也就是说如果是重入加锁,那么这儿加锁成功以后就退出去了。
//换句话说,如果是重入加锁,代码执行到这儿重入加锁也就成功了!!
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

现在我们的的 线程是1,state=2
接下来我们在分析一个情况,线程2进来了。我们的代码又是如何走的。
首先state这个时候不是0了,那么直接走的是acquire方法。
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
//直接走这个方法
        acquire(1);
}

//我们再次分析这段代码
//我们分析一下这个方法。tryAcquire
//通过我们前面的分析我们知道,如果重入加锁成功了,那么这儿直接返回的是true
//但是如果发现当前的线程 和 里面加锁的线程不是同一个线程
//那么重入加锁失败。这儿就会返回来false
//如果tryAcquire返回的是false。那么  (!false) = true
//代码就会运行到acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//接下来我们分析一下acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 这个方法
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}



分析acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 这个方法
我们首先需要分析addWariter这个方法。注意Node.EXCLUSIVE这个值是null


//进入到这个方法的时候我们进入到了 类 AbstractQueuedSynchonzied里面。
private Node addWaiter(Node mode) {
//根据当前线程创建了一个Node
	//这个node的nextWaiter = mode = null
    Node node = new Node(Thread.currentThread(), mode);
    //tail一开始就是等于mull  
Node pred = tail;
//所以第一次进来等式不成立
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
//第一次进来代码走的是这儿
    enq(node);
    return node;
}



//调的这个方法,创建来的参数的是当前线程的Node
private Node enq(final Node node) {
    for (;;) {//自旋
//第一次指针有变化
//第二次进来指针还是有变化
        Node t = tail;
//注意我们第一次进来,满足这个条件,所以t==null,是满足条件的
//第二次进来,那么这个时候t就不等于Null了,所以这儿的这个条件就不满足了
//去执行else语句
        if (t == null) { // Must initialize
//使用cas设置一个head头
            if (compareAndSetHead(new Node()))
//指针要发生变化
                tail = head;
//接着代码就会执行到这儿,大家一定要注意这儿。
//代码执行到这儿以后,因为这是一个死循环,所以
//执行到这儿以后再次运行。
        } else {
//当前的线程的prev指向t
            node.prev = t;
//使用cas操作设置队列的尾部
//这个cas的意思是当前的tail是否就是t,如果是t
//那么就把值修改为当前node,那很明显,当前的tail就是t
//所以这个就把当前node设置为tail
            if (compareAndSetTail(t, node)) {
                t.next = node;
//返回头结点结束这个死循环
                return t;
            }
        }
    }
}
到目前为止指针变化如下:
 

 

 

接下来假设线程三要进来了。
如果线程三进来,肯定就会走到这段代码。
private Node addWaiter(Node mode) {
//创建线程三Node
    Node node = new Node(Thread.currentThread(), mode);
    //pred指向tail
Node pred = tail;
//这次pred就不等于null
    if (pred != null) {
//当前的node.pred指向 tail
        node.prev = pred;
//判断当前pred是不是tail,如果是
//就把当前node设置为tail,当前tail肯定就是pred
        if (compareAndSetTail(pred, node)) {
//pred.netxt指向了线程三的node
            pred.next = node;
//返回当前node
            return node;
        }
    }
    enq(node);
    return node;
}

导致到此队列的指针变化如下:

 

 



 
到目前为止我们分析清楚了addWaiter的方法,但是不要忘记了我们的目标。我们是分析
acquireQueued的这个方法。我们只需要知道如果加锁失败了,那么就会调用addWaiter方法,addWaiter方法返回来的是当前的node
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
//需要注意一下我们这儿是死循环
        for (;;) {
//获取当前节点的上一个节点,那么我们线程三的
//上一个节点是线程二
            final Node p = node.predecessor();
//线程二不满足这儿条件
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
//如果上面一个条件不满足的话,接下来就会调用如下的方法
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


//传进来的参数是上一个节点和当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//当一个节点刚床架你的时候 waitStatus默认值应该是0
    int ws = pred.waitStatus;
//这个条件不满足
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
//这个条件不满足
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
//那么就会直接执行这个方法。
//这个方法就会把上一个节点的waitStatus 设置为SINGAL
//也就是-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
//然后返回false
    return false;
}

 

我们再回过头来分析上一段代码:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
//刚刚我们知道这儿的返回值是false
//那么如果是false的话,if条件就不满足了。
//不满足了以后再次执行for循环。
//继续执行shouldParkAfterFailedAcquire方法
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
当前的队列指针情况如下:

 

 




 
//再次执行这个方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取状态,现在ws=-1
    int ws = pred.waitStatus;
//符合这个提交
    if (ws == Node.SIGNAL)
//结果返回true
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
//这个shouldParkAfterFailedAcquire方法结果为true了以后
//那么接下来执行parkAndCheckInterrupt方法
//所以接下来我们分析一下parkAndCheckInterrupt方法。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private final boolean parkAndCheckInterrupt() {
//这儿操作就比较简单了,这儿就直接把线程挂起,线程就停在这儿不动了
//必须要等另外一个线程去执行unpark操作代码才能往下执行。
    LockSupport.park(this);
    return Thread.interrupted();
}

因为代码执行到这儿就已经卡住了。所以我们回到源头看到以下,最终会让哪段代码卡住。

//这个地方是调动acquireQueued方法导致代码卡住,所以这儿的代码也会卡住不动。
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}








到目前为止我们分析了一个加锁失败的线程进去到队列以后的情况。

我们现在可以解释一下公平锁和非公平锁的区别了。
我们之前的所有的代码分析的都是非公平锁的,非公平锁最大的特点就是在这儿。


final void lock() {
//加锁的时候不分青红皂白,也不管队列里面是否有人在排着队
//上来就是直接加锁,所以我们想一下,假设我们虽然现在队列里面有线程在排队加锁
//但是刚好当前的独占锁释放锁了,新进来的这个线程就加锁成功了。也就是插队成功了。
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

如果是公平锁的话,加锁的时候走的是这个逻辑:
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {
//调用这个方法
        acquire(1);
    }

public final void acquire(int arg) {
//首先执行的是这儿的tryAcquire方法
//其实到这儿的代码跟我们之前看到的代码是一样的。
//但是在往下实现就跟非公平锁那儿不一样了
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

 

protected final boolean tryAcquire(int acquires) {
//获取当前线程
        final Thread current = Thread.currentThread();
        int c = getState();
//如果真的当前的state=0
        if (c == 0) {
//需要这儿进行判断,我们先单独把hasQueuedPredecessors
//代码拿出来分析一下,分析后得到,这个方法是判断队列里面
//是否还有节点了。如果还有节点,那么这个方法就返回true
//!true 就是false,代码就不走这儿了。
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
//这儿返回false
        return false;
    }
}


public final boolean hasQueuedPredecessors() {
 
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
//如果队列里面还有节点
//如果还有节点那么就返回来true
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

 

这样的话,我们再回过头来看这段代码:
public final void acquire(int arg) {
// tryAcquire方法返回来的是false,那么!flase的结果就是等于true
    if (!tryAcquire(arg) &&
//然后接下来就是走这个方法,那么这个方法,就是跟我们分析的一样了。
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

所以综上所述,我们的公平队列就是每次在加锁的时候,先判断队列里面是否有线程,如果有就加到队列后面,如果没有,那么就直接加锁成功。


接下来我们再分析一个场景,就是重入锁释放锁的逻辑。
public void unlock() {
    sync.release(1);
}

接下来调用的是这段代码:
public final boolean release(int arg) {
//重点调用的是这个方法
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

 

protected final boolean tryRelease(int releases) {
//加锁现在是我们的线程一进行重入锁的释放,一开始state的值2
//现在传进来的参数releases 是1
//那么c的值是1
    int c = getState() - releases;
//如果释放锁的线程不是当前独占锁的线程,那么就会报错。
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
//如果把整个重入锁都释放完了,那么其实c==0
//但是第一次释放重入锁的时候,这儿c是1
    if (c == 0) {
        free = true;
//如果整个重入锁都释放了,那么就放当前的独占锁置为null
        setExclusiveOwnerThread(null);
    }
//更改线程的重入锁的个数
    setState(c);
//如果整个锁都释放完了,那么返回的是true
//如果只是释放了一部分,那么返回的是false。
    return free;
}

 

 

public final boolean release(int arg) {
//如果线程一有两个锁重入,当前只是减少了一个
//锁重入,那么tryRelease返回值是false。那么这个条件就不满足
  if (tryRelease(arg)) {
      Node h = head;
      if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
//对两个锁重入,因为第一次进来,那么直接返回的是false。
    return false;
}

其实如果是只释放了线程一的第一个重入锁,那么这个返回值也没什么意义,我们看到的是,就是把state的值减一了。
接下来我们继续分析,如果线程一,再释放一个重入锁,也就是state由1变为0了。
我们回过头来再分析如下代码:
public final boolean release(int arg) {
//条件满足
    if (tryRelease(arg)) {
        Node h = head;
//我这儿的分析是h.waitStatus就是等于0
//但是如果这个等于0的话,我们的等于就走不下去了,可见这儿的值应该不等于零。
//这样我们就执行里面的uparkSuccessor方法
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
//获取waitStatus状态
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
//获取到第一个节点
    Node s = node.next;
//在目前我们虚拟的环境中,s!=null
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
      //这儿直接unpakr把线程唤醒
        LockSupport.unpark(s.thread);
}
接下来我们分析一下unpank以后代码如何走:
其实我们的代码之前卡住了,然后unpark以后会导致代码继续往下执行。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
           //之前我们的代码卡在这个地方。
           //因为现在线程被唤醒了,所以这儿就可以继续往下执行。
           //我们要注意这儿是一个死循环。
           //所以继续重复执行。
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
//重复执行到这儿,获取当前节点的上一个节点
//当前节点的上一个节点当然是head了
            final Node p = node.predecessor();
//p==head条件满足,所以接着就执行tryAcquire
//这个方法就 开始加锁了,修改当前加锁线程的名字
//把state 改为了1
            if (p == head && tryAcquire(arg)) {
//然后把当前线程的node设置为head
                setHead(node);
                //把当前线程的Node 置为null进行垃圾回收
                p.next = null; // help GC
                failed = false;
               //返回状态
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

 

© 著作权归作者所有

lixiyuan
粉丝 55
博文 13
码字总数 21543
作品 0
昌平
程序员
私信 提问
加载中

评论(0)

如何将JDK源码导入到eclipse 或 Intellij idea呢?

主要是想将JDK中涉及到多线程的源码 导入到Intellij idea; 便于进行源码的注释、跟踪、以及画UML图; 主要步骤如下: 1、下载JDK源码 方式一:直接从下面地址中下载(本人就是采用这种方式) h...

故新
2017/10/12
0
0
JDK1.8 不一样的HashMap

前言 HashMap想必大家都很熟悉,JDK1.8 的 HashMap 随便一搜都是一大片一大片的,那为什么还要写呢,我会把它精简一下,一方面有利于自己的学习,另一方面希望让大家更好理解核心内容。本篇主...

小坏怎么被用了
2018/01/05
0
0
Android Studio 关联 JDK Java 源码

Android Studio 关联 Android 源码比较方便,一般下载后可自动关联,但是 Android Studio 默认使用的 JDK 是内嵌的,是不带源码的。所以在查看 JDK 源码时,看到的是反编译 class 而成的代码...

JohnnyShieh
2017/10/11
0
0
Spring如何实现AOP,请不要再说cglib了!

从注解入手找到对应核心类 最近工作中我都是基于注解实现AOP功能,常用的开启AOP的注解是@EnableAspectJAutoProxy,我们就从它入手。 上面的动图的流程的步骤就是: @EnableAspectJAutoProx...

温安适
2019/10/20
2.6K
11
动态代理-JDK动态代理的内部实现

0.一个简单的JDK动态代理的例子 创建代理类接口 创建接口的实现 实现一个InvocationHandler 使用动态代理执行被代理类的方法 查看输出结果 1.JDK动态代理的源码实现 我们从上面的代码可以看出...

似是而非Sage
2016/08/23
152
1

没有更多内容

加载失败,请刷新页面

加载更多

仁怀哪里可以开餐饮费发票-中国新闻网

仁怀哪里可以开餐饮费发票【139 * 7⒏ б2 * 15 З9】陈生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bridg...

17054723687
43分钟前
44
0
资兴哪里可以开餐饮费发票-中国新闻网

资兴哪里可以开餐饮费发票【139 * 7⒏ б2 * 15 З9】陈生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bridg...

17054723480
今天
32
0
沅江哪里可以开餐饮费发票-中国新闻网

沅江哪里可以开餐饮费发票【139 * 7⒏ б2 * 15 З9】陈生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bridg...

17054723685
今天
36
0
天津哪里可以开餐饮费发票-中国新闻网

天津哪里可以开餐饮费发票【139 * 7⒏ б2 * 15 З9】陈生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bridg...

17054723617
今天
33
0
汨罗哪里可以开餐饮费发票-中国新闻网

汨罗哪里可以开餐饮费发票【139 * 7⒏ б2 * 15 З9】陈生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bridg...

17060820048
今天
38
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部