文档章节

Java并发编程之AQS

AbeJeffrey
 AbeJeffrey
发布于 2017/03/19 11:41
字数 2457
阅读 304
收藏 3

AbstractQueuedSynchronizer是JDK1.5提供的一个基于FIFO等待队列实现,用于实现同步器的基础框架,以下简称AQS。AQS是实现 JCU包中几乎所有的有关锁、多线程并发以及线程同步器等重要组件的基石, 其核心思想是基于volatile int state这样的一个属性同时配合Unsafe工具对其原子性的操作来实现对当前锁的状态进行修改 。

状态管理

AQS使用int来表示状态,同时提供了getState()、setState()、compareAndSetState()方法来获取和修改该值。在互斥锁中它表示着线程是否已经获取了锁,0表示lock不被任何线程占有,1 已获取,大于1 重入数。

private volatile int state;
protected final int getState() {
   return state;
}
protected final void setState(int newState) {
   state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
   return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

state使用volatile来保证内存可见性,使用CAS操作compareAndSetState保证写入的原子性,从而保证线程安全性。

节点

AQS中使用node表示CLH队列的节点,源码如下:

static final class Node {
        //代表一个节点使用共享模式等待
        static final Node SHARED = new Node();
        //独占模式
        static final Node EXCLUSIVE = null;
        //标记一个节点为取消状态
        static final int CANCELLED =  1;
        //代表当前结点的后继节点需要被唤醒
        static final int SIGNAL    = -1;
        //线程(处在Condition休眠状态)在等待Condition唤醒
        static final int CONDITION = -2;
        //表示锁的下一次获取可以无条件传播,在共享模式头结点有可能处于这种状态
        static final int PROPAGATE = -3;
        //线程等待状态
        volatile int waitStatus;
        //前驱节点指针
        volatile Node prev;
        //后继节点指针
        volatile Node next;
        //节点所标记的线程
        volatile Thread thread;
        //
        Node nextWaiter;
        //是否共享模式
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        //获取前驱节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        Node() {    // 用于创建初始头节点或共享节点
        }
        Node(Thread thread, Node mode) {     // 用于addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        Node(Thread thread, int waitStatus) { // 用于Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

CLH队列

AQS内部维护着一个FIFO的CLH队列,相对于原始的CLH队列锁,AQS采用的是一种变种的CLH队列锁:

> 原始CLH使用的locked自旋,而AQS的CLH使用node节点的waitStatus来控制阻塞

> 为了方便处理timeout和cancel操作,每个node维护prev指针来移除cancel的节点,并且这个node节点可以继续使用prev的状态

> head实际为空节点

     在AQS维护的CLH队列中,每个Node代表着一个需要获取锁的线程。该Node中有两个常量SHARE、EXCLUSIVE。其中SHARE代表着共享模式拥有锁,EXCLUSIVE代表着独占模式拥有锁。

入队

当线程尝试获取锁的时,如果失败,则需要将该线程加入到CLH队列,入列中的主要流程是:

创建自身节点,如果tail不空,说明当前队列非空,将node的prev指针指向tail,将新的node使用CAS操作添加到队尾,失败则进入enq进行自旋直到成功为止;如果队列为空,直接进入enq初始化链表,然后再将新的node使用CAS操作添加到队尾。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
}

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
}

出队

当线程释放锁后,首先判断队列, head是否为null,以及head的waitStatus是否为0。如果head为null,或head的waitStatus为0,说明队列无线程等待锁。否则,需要进行出队操作,出队的主要工作则是唤醒其后继节点。

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

取消

