文档章节

Java并发编程笔记——J.U.C之collections框架:ConcurrentLinkedQueue

须臾之余
 须臾之余
发布于 07/23 22:13
字数 2672
阅读 33
收藏 0

封面立体图

一:ConcurrentLinkedQueue简介

ConcurrentLinkedQueue是线程安全的无界非阻塞队列,其底层数据结构使用单向链表实现,对于入队和出队操作使用CAS来实现线程安全。

Doug Lea在实现ConcurrentLinkedQueue时,并没有利用锁或底层同步原语,而是完全基于自旋+CAS的方式实现了该队列。回想一下AQS,AQS内部的CLH等待队列也是利用了这种方式。

由于是完全基于无锁算法实现的,所以当出现多个线程同时进行修改队列的操作(比如同时入队),很可能出现CAS修改失败的情况,那么失败的线程会进入下一次自旋,再尝试入队操作,直到成功。

所以,在并发量适中的情况下,ConcurrentLinkedQueue一般具有较好的性能。

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {

二:下面是ConcurrentLinkedQueue的类图结构

三:

三:ConcurrentLinkedQueue原理

队列结构

我们来看下ConcurrentLinkedQueue的内部结构:

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
    implements Queue<E>, java.io.Serializable {
 
    /**
     * 队列头指针
     */
    private transient volatile Node<E> head;
 
    /**
     * 队列尾指针.
     */
    private transient volatile Node<E> tail;
 
    // Unsafe mechanics
     
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
     
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
 
    /**
     * 队列结点定义
     */
    private static class Node<E> {
        volatile E item;        // 元素值
        volatile Node<E> next;  // 后驱指针
 
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
 
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
 
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
 
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
 
        // Unsafe mechanics
 
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
 
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
 
    //...
}

可以看到,ConcurrentLinkedQueue内部就是一个简单的单链表结构,每入队一个元素就是插入一个Node类型的结点。

字段head指向队列头,tail指向队列尾,通过Unsafe来CAS操作字段值以及node对象的字段值。

构造器的定义

ConcurrentLinkedQueue包含两种构造器:

//构建一个空队列(head,tail均指向一个占位结点)
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
//根据以有集合构造队列
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}

我们不妨先看下空构造器,通过空构造器建立的ConcurrentLinkedQueue对象,其head和tail指针并非指向null,而是指向一个item值为null的node结点,如下图:

入队操作

元素的入队是在队尾插入元素,ConcurrentLinkedQueue的入队代码非常简单,却也非常精妙:

尾添加offer操作是在队列末一个元素,如果传递的参数是null则抛出NPE异常,否则由于ConcurrentLinkedQueue是无界队列,该方法会一直返回true。

另外,由于使用CAS算法,因此该方法不会阻塞挂起调用线程。下面具体看下实现原理。

public boolean add(E e) {//入队一个元素
    return offer(e);
}
//在队尾入队元素e,直到成功
public boolean offer(E e) {
    //e为null,则抛出空指针异常
    checkNotNull(e);    

    //构造node结点,在构造函数内部调用unsafe.putObject
    final Node<E> newNode = new Node<E>(e);    

    //从尾结点进行插入
    for (Node<E> t = tail, p = t;;) {   
        Node<E> q = p.next;

         //CASE1:q==null 说明p是尾结点,则直接插入
        if (q == null) {   
            //使用CAS设置p结点的next结点
            if (p.casNext(null, newNode)) {    
                //CAS成功,则说明新增结点已经放入链表,然后设置当前尾结点(包含head,第1,3,4....个结点为尾结点)
                if (p != t) // hop two nodes at a time    //CAS竞争失败的线程会在下一次自旋中进入该逻辑
                    casTail(t, newNode);  // Failure is OK.    //重新设置队尾指针tail
                return true;
            }    //CAS竞争失败则进入下一次自旋
        }
        else if (p == q)    //CASE2:发生了出队操作
            //多线程操作时,由于poll操作移除元素后,可能会把head变为自引用,也就是head的next变成了head,所以这里需要重新找新的head
            p = (t != (t = tail)) ? t : head;
        else
            //寻找尾结点
            p = (p != t && t != (t = tail)) ? t : q;    //将p重新指向队尾结点
    }
}

我们来分析下offer方法的实现。单线程的情况下,元素入队比较好理解,直接线性地在队首插入元素即可。我们假设有两个线程ThreadA和ThreadB同时进行入队地操作。

①ThreadA先单独入队两个元素9、2

此时队列地结构如下:

②ThreadA入队元素“10”,ThreadB入队元素“25”

此时ThreadA和ThreadB若并发执行,我们看下会发生什么:

1、ThreadA和ThreadB同时进入自旋中的以下代码块:

if (q == null) {
    if (p.casNext(null, newNode)) {    //CASE1:正常情况下,新结点直接插入到队尾
       //CAS竞争插入成功
        if (p != t) // hop two nodes at a time//CAS竞争失败地线程会在下一次自旋中进入该逻辑
            casTail(t, newNode);  // Failure is OK.    //重新设置队尾指针tail
        return true;
    //CAS竞争插入失败则进入下一次自旋
    }

2、ThreadA执行cas操作(p.casNext)成功,插入新结点“10”

ThreadA执行完成后,直接返回true,队列结构如下:

3、ThreadB执行cas操作(p.casNext)失败

由于CAS操作同时修改队尾元素,导致ThreadB操作失败,则ThreadB进入下一次自旋;
在下一次自旋中,进入以下代码块:

else
    // Check for tail updates after two hops.
    p = (p != t && t != (t = tail)) ? t : q;    //将p重新指向队尾结点

上述分支的作用就是让p指针重新定位到队尾结点,此时队列结构如下:

然后ThreadB会继续下一次自旋,并再次进入以下代码块:

if (q == null) {
    // p is last node
    if (p.casNext(null, newNode)) {
        // Successful CAS is the linearization point
        // for e to become an element of this queue,
        // and for newNode to become "live".
        if (p != t) // hop two nodes at a time
            casTail(t, newNode);  // Failure is OK.
        return true;
    }
    // Lost CAS race to another thread; re-read next
}

此时,CAS操作成功,队列结构如下:

由于此时p!=t ,所以会调用casTail方法重新设置队尾指针:

private boolean casTail(Node<E> cmp, Node<E> val) {    //重新设置队尾指针tail
    return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

这个分支只有在元素入队的同时,针对该元素也发生了“出队”操作才会执行,我们后面会分析元素的“出队”,理解了“出队”操作再回头来看这个分支就容易理解很多了。

出队操作

队列中元素的“出队”是从队首移除元素,我们来看下ConcurrentLinkedQueue是如何实现出队的:

//在队首出队元素,直到成功
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {//CASE2:队首是非哨兵结点(item!=null)
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);//CASE1:队首是一个哨兵结点(item==null)
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

还是通过示例来看,假设初始的队列结构如下:

①ThreadA先单独进行出队操作

由于head所指的是item==null的结点,所以ThreadA会执行以下分支:

else
    p = q;

然后进入下一次自旋,在自旋中执行以下分支,如果CAS操作成功,则移除首个有效元素,并重新设置头指针:

if (item != null && p.casItem(item, null)) {    //CASE2:队首是非哨兵结点
    // Successful CAS is the linearization point
    // for item to be removed from this queue.
    if (p != h) // hop two nodes at a time
        updateHead(h, ((q = p.next) != null) ? q : p);
    return item;
}

此时队列的结构如下:

如果ThreadA的CAS操作失败呢?

CAS操作失败则会进入以下分支,并重新开始自旋:

else if (p == q)
    continue restartFromHead;

最终前面两个null结点会被GC回收,队列结构如下:

②ThreadA继续进行出队操作

ThreadA继续执行“出队”操作,还是执行以下分支:

if (item != null && p.casItem(item, null)) {
    // Successful CAS is the linearization point
    // for item to be removed from this queue.
    if (p != h) // hop two nodes at a time
        updateHead(h, ((q = p.next) != null) ? q : p);
    return item;
}

但是此时p==h,所以仅将头结点置null,这其实是一种“懒删除”的策略。

出队元素“2”:

出队元素“10”:

最终队列结果如下:

③ThreadA进行出队,其它线程进行入队

这是最特殊的一种情况,当队列中只剩下一个元素时,如果同时发生出队和入队操作,会导致队列出现下面这种结构:(假设ThreadA进行出队元素“25”,ThreadB进行入队元素“11”)

此时tail.next=tail自身,所以ThreadB在执行入队时,会进入到offer方法的以下分支:

else if (p == q)    //CASE2:发生出队操作
    // We have fallen off list.  If tail is unchanged, it
    // will also be off-list, in which case we need to
    // jump to head, from which all live nodes are always
    // reachable.  Else the new tail is a better bet.
    p = (t != (t = tail)) ? t : head;

三、总结

ConcurrentLinkedQueue使用了自旋+CAS的非阻塞算法来保证线程并发访问时的数据一致性。由于队列本身是一种链表结构,所以虽然算法看起来很简单,但其实需要考虑各种并发的情况,实现复杂度较高,并且ConcurrentLinkedQueue不具备实时的数据一致性,实际运用中,队列一般在生产者-消费者的场景下使用得较多,所以ConcurrentLinkedQueue的使用场景并不如阻塞队列那么多。

另外,关于ConcurrentLinkedQueue还有以下需要注意的几点:

  1. ConcurrentLinkedQueue的迭代器是弱一致性的,这在并发容器中是比较普遍的现象,主要是指在一个线程在遍历队列结点而另一个线程尝试对某个队列结点进行修改的话不会抛出ConcurrentModificationException,这也就造成在遍历某个尚未被修改的结点时,在next方法返回时可以看到该结点的修改,但在遍历后再对该结点修改时就看不到这种变化。
  2. size方法需要遍历链表,所以在并发情况下,其结果不一定是准确的,只能供参考。

参考书籍

Java并发编程之美

参考链接

https://segmentfault.com/a/1190000016248143

 

© 著作权归作者所有

须臾之余
粉丝 127
博文 75
码字总数 182934
作品 0
吉安
程序员
私信 提问
【死磕Java并发】—– 死磕 Java 并发精品合集

【死磕 Java 并发】系列是 LZ 在 2017 年写的第一个死磕系列,一直没有做一个合集,这篇博客则是将整个系列做一个概览。 先来一个总览图: 【高清图,请关注“Java技术驿站”公众号,回复:脑...

chenssy
2018/07/22
0
0
读书笔记之《Java并发编程的艺术》-并发编程容器和框架(重要)

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
747
1
Java多线程(全)学习笔记(下)

七.Callable和Future接口 C#可以把任意方法包装成线程执行体,包括那些有返回值的方法。Java也从jdk1.5开始,加入了Callable接口用来扩展Runnable接口的功能,Callable接口提供一个call()...

明舞
2014/07/27
206
0
读书笔记之《Java并发编程的艺术》-并发编程基础

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
4K
8
[Java 并发编程] 集合框架之 同步容器类 & 并发容器类

吾生也有涯,而知也无涯。———《庄子》 通过上一篇文章,我们已经知道设计一个线程安全类的原则和步骤,以及在设计过程中我们应当注意的细节。实际上,Java 的集合库包含了线程安全集合和非...

seaicelin
2018/05/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

IT兄弟连 HTML5教程 DIV+CSS网页标准化布局的优势

标准的网页都需要对内容进行布局,以前都是采用表格的定位技术,从2005年开始逐步转向DIV+CSS的布局方式,目前绝大多数的网站都是采用这种布局方式。使用DIV+CSS对网站进行布局符合W3C标准,...

老码农的一亩三分地
42分钟前
6
0
java web kafka的简单使用

一.依赖类库 kafka和springboot的版本需要对应,都使用最新版本即可. <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId>......

legend3
48分钟前
7
0
C# 如何添加自定义键盘处理事件 如何配置app.config ? | csharp key press event tutorial and app.config

本文首发于个人博客https://kezunlin.me/post/9f24ebb5/,欢迎阅读最新内容! csharp key press event tutorial and app.config Guide MainForm protected override bool ProcessCmdKey(ref......

kezunlin
今天
4
0
Fragment放到RecyclerView的item布局中

权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/zp000123/article/details/81086710 效果图: 问题 1. 动态加...

shzwork
今天
6
0
Spring 框架基础(06):Mvc架构模式简介,执行流程详解

本文源码:GitHub·点这里 || GitEE·点这里 一、SpringMvc框架简介 1、Mvc设计理念 MVC是一种软件设计典范,用一种业务逻辑、数据、界面显示分离的方法组织代码,将业务逻辑聚集到一个组件里...

知了一笑
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部