文档章节

ConcurrentLinkedQueue源码分析

tsmyk0715
 tsmyk0715
发布于 07/16 20:17
字数 3127
阅读 214
收藏 9

相关文章

Lock锁源码分析

java阻塞队列BlockingQueue

HashMap源码分析-jdk1.6和jdk1.8的区别

LinkedList源码分析

ArrayList源码分析

前言

ConcurrentLinkedQueue是一个线程安全的队列,它采用的是 CAS 算法来进行实现,也就是说它是非阻塞的;队列中的元素按照 FIFO(先进先出)的原则对元素进行排列,此外,它是一个无界队列;添加元素的时候,在链表的尾部进行添加,获取元素的时候,从链表的头部获取。它内部采用的单向链表的形式来表示,链表的节点是定义在ConcurrentLinkedQueue的一个内部类。

类图

ConcurrentLinkedQueue 的类图如下所示:

可以看到 ConcurrentLinkedQueue 实现了 Queue 接口和实现了继承了 AbstractQueue 类,而 Itr 和 Node则是它的一个内部类;

Queue 接口只是定义了一些队列的公共方法,如下:

public interface Queue<E> extends Collection<E> {
    // 添加元素
    boolean add(E e);
    // 添加元素
    boolean offer(E e);
    // 删除元素
    E remove();
    // 删除并返回第一个元素,如果队列为空,则返回 null 
    E poll();
   // 返回第一个元素,如果不存在,则抛出NoSuchElementException异常
    E element();
    // 返回第一个元素,但不删除,如果队列为空,则返回 null 
    E peek();
}

AbstractQueue 类也继承了 Queue接口,提供了某些方法的实现,如下所示:

public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }
...............................
}

源码

接下来看看 ConcurrentLinkedQueue 的一个实现过程:

首先看一下队列中链表节点的定义,链表中的节点使用一个 Node 内部类来表示:

    private static class Node<E> {
        // 节点中的元素
        volatile E item;
        // 下一个节点,没有上一个节点,表示它是一个单向链表的形式
        volatile Node<E> next;
        // 构造一个节点
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
        // 使用 CAS 的方式设置节点的元素
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
        // 设置下一个节点
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
        // 采用 CAS 的方式设置下一个节点
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
      //  Unsafe 类的一些初始化
}

可以看到 Node 类的定义比较简单,值得注意的地方是 E item 元素和 Node next 节点都使用了 volatile 来修饰,这说明了元素或某个节点被一个线程修改了之后,其他的线程是立马看到修改后的值的。

接下来看一下 ConcurrentLinkedQueue 类中的属性和方法:

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {

    // 头节点,
    private transient volatile Node<E> head;
    // 尾节点,尾节点不一定是链表的最后一个节点
    private transient volatile Node<E> tail;
    // 构造
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }
......................
}

以上可以看到,头节点 head 和尾节点 tail 都被 volatile 修饰,节点被一个线程修改了之后,是会把修改的最新的值刷新到主内存中去,当其他线程去读取该值的时候,会中主内存中获取最新的值,也就是一个线程修改了之后,对其他线程是立即可见的。

当使用空的构造其是实例化一个对象的时候,会创建一个节点,节点的值为 null(添加的时候,是不能为null的),并把头节点和尾节点都指向该节点,如下所示:

添加元素