线程因为超时或者中断涉及到取消的操作。如果某个节点被取消,则该节点将不会参与锁竞争,它会等待GC回收。取消的主要过程是将取消状态的节点node移除掉(状态设置为CANCELLED),然后将其pred节点的next指针指向node的后继节点,当然这个过程仍然会是一个CAS操作:

   private void cancelAcquire(Node node) {
        if (node == null)
            return;
        node.thread = null;

        Node pred = node.prev;
        while (pred.waitStatus > 0)//向前遍历,跳过取消状态的pred节点
            node.prev = pred = pred.prev;

        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED;

        //如果是tail,将node的pred设置为新的尾节点(已通过while循环跳过所有CANCELLED的节点)
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {//修改pred的next指针
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

看完这段代码是否有一个问题:node的后继节点的pred指针仍然指向被取消的node,何时会被更新?

任何其他线程尝试获取锁失败时,都会加入等待队列尾部,然后调用shouldParkAfterFailedAcquire准备挂起。如果当前节点的前驱节点状态为取消,则会执行如下逻辑:

do {
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

根据prev指针向前遍历,跳过被取消的前驱节点,同时,会调整其遍历过的prev指针。遍历结束条件:某个前驱节点的状态不是取消。

此外,cancelAcquire方法中也会做同样的事,如下逻辑:

Node pred = node.prev;
while (pred.waitStatus > 0)
    node.prev = pred = pred.prev;

从cancelAcquire方法的调用时机可知,它位于自旋操作的finally代码块中,并且只有在获取锁失败的时候才会执行。通常一个线程进入自旋之后,有3种情况从自旋操作中退出:

  1. 线程成功获取锁,此时不满足cancelAcquire方法执行条件;
  2. 线程等待锁具有超时时限,超过等待时间仍然没有获取锁,退出;
  3. 线程等待锁期间支持响应中断,由于中断导致线程退出。

也就是说,只有第2,3种情况,cancelAcquire方法才会被调用,同时只有当前节点的前驱节点也处于取消状态,才会向前遍历并调整prev指针。

总结,cancelAcquire方法将当前节点设置为取消状态后,将其pred节点的next指针指向当前节点的后继节点。但是,其后继节点的pred指针仍然指向当前节点,也就是说,即使发生GC,当前节点仍然不能被释放。

挂起

AQS的CLH队列相比原始的CLH队列锁,它采用了一种变形操作,将自旋机制改为阻塞机制。当线程进入 acquireQueued 后,首先检测prev是否指向头结点,是则尝试获取锁,成功获取锁则直接返回;失败或prev不指向头节点, 将prev节点的waitStatus值为设为SIGNAL,再一次循环尝试获取锁,如果仍然失败,此时prev节点的waitStatus已经是SIGNAL ,将线程安全挂起。

获取锁

AQS主要包括以下获取锁的方法:

acquire(int arg):以独占模式获取对象,忽略中断。

acquireInterruptibly(int arg): 以独占模式获取对象,如果被中断则中止。

acquireShared(int arg): 以共享模式获取对象,忽略中断。

acquireSharedInterruptibly(int arg)以共享模式获取对象,如果被中断则中止。

tryAcquire(int arg):试图在独占模式下获取对象状态。

tryAcquireNanos(int arg, long nanosTimeout):试图以独占模式获取对象,如果被中断则中止,如果到了给定超时时间,则会失败。

tryAcquireShared(int arg):试图在共享模式下获取对象状态。

tryAcquireSharedNanos(int arg, long nanosTimeout):试图以共享模式获取对象,如果被中断则中止,如果到了给定超时时间,则会失败。

实现AQS的同步器中,如ReentrantLock的lock()最终调用AQS的acquire方法,Semaphore的acquire()最终会调用AQS的acquireSharedInterruptibly()方法,ReadLock的lock则调用acquireShared。acquire源码如下:

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
 }

由acquire源码可知,加锁操作首先调用tryAcquire尝试获取锁,获取成功则设置锁状态并返回true,否则返回false;若返回false,调用addWaiter将当前线程加入到CLH队列队尾。然后调用acquireQueued,根据 FIFO原则进行阻塞等待,直到获取锁为止;同时acquireQueued会检查中断状态,若发生中断,调用selfInterrupt发出中断信号。主要流程如下:

释放锁

释放锁的方法主要有:

release(int arg):以独占模式释放对象。

releaseShared(int arg): 以共享模式释放对象

tryRelease(int arg):试图设置状态来反映独占模式下的一个释放。

tryReleaseShared(int arg):试图设置状态来反映共享模式下的一个释放。

release源码如下:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

release首先尝试释放锁,若head不为空,且waitStatus不为0,调用unparkSuccessor唤醒next节点。

主要流程如下:

阻塞与唤醒

当节点尝试获取锁失败时,将在acquireQueued()中调用parkAndCheckInterrupt()来挂起当前线程,实际则调用LockSupport.park()挂起线程。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

当节点释放锁后,需要唤醒继任节点。在release方法中调用unparkSuccessor()来唤醒该线程的继任节点,实际通过LockSupport.unpark()来唤醒。

   private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/862143

© 著作权归作者所有

AbeJeffrey
粉丝 43
博文 43
码字总数 116095
作品 0
杭州
高级程序员
私信 提问
Java 并发编程源码解析汇总篇

java并发编程,内存模型 java并发编程,volatile内存实现和原理 Java并发编程,并发基础 Java 并发编程,线程池(ThreadPoolExecutor)源码解析 Java并发编程,Executor 框架介绍 Java并发编...

郑加威
2018/12/23
0
0
CountDownLatch源码解析

CountDownLatch 相比ReentranceLock,CountDownLatch的流程还是相对比较简单的,CountDownLatch也是基于AQS,它是AQS的共享功能的一个实现。 下面从源代码的实现上详解CountDownLatch。 1、C...

maxam0128
2018/01/23
0
0
【Java并发专题】27篇文章详细总结Java并发基础知识

努力的意义,就是,在以后的日子里,放眼望去全是自己喜欢的人和事! github:https://github.com/CL0610/Java-concurrency,欢迎题issue和Pull request。所有的文档都是自己亲自码的,如果觉...

你听___
2018/05/06
0
0
「原创」Java并发编程系列01 开篇获奖感言

  全网都是复制粘贴的文章,师长这里一直坚持输出原创   点击上方“java进阶架构师”,选择右上角“置顶公众号   不要错过每一天的原创!      为什么要学并发编程   我曾听一个...

java进阶架构师
09/28
0
0
Java 并发编程-不懂原理多吃亏(送书福利)

作者 加多 关注阿里巴巴云原生公众号,后台回复关键字“并发”,即可参与送书抽奖!** 导读:并发编程与 Java 中其他知识点相比较而言学习门槛较高,从而导致很多人望而却步。但无论是职场面...

阿里巴巴云原生
08/30
98
0

没有更多内容

加载失败,请刷新页面

加载更多

java通过ServerSocket与Socket实现通信

首先说一下ServerSocket与Socket. 1.ServerSocket ServerSocket是用来监听客户端Socket连接的类,如果没有连接会一直处于等待状态. ServetSocket有三个构造方法: (1) ServerSocket(int port);...

Blueeeeeee
今天
6
0
用 Sphinx 搭建博客时,如何自定义插件?

之前有不少同学看过我的个人博客(http://python-online.cn),也根据我写的教程完成了自己个人站点的搭建。 点此:使用 Python 30分钟 教你快速搭建一个博客 为防有的同学不清楚 Sphinx ,这...

王炳明
昨天
5
0
黑客之道-40本书籍助你快速入门黑客技术免费下载

场景 黑客是一个中文词语,皆源自英文hacker,随着灰鸽子的出现,灰鸽子成为了很多假借黑客名义控制他人电脑的黑客技术,于是出现了“骇客”与"黑客"分家。2012年电影频道节目中心出品的电影...

badaoliumang
昨天
16
0
很遗憾,没有一篇文章能讲清楚线程的生命周期!

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本。 简介 大家都知道线程是有生命周期,但是彤哥可以认真负责地告诉你网上几乎没有一篇文章讲得是完全正确的。 ...

彤哥读源码
昨天
18
0
jquery--DOM操作基础

本文转载于:专业的前端网站➭jquery--DOM操作基础 元素的访问 元素属性操作 获取:attr(name);$("#my").attr("src"); 设置:attr(name,value);$("#myImg").attr("src","images/1.jpg"); ......

前端老手
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部