文档章节

JUC锁框架——AQS源码分析

长头发-dawn
 长头发-dawn
发布于 08/22 06:09
字数 4315
阅读 11
收藏 1

JUC锁介绍

Java的并发框架JUC(java.util.concurrent)中锁是最重要的一个工具。因为锁,才能实现正确的并发访问。而AbstractQueuedSynchronizer(AQS)是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。AQS基于模板方法模式为使用提供了底层支撑,定义了同步状态的获取和释放的方法,一般主是是以继承的方式使用,使用者可以自由组装。在java的同步组件中,AQS的子类(Sync等)一般是同步组件的静态内部类,即通过组合的方式使用。

AbstractQueuedSynchronizer 属性介绍

/*AQS维护了一CLH队列,并记录队列的头节点和尾节点*/
private transient volatile Node head;
private transient volatile Node tail;

/*
当前锁的状态,state等于0表示锁可用,state大于0表示锁被其他线程占用,之所以说大于0,而不是等于1,是因为锁可以重入,每次重入都加上1
*/
private volatile int state;

/*
代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁if (currentThread == getExclusiveOwnerThread()) {state++}
*/
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

Node 属性介绍

static final class Node {
    //标识节点当前在共享模式下
    static final Node SHARED = new Node();

    //标识节点当前在独占模式下
    static final Node EXCLUSIVE = null;


    /*
        这几个int常量表示当前节点的等待状态(waitStatus)
    */
    //结束状态,线程取消锁的争抢,一般在超时或被中断设置为CANCELLED状态而该状态表示的节点会被踢出队列
    static final int CANCELLED =  1;

    //其表示当前node的后继节点对应的线程需要被唤醒(unpark)。如果当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程.
    static final int SIGNAL    = -1;

    //与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
    static final int CONDITION = -2;

    //与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
    static final int PROPAGATE = -3;

    //节点的等待状态,新节点的waitStatus=0
    volatile int waitStatus;
    volatile Node prev;//当前节点的前一个节点。
    volatile Node next;//当前节点的后一个节点
    volatile Thread thread;//当前节点对应的线程
    Node nextWaiter;//存储condition队列中的后继节点
}

AQS中的CLH队列

CLH(Craig, Landin, and Hagersten lock queue)是AQS中管理“等待锁”的线程的队列。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;而其它线程则需要等待。CLH是一个非阻塞的 FIFO 队列,也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保证节点插入和移除的原子性。

ReentrantLock.java的FairSync公平锁对AQS框架的分析

lock()

 /*
获取锁,此处的“1”是设置“锁的状态”的参数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是0;锁被线程初次获取时,它的状态值就变成了1。由于ReentrantLock是可重入锁(锁可以被单个线程多次获取),所以“独占锁”可以被单个线程多此获取,每获取1次就将锁的状态+1。
 */
 public void lock() { sync.acquire(1); }

AQS的acquire()

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  1. 首先,调用tryAcquire方法,再次尝试获取锁,如果获取成功直接返回。
  2. 如果获取锁失败,构造独占式同步结点(Node.EXCLUSIVE),通过addWatiter将此结点添加到同步队列的尾部(此时可能会有多个线程结点试图加入同步队列尾部,需要以线程安全的方式添加);
  3. 该结点以在队列中尝试获取同步状态,若获取不到,则阻塞结点线程,直到被前驱结点唤醒或者被中断。

tryAcquire()

公平锁的tryAcquire()在ReentrantLock.java的FairSync类中实现,源码如下:

protected final boolean tryAcquire(int acquires) {
    // 获取“当前线程”
    final Thread current = Thread.currentThread();
    // 获取“独占锁”的状态
    int c = getState();
    // c=0意味着“锁没有被任何线程锁拥有”
    if (c == 0) {
        // 若“锁没有被任何线程锁拥有”,
        // 则判断是否有其它线程在队列中等待(基于公平锁,则讲究先来后到,看看是否有其它线程在队列中等待半天)
        // 如果没有的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果“独占锁”的拥有者已经为“当前线程”,因为可重入,锁状态加1.
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

说明:根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。注意,这里只是尝试!尝试成功的话,返回true;尝试失败的话,返回false,后续再通过其它办法来获取该锁。后面我们会说明,在尝试失败的情况下,是如何一步步获取锁的。

hasQueuedPredecessors()

hasQueuedPredecessors()在AQS中实现,判断是否有其它线程在队列中等了半天。源码如下:

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());//首节点的下一个节点是否为当前节点
}

addWaiter()

   /**
    * 如果尝试获取同步状态失败的话,则构造同步节点(独占式的Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法将该节点加入到同步队列的队尾。
    */
    private Node addWaiter(Node mode) {
        // 用当前线程构造一个Node对象,mode表示Node类型,即表示该节点在AQS的这个队列中是独占的还是共享的。
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {//队列不为空的时候
            node.prev = pred;
            //尝试修改为节点为最新的节点,如果修改失败,意味着有并发,这个时候进入enq中的死循环,进行“自旋”的方式修改
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //进入自旋
        enq(node);
        return node;
    }

enq()

采用自旋的方式入队,自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的

    /**
     * 同步器通过死循环的方式来保证节点的正确添加,在“死循环” 中通过CAS将节点设置成为尾节点之后,当前线程才能从该方法中返回,否则当前线程不断的尝试设置。
     * enq方法将并发添加节点的请求通过CAS变得“串行化”了。
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued()

acquireQueued()非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了。acquireQueued()的作用就是“当前线程”会根据公平性原则进行阻塞等待,直到获取锁为止;并且返回当前线程在等待过程中有没有并中断过。如果“前继节点调用unpark()唤醒了当前线程”并且“前继节点是CLH表头”,此时就是满足p==head,也就是符合公平性原则的。否则,如果当前线程是因为“线程被中断”而唤醒,那么显然就不是公平了。这就是为什么说p==head就是保证公平性!

   /**
    * acquireQueued方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点才能尝试获取同步状态(锁)( p == head && tryAcquire(arg))
    * 原因是:
    *1.头结点是成功获取同步状态(锁)的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头结点。
    *2.维护同步队列的FIFO原则,节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说是每个线程)都在自省的观察。
    */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //死循环检查(自旋检查)当前节点的前驱节点是否为头结点,才能获取锁
            for (;;) {
                // 获取节点的前驱节点
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//节点中的线程循环的检查,自己的前驱节点是否为头节点
                    //将当前节点设置为头结点,移除之前的头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 否则检查前一个节点的状态,看当前获取锁失败的线程是否要挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //如果需要挂起,借助JUC包下面的LockSupport类的静态方法park挂起当前线程,直到被唤醒
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //如果有异常
            if (failed)
                //取消请求,将当前节点从队列中移除
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire(Node, Node)

 此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态),万一队列前边的线程都放弃了只是瞎站着呢?

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//拿到前驱的状态
    if (ws == Node.SIGNAL)
        //如果前驱节点已经是SIGNAL状态,它执行完必定会通知后继线程,那有人按时叫你起床,那么你就可以安心休息了
        return true;
    if (ws > 0) {//ws大于0,表示前驱节点已经处于取消状态,此处表示如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
        //注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
       //进入该分支条件,前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
       //在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
       //等着前驱节点拿到锁,然后释放锁的时候叫你好了
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

以下这个方法就是负责挂起线程的,这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒。

private final boolean parkAndCheckInterrupt() {
  /* park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:
   * 1)被unpark();
   * 2)被interrupt()。
   * Thread.interrupted()会清除当前线程的中断标记位。
   */
  LockSupport.park(this);//阻塞当前线程
  return Thread.interrupted();//返回中断标志位
}

selfInterrupt()

private static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

说明:这段代码就是“当前线程”自己产生一个中断。但是,为什么需要这么做呢? 这必须结合acquireQueued()进行分析。如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。

在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!也就是说,在该线程“成功获取锁并真正执行起来”之前,它的中断会被忽略并且中断标记会被清除! 因为在parkAndCheckInterrupt()中,我们线程的中断状态时调用了Thread.interrupted()。而interrupted()会清除中断状态。 正因为中断状态被清除了,所以在等到锁后需要再次调用selfInterrupt()重新产生一个中断!

unlock()

unlock()在ReentrantLock.java中实现的,源码如下:

 /*
  * 1.unlock()是解锁函数,它是通过AQS的release()函数来实现的。
  * 在这里,“1”的含义和“获取锁的函数acquire(1)的含义”一样,
  * 它是设置“释放锁的状态”的参数。
  * 由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。
  */
    public void unlock() {
        sync.release(1);
    }

release()

public final boolean release(int arg) {
    if (tryRelease(arg)) {//判断该线程是否已经完成释放掉资源了!
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒等待队列里的下一个线程
        return true;
    }
    return false;
}

tryRelease()

tryRelease()在ReentrantLock.java的Sync类中实现,尝试释放当前线程的同步状态(锁).源码如下:

  protected final boolean tryRelease(int releases) {
     int c = getState() - releases;//c为释放后的同步状态
     //如果“当前线程”不是“锁的持有者”,则抛出异常(非法监视器状态异常)
     if (Thread.currentThread() != getExclusiveOwnerThread())
         throw new IllegalMonitorStateException();
     boolean free = false;
     //如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。
     if (c == 0) {
         free = true;
         setExclusiveOwnerThread(null);
     }
     setState(c);
     return free;
  }

unparkSuccessor(Node)

唤醒后继节点

private void unparkSuccessor(Node node) {
    //这里,node一般为当前线程所在的结点。
    int ws = node.waitStatus;
    if (ws < 0)//置零当前线程所在的结点状态,允许失败。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一个需要唤醒的结点s
    if (s == null || s.waitStatus > 0) {//如果为空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒后继节点对应的线程
}

ReentrantLock.java的NonfairSync非公平锁对AQS框架的分析

非公平锁和公平锁在获取锁的方法上,流程是一样的;它们的区别主要表现在“尝试获取锁的机制不同”。简单点说,“公平锁”在每次尝试获取锁时,都是采用公平策略(根据等待队列依次排序等待);而“非公平锁”在每次尝试获取锁时,都是采用的非公平策略(无视等待队列,直接尝试获取锁,如果锁是空闲的,即可获取状态,则获取锁)。

lock()

lock()在ReentrantLock.java的NonfairSync类中实现,它的源码如下:

final void lock() {
    if (compareAndSetState(0, 1))//判断“锁”是不是空闲状态
        setExclusiveOwnerThread(Thread.currentThread());//“当前线程”直接获取“锁”。(有机会获取锁就不进阻塞队列)
    else
        acquire(1);//调用acquire(1)获取锁
}

acquire()

public final void acquire(int arg) {
    if (!tryAcquire(arg) && //tryAcquire和公平锁的获取机制不同
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire()

非公平锁的tryAcquire()在ReentrantLock.java的NonfairSync类中实现,源码如下:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

nonfairTryAcquire()在ReentrantLock.java的Sync类中实现,源码如下:

final boolean nonfairTryAcquire(int acquires) {
    // 获取“当前线程”
    final Thread current = Thread.currentThread();
    // 获取“锁”的状态
    int c = getState();
    // c=0意味着“锁没有被任何线程锁拥有”
    if (c == 0) {
        // 若“锁没有被任何线程锁拥有”,则通过CAS函数设置“锁”的状态为acquires。
        // 同时,设置“当前线程”为锁的持有者。
        //相对于公平锁而言,公平锁在尝试获取锁时,即使“锁”没有被任何线程锁持有,它也会判断自己是不是CLH等待队列的表头;是的话,才获取锁。
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 如果“锁”的持有者已经是“当前线程”,
        // 则将更新锁的状态。
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

基于AQS的简单示例

class Mutex implements Lock, java.io.Serializable {
    // 内部类,自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 是否处于占用状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        // 当状态为0的时候获取锁
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 释放锁,将状态设置为0
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        // 返回一个Condition,每个condition都包含了一个condition队列
        Condition newCondition() { return new ConditionObject(); }
    }
    // 仅需要将操作代理到Sync上即可
    private final Sync sync = new Sync();
    public void lock()                { sync.acquire(1); }
    public boolean tryLock()          { return sync.tryAcquire(1); }
    public void unlock()              { sync.release(1); }
    public Condition newCondition()   { return sync.newCondition(); }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

可以看到Mutex将Lock接口均代理给了同步器的实现。

参考地址:

© 著作权归作者所有

共有 人打赏支持
长头发-dawn
粉丝 6
博文 20
码字总数 45283
作品 0
西安
【JDK1.8】JUC——ReentrantLock

一、前言 在之前的几篇中,我们回顾了锁框架中比较重要的几个类,他们为实现同步提供了基础支持,从现在开始到后面,就开始利用之前的几个类来进行各种锁的具体实现。今天来一起看下Reentra...

joemsu
08/11
0
0
JUC源码分析—AQS

1. 概述 AbstractQueuedSynchronizer,简称AQS。是java里的一个抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的、、等。 AQS支持独...

泰迪的bagwell
2017/12/09
0
0
JUC锁框架——Condition

Condition介绍 Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalA...

长头发-dawn
09/12
0
0
JUC锁框架——ReadWriteLock

ReadWriteLock简单介绍 ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁。读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。相对于互斥锁而言,ReadWriteLoc允许更高的并发...

长头发-dawn
09/17
0
0
聊聊并发(十三)—AQS框架深入分析

并发系列 聊聊并发(一)深入分析Volatile的实现原理 聊聊并发(二)Java SE1.6中的Synchronized 聊聊并发(三)Java线程池的分析和使用 聊聊并发(四)深入分析ConcurrentHashMap 聊聊并发(...

陶邦仁
2015/11/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Sql语言与MySql数据库

1. 数据库简介 1. 数据库,就是存储数据的仓库,只能通过sql语言来访问,数据库也是一个文件系统。通常,MySQL、Oracle等数据库,也被称为关系型数据库,其保存的不仅仅只是数据,还包括数据...

江左煤郎
10分钟前
0
0
IDEA 取消自动import .*

打开设置 > Editor > Code Style > Java > Scheme Default > Imports ① 将 Class count to use import with "*" 改为 99 (导入同一个包的类超过这个数值自动变为 * ) ② 将 Names count ......

乔老哥
11分钟前
1
0
PostGIS学习笔记(开篇)

PostGIS事实上算是笔者开始写博客的第一篇内容。而事实上那篇博文的内容并不丰富,笔者对PostGIS的了解仍然不多,然而17年在OSGeo课程学习时对PostGIS又有了进一步了解,并逐步发现它的强大。...

胖胖雕
11分钟前
1
0
【Centos】在nginx服务器中配置php和mysql

接上一章《【Centos】利用Vultr服务器和namesilo布网》(https://my.oschina.net/u/3776619/blog/2051986),在Centos中配置好nginx,并在iptables中开启了80端口,和为了远程mysql操作方便开...

yongh701
35分钟前
2
0
flume -- fileChannel简要分析其过程

flume之event写入FileChannel doPut(event)-->获取共享锁后[log.lockShared();]-->FlumeEventPointer ptr = log.put(transactionID, event); 此处的log.put即将transactionID及event进行后续......

-九天-
47分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部