之后向链表中添加元素,添加元素的时候,是在链表的尾部进行添加,添加元素有两个方法 add() 和 offer(),add() 会调用 offer() 进行添加,这两个方法永远都会返回 true,所以不要使用 true | false 来判断是否添加成功;

    public boolean add(E e) {
        return offer(e);
    }
    public boolean offer(E e) {
        // 判空,为空则抛出空指针异常
        checkNotNull(e);
        // 创建要添加的节点
        final Node<E> newNode = new Node<E>(e);
        
        // 无限循环,入队不成功,则反复入队
        // t 表示 tail 节点
        // p 表示链表的尾节点,默认等于 tail 节点
        for (Node<E> t = tail, p = t;;) {
            // q 为尾节点的下一个节点         
            Node<E> q = p.next;
            // 如果尾节点的下一个节点为空,则表示 p 为尾节点
            if (q == null) {
                // CAS 设置尾节点的下一个节点为新添加的节点,如果设置失败,在再次尝试
                if (p.casNext(null, newNode)) {
                    // 如果tail节点有大于等于1个的 next 节点,则更新 tail 节点,将新添加的节点设置为 tail 节点
                    if (p != t) // 相当于循环两次更新一次 tail 节点
                        casTail(t, newNode);  // 新添加的节点设置为tail节点,允许失败,失败了表示有其他线程成功更新了tail节点
                    return true;
                }
            }
            else if (p == q) // 只有在尾节点和尾节点的下一个节点为空的情况下成立
                p = (t != (t = tail)) ? t : head;
            else 
                // 把 tail节点设置为为尾节点,再次循环设置下一个节点
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

以上方法就是向队列中添加元素的方法,该方法也是该类中比较难理解的方法;

从上面的代码可以看出,入队主要做两件事情:第一是将新添加的节点设置成当前队列尾节点的下一个节点;第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点。

接下来通过图的方法来看一下元素入队的一个过程:

初始时队列如下:

1. 添加第一个节点:

for循环:

for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;

接下来进行 q==null 的判断:

            if (q == null) {
                if (p.casNext(null, newNode)) {

                    if (p != t) 
                        casTail(t, newNode);  
                    return true;
                }
            }

此时,q==null, 也就是说尾节点的下一个节点为空,此时把新添加的节点设置为尾节点的下一个节点,有因为,p节点和 t节点是相等的,所以不会更新 tail 节点;设置成功后,链表如下:

2.添加第二个节点:

            if (q == null) {
             ....
            }
            else if (p == q)
                // p = q = null
                p = (t != (t = tail)) ? t : head;
            else
                p = (p != t && t != (t = tail)) ? t : q;

此时, q 不为空,有p和q不想等,所以只会走 else 分支,p != t 为false,t != tail 也为false,所有 把 q 赋给 p ,此时队列如下:

之后,在走到for循环,此时 q == null, 所以,会把新添加的节点设置为 p 的下一个节点:

            if (q == null) {
                if (p.casNext(null, newNode)) {
                    if (p != t) 
                        casTail(t, newNode);  
                    return true;
                }
            }

又因为此时,t 节点还是指向第一个节点,p 指向第一个节点的next节点,t 和 p 不想等,所以执行 casTail(t, newNode);  新添加的节点设置为tail 节点,设置成功后,链表如下所示:

3.添加第三个元素,添加成功后,链表如下所示:

4.添加第四个节点,添加成功后,链表如下所示:

相当于在添加元素时,每循环两次才会更新一次 tail 节点。

为什么不让 tail 节点永远为队列的尾节点,如果让 tail 节点永远为队列的尾节点,则实现的代码会更少且逻辑也会更清晰,这是因为,如果让 tail 永远为队列的尾节点,则每次都需要使用循环CAS更新tail节点,如果能减少更新 tail 节点的次数,入队的性能岂不是更高?所以说并不是每次入队都需要更新尾节点,只有在tail节点和尾节点不想等的情况下才更新,这样能减少更新此时,提高效率。

获取元素

下面来看看获取元素的操作,ConcurrentLinkedQueue是一个FIFO的队列,所以获取元素的时候,总是获取到队列的第一个元素;获取元素有两个方法,poll() 和 peek(),poll()方法获取元素的时候,返回链表的第一个元素,并删除,而 peek() 方法获取元素的时候则不删除,下面看一下获取元素的主要代码逻辑:

    public E poll() {
        // 循环跳转,goto语法
        restartFromHead:
        for (;;) {
            // p 表示要出队的节点,默认为 head节点
            for (Node<E> h = head, p = h, q;;) {
                // 出队的元素
                E item = p.item;
                // 如果出队的元素不为空,则把要出队的元素设置null,不更新head节点;如果出队元素为null或者cas设置失败,则表示有其他线程已经进行修改,则需要重写获取
                if (item != null && p.casItem(item, null)) {
                    if (p != h) // 当head元素为空,才会更新head节点,这里循环两次,更新一次head节点
                        updateHead(h, ((q = p.next) != null) ? q : p); // 更新head节点
                    return item;
                }
                // 队列为空,返回null
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                // 把 p 的next节点赋值给p
                else
                    p = q;
            }
        }
    }

下面通过图的方式来查看获取节点的过程:

首先队列如下所示:

之后去获取节点 item0,首先进行标记:

for (Node<E> h = head, p = h, q;;) {
       E item = p.item;
    ..............................
}

标记如下:

因为此时 p.item 不为空,所示 CAS 设置 p.item 为null,走第一个if语句,又因为此时 h 节点的 item 不为空,即 p 和 h 相等,所以不会更新头节点,直接返回 p 节点中的元素 item0,如下图所示:

               if (item != null && p.casItem(item, null)) {
                    // 不会执行
                    if (p != h) // 
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }

接下来再获取 item1,进行标记,如下所示:

因为 p.item 为空,所以不满足条件  if (item != null && p.casItem(item, null)), 之后执行  else if ((q = p.next) == null),此时,q指向了 item1,

又因为 q 不为空,所有不会更新head节点,之后会执行最后一个else语句:p = q,第一次循环结束,开始第二次循环,又进行标记,此时标记如下:

因为 p.item 不为空,所以走如下代码逻辑,通过 CAS 把 p.item 设置为null,

                if (item != null && p.casItem(item, null)) {
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }

因为此时,head节点元素为空,即 p 和 h 节点不相等,所以会更新头节点,又因为 p.next 即 item2 不为空,所以把 p.next 即 item2 设置为 head节点,设置成功后,队列如下所示:

之后获取item2也是同样的逻辑,获取 items2 后队列如下:

获取 item3 后如下所示:

以上就是从队列中获取元素的主要代码逻辑,从上可知,head节点不一定就是队列的第一个含有元素的节点,也不是每次获取元素后就更新head节点,只有当head中的元素为空的时候才更新head节点,这和添加 offer() 方法中更新tail节点类似,减少 CAS 更新head节点的次数,出队的效率会更高。

isEmpty()方法

ConcurrentLinkedQueue 通过 isEmpty来判断队列是否为空,代码如下:

    public boolean isEmpty() {
        return first() == null;
    }
    Node<E> first() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

isEmpty 方法会判断链表的第一个元素是否为空来进行判断的。

size()方法

    public int size() {
        int count = 0;
        // succ() 获取下一个元素
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

size()方法会遍历所有的链表来查看有多少个元素。

对于在开发的时候,如果需要判断是否为空,则应该使用 isEmpty 而不应该使用 size() > 0 的方式,因为 size()会变量整个链表,效率较低。

ConcurrentLinkedQueue 类还有其他的一些方法,只有理解了入队和出队的方法 offer() 和 poll() 方法,其他方法就很好理解了。

例子

/**
 * @ Author:lenovo
 * @ Date:Created in 下午 7:22 2018/7/17 0017
 */
public class ConcurrentLinkedQueueTest {

    public static void main(String[] args) throws InterruptedException {
        new ConcurrentLinkedQueueTest().testConcurrentLinkedQueue();
        Thread.sleep(5000L);
    }

    int num = 0;

    public ConcurrentLinkedQueue testConcurrentLinkedQueue(){
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for(int i = 0; i < 100; i++) {
            new Thread(() -> {
                num++;
                queue.offer(num);
            }).start();
        }
        return queue;
    }
}

 

 

© 著作权归作者所有

共有 人打赏支持
tsmyk0715
粉丝 25
博文 52
码字总数 111854
作品 0
成都
程序员
Java中的队列都有哪些,有什么区别。

Java中的队列都有哪些,实际上是问queue的实现有哪些,如:ConcurrentLinkedQueue、LinkedBlockingQueue 、ArrayBlockingQueue、LinkedList。 源于我经历过的一次生产事故,有个服务会收集业...

郑加威
02/27
1
0
JUC源码分析-集合篇(四):ConcurrentLinkedDeque

ConcurrentLinkedDeque 是双向链表结构的无界并发队列。从JDK1.7开始加入到J.U.C的行列中。使用CAS实现并发安全,与 ConcurrentLinkedQueue 的区别是该阻塞队列同时支持FIFO和FILO两种操作方...

泰迪的bagwell
01/23
0
0
【死磕Java并发】-----J.U.C之Java并发容器:ConcurrentLinkedQueue

原文出处http://cmsblogs.com/ 『chenssy』 要实现一个线程安全的队列有两种方式:阻塞和非阻塞。阻塞队列无非就是锁的应用,而非阻塞则是CAS算法的应用。下面我们就开始一个非阻塞算法的研究...

chenssy
2017/07/08
0
0
【死磕Java并发】—–J.U.C之Java并发容器:ConcurrentLinkedQueue

原文出处http://cmsblogs.com/ 『chenssy』 要实现一个线程安全的队列有两种方式:阻塞和非阻塞。阻塞队列无非就是锁的应用,而非阻塞则是CAS算法的应用。下面我们就开始一个非阻塞算法的研究...

chenssy
2017/07/08
0
0
聊聊 JDK 非阻塞队列源码(CAS实现)

正如上篇文章聊聊 JDK 阻塞队列源码(ReentrantLock实现)所说,队列在我们现实生活中队列随处可见,最经典的就是去银行办理业务,超市买东西排队等。今天楼主要讲的就是JDK中安全队列的另一...

haifeiWu
08/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

创建第一个react项目

sudo npm i -g create-react-app@1.5.2 create-react-app react-app cd react-apprm -rf package-lock.jsonrm -rf node_modules #主要是为了避免报错npm installnpm start......

lilugirl
55分钟前
1
0
在浏览器中进行深度学习:TensorFlow.js (八)生成对抗网络 (GAN)

Generative Adversarial Network 是深度学习中非常有趣的一种方法。GAN最早源自Ian Goodfellow的这篇论文。LeCun对GAN给出了极高的评价: “There are many interesting recent development...

naughty
今天
0
0
搬瓦工镜像站bwh1.net被DNS污染,国内打不开搬瓦工官网

今天下午(2018年10月17日),继搬瓦工主域名bandwagonhost.com被污染后,这个国内的镜像地址bwh1.net也被墙了。那么目前应该怎么访问搬瓦工官网呢? 消息来源:搬瓦工优惠网->搬瓦工镜像站b...

flyzy2005
今天
2
0
SpringBoot自动配置

本篇介绍下,如何通过springboot的自动配置,将公司项目内的依赖jar,不需要扫描路径,依赖jar的情况下,就能将jar内配置了@configuration注解的类,创建到IOC里面 介绍下开发环境 JDK版本1.8 spr...

贺小五
今天
3
0
命令行新建Maven多项目

参考地址 # DgroupId 可以理解为包名# DartifactId 可以理解为项目名mvn archetype:generate -DgroupId=cn.modfun -DartifactId=scaffold -DarchetypeArtifactId=maven-archetype-quickst......

阿白
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部