文档章节

j.u.c.中的AQS详解

guhanjie
 guhanjie
发布于 2017/04/20 10:28
字数 2221
阅读 56
收藏 0

AQS简介

AQS是一个实现同步器的抽象框架,本质上是通过一个状态state来表述受限资源,并通过条件队列来实现线程的阻塞、等待和唤醒等操作。

它通过对内部同步状态state(volatile int类型)进行定制编码来表述限制性资源;

提供了一组获取操作(tryAcquire(int)、tryAcquireShared(int))和释放操作(tryRelease(int)、tryReleaseShared(int))来操作有限资源state(getState()、setState(int)、compareAndSetState(int, int))实现锁的阻塞和唤醒等语义;

并通过内部类Node节点(代表具体某一线程)组织双向链表数据结构来实现线程等待队列,封装了线程阻塞、等待、唤醒等操作的具体实现;

具体的同步器只需要实现自己的同步语义即可。

AQS API

它为子类提供了以下实现方法:

上层同步器只需要调用它公有方法来实现具体的交互:

AQS的实现细节

AQS中的锁等待队列和条件队列的实现都是由内部类Node节点(代表具体某个线程)组成的双向链表来实现:

* +------+ prev +-----+ +-----+

* head | | <---- | | <---- | | tail

* +------+ +-----+ +-----+

特别需要注意:在AQS中锁等待队列和条件队列是各自独立维护的两个双向链表队列,互不影响。

资源(锁)竞争过程

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

同步器类调用AQS#acquire()方法获取竞争资源,依次执行三个步骤:

Step1:调用AQS#tryAcquire()

根据自定义的state以及boolean tryAcquire()方法返回具体的资源竞争结果:

若资源未被其它线程占用,则方法直接返回,当前线程得以进入资源所保护的临界区,继续执行;

若资源已被其它线程占用,则继续执行Step2;

Step2:调用AQS#addWaiter()

将当前线程封装在AQS内部类Node节点中,然后将该线程节点放进锁等待队列(AQS#head和tail负责维护该队列)中,该队列是一个双向链表,利用CAS实现元素入队列操作。

Step3:调用AQS#acquireQueued(final Node node, int arg)

当前线程入队列后,一致等待锁资源的释放,直到获取到资源后,继续执行。该方法的命名一直让我比较困惑,深入研究代码后,我把它白话翻译一下:“等待当前节点(已入队列的)出队列”。这个方法在锁Lock的资源获取acquire()和等待队列ConditionWait的条件等待await()中都会被调用,因此比较关键,我把它贴出来:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

方法过程就是不断地判断指定节点(线程)是否排到队列头了,排到队列头说明轮到它了嘛,就返回。需要注意的点是其中interrupted变量用于记录线程的中断状态。为何要保留这个状态呢?

我们知道线程在等待资源时,可能由于资源被别人占用,导致LockSupport#park()暂停了本线程的执行,这时有可能上层业务的超时机制或者用户不耐烦了,会主动中断该线程,不想让它再等待下去了,这个是由上层运行时决定的,我们就用这个interrupted变量来记录线程的中断状态,这样即使资源被释放了,该线程能够获取该资源,根据该状态可以知道该线程被上层中断,所以就需要中断该线程,这个可以在上面的acquire()方法中看到selfInterrupt();方法就是来干这事的。

另外,这个方法内部的shouldParkAfterFailedAcquire起到了关联锁等待队列和条件等待队列的作用,它在内部检查前一个节点的等待状态再视情况而定。

资源(锁)释放过程

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

与上述竞争过程对应,同步器类调用AQS#release()方法,通过tryRelease()方法修改了内部state状态,然后从队列中取出队头元素,执行unpark操作,让队头元素获取资源,并继续执行下去。

AQS 在使用类中的具体实现

java.util.concurrent包中多个同步器类的实现都是借由AQS子类来实现具体同步语义的,具体有如下类:

  • ReentrantLock
  • ReentrantReadWriteLock
  • Semaphore
  • CountDownLatch
  • FutureTask
  • SynchronousQueue

具体实现细节各有差别,但本质都是对AQS#state同步状态进行定制编码,赋予其限制资源的语义,然后通过具体的tryAcquire和tryRelease方法实现对同步状态的操作。

如果某个同步器支持独占方式的获取操作,则实现tryAcquire(int)、tryRelease(int)、isHeldExclusively()等保护方法;

如果某个同步器支持共享方式的获取操作,则实现tryAcquireShared(int)、tryReleaseShared(int)等保护方法;

在实现的这些保护方法中,通过getState()、setState(int)、compareAndSetState(int, int)等方法来检查和更新状态,并通过返回的状态值来告知基类“获取”或“释放”操作是否成功,例如tryAcquireShared返回的具体语义规范如下:

  • 返回负值,表示获取失败;
  • 返回0,表示被独占方式获取;
  • 返回正值,表示被共享方式获取;

上述具体同步器类的AQS实现思路如下:

ReentrantLock

其只支持独占方式的获取操作,因此实现tryAcquire、tryRelease和isHeldExclusively方法。同步状态state用于保存锁获取操作的次数(java中锁的粒度是线程级别,ReentrantLock的可重入性是指同一个线程对于具体某个锁是可重入的,而state就是用来记录该线程对该锁获取的重入次数,每重入一次+1,每从一个同步块退出-1),同时维护一个owner变量来保存锁的当前所有者线程的标识符,此外AQS还支持多个条件变量和多个等待线程集(由条件队列实现),Lock.newCondition返回的是一个新的ConditionObject实例,其是AQS的一个内部类。

ReentrantReadWriteLock

其利用AQS子类同时管理两个锁(读锁和写锁),AQS同步状态的高16位用于表示读取锁的计数,而低16位用于表示写入锁的计数。在读锁上使用共享方式的获取和释放操作,而在写锁上使用独占方式的获取和释放操作。此外AQS在内部还维护一个等待线程队列(注意!仅维护一个队列,读写锁共享该等待队列,这样才保证写独占时,不可读),当锁可用时,如果位于队列头部的线程执行写入操作,则该线程以独占式获取该锁;而如果队列头部的线程执行读取操作,则队列中第一个写入线程之前的所有读取线程都可以获取该锁(这就是ReentrantReadWriteLock内部Sync类的isHeldExclusively()方法的内部逻辑和实现,值得琢磨!)。

Semaphore

其支持共享方式的获取操作,因此实现tryAcquireShared等方法。同步状态用于保存当前可用许可(permits)的数量。tryAcquireShared会通过compareAndSetState以原子更新方式降低许可的计数。

CountDownLatch

使用AQS的方式与Semaphore很相似,同步状态用于保存的是当前的计数值。countDown方法调用release从而导致计数递减;await方法调用acquire,当计数值减为0时,所有等待线程依次解除阻塞。

FutureTask

FutureTask.get()的语义非常类似于闭锁的语义——当某个事件(FutureTask表示的任务执行完成或被取消)发生时,线程恢复执行,否则线程将停留在等待队列中并直到该事件发生。AQS同步状态用于保存任务的执行状态,如:正在执行、已完成或已取消。此外FutureTask还维护一些额外的状态变量,用于保存计算结果或者抛出的异常。它还维护一个指向任务具体执行线程的引用,当任务取消时,就中断该线程。(注,FutureTask在JDK1.6中的实现是上述描述,最新的JDK1.8中使用内部类WaitNode借助Unsafe和LockSupport自己实现了上述过程,不再使用AQS)

© 著作权归作者所有

上一篇: Zab vs. Paxos
guhanjie
粉丝 5
博文 3
码字总数 5114
作品 0
上海
程序员
私信 提问
【死磕Java并发】-----J.U.C之并发工具类:Semaphore

此篇博客所有源码均来自JDK 1.8 信号量Semaphore是一个控制访问多个共享资源的计数器,和CountDownLatch一样,其本质上是一个“共享锁”。 Semaphore,在API是这么介绍的: 一个计数信号量。...

chenssy
2017/05/03
0
0
【死磕Java并发】—– 死磕 Java 并发精品合集

【死磕 Java 并发】系列是 LZ 在 2017 年写的第一个死磕系列,一直没有做一个合集,这篇博客则是将整个系列做一个概览。 先来一个总览图: 【高清图,请关注“Java技术驿站”公众号,回复:脑...

chenssy
2018/07/22
0
0
Java并发编程-看懂AQS的前世今生

  在具备了volatile、CAS和模板方法设计模式的知识之后,我们可以来深入学习下AbstractQueuedSynchronizer(AQS),本文主要想从AQS的产生背景、设计和结构、源代码实现及AQS应用这4个方面...

ling
2018/08/12
0
0
Java Concurrency(三)——J.U.C AQS源码解读

java5之后的java.util.concurrent包是世界级并发大师Doug Lea的作品,里面主要实现了 atomic包里Integer/Long对应的原子类,主要基于CAS; 一些同步子,包括Lock,CountDownLatch,Semaphore...

谦谦君子
2015/01/12
0
0
【死磕Java并发】-----J.U.C之读写锁:ReentrantReadWriteLock

此篇博客所有源码均来自JDK 1.8 重入锁ReentrantLock是排他锁,排他锁在同一时刻仅有一个线程可以进行访问,但是在大多数场景下,大部分时间都是提供读服务,而写服务占有的时间较少。然而读...

chenssy
2017/03/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Java 判断实体类属性是否为空工具类

import org.apache.commons.lang.StringUtils;import java.lang.reflect.Field;import java.lang.reflect.Method;/** * 判断对象是否为空 * @param obj * @return */pub......

骑羊放狼灬
19分钟前
0
0
基于nginx搭建RTMP服务器

安装nginx 下载ngnix源码 git clone https://github.com/nginx/nginx.gitgit clone https://github.com/arut/nginx-rtmp-module.git 编译安装 ./configure --add-module=../nginx-rtmp-mod......

cloudjx
32分钟前
0
0
从 Spark 到 Kubernetes — MaxCompute 的云原生开源生态实践之路

2019年5月14日,喜提浙江省科学技术进步一等奖的 MaxCompute 是阿里巴巴自研的 EB 级大数据计算平台。该平台依托阿里云飞天基础架构,是阿里巴巴在10年前做飞天系统的三大件之分布式计算部分...

迷你芊宝宝
35分钟前
0
0
5个Vue.js项目的模板

开发人员查看使用SPA,Webpack,身份验证,GraphQL,文档和测试的Vue开发人员的资源。 你准备开始一个重要的Vue项目吗?为了确保从坚实的基础开始,您可以使用模板(也就是样板,骨架,起动器...

写字的男孩儿
36分钟前
0
0
epoll 的本质是什么?

从事服务端开发,少不了要接触网络编程。epoll 作为 Linux 下高性能网络服务器的必备技术至关重要,nginx、Redis、Skynet 和大部分游戏服务器都使用到这一多路复用技术。 epoll 很重要,但是...

编辑部的故事
40分钟前
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部