文档章节

MPSC lock free queue

lost_o0
 lost_o0
发布于 2015/03/02 10:33
字数 579
阅读 630
收藏 2

[c实现的队列](http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue)

下面是akka实现的一个MPSC队列。

PS: 代码中注释对链头链尾判定的标准是添加的元素所在的位置为链尾,这和代码中的命名相冲突了

PPS: single customer 就不太需要考虑消费者的同时取的竞争状态

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.dispatch;

import akka.util.Unsafe;

import java.util.concurrent.atomic.AtomicReference;

/**
 * Lock-free MPSC linked queue implementation based on Dmitriy Vyukov's non-intrusive MPSC queue:
 * http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
 */
@SuppressWarnings("serial")
public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQueue.Node<T>> {
    // Extends AtomicReference for the "head" slot (which is the one that is appended to) since Unsafe does not expose XCHG operation intrinsically
    //AtomicReference的value对应队列的链尾Node
    @SuppressWarnings("unused")
    private volatile Node<T> _tailDoNotCallMeDirectly;

    protected AbstractNodeQueue() {
       final Node<T> n = new Node<T>();
       _tailDoNotCallMeDirectly = n;//初始化根节点,value=null,链头!
       set(n);//初始化链尾部Node
    }

    /*
     * !!! There is a copy of this code in pollNode() !!!
     */
    @SuppressWarnings("unchecked")
    protected final Node<T> peekNode() {//链头
        for(;;) {
          final Node<T> tail = ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset));
          final Node<T> next = tail.next();
          if (next != null || get() == tail)//next!=null表明它不是根节点且在链表中;get()==tail 表明整个链表中还没有Node,只能返回根节点了
            return next;
        }
    }

    public final T peek() {
        final Node<T> n = peekNode();
        return (n != null) ? n.value : null;
    }

    public final void add(final T value) {
        final Node<T> n = new Node<T>(value);
        getAndSet(n).setNext(n);//蛮经典的一句,将AtomicReference的value设置为最新的Node,并将n链接到链表尾部上去
    }
    
    public final void addNode(final Node<T> n) {
        n.setNext(null);
        getAndSet(n).setNext(n);
    }

    public final boolean isEmpty() {
        return peek() == null;
    }

    public final int count() {//count 是不准确的
        int count = 0;
        for(Node<T> n = peekNode();n != null; n = n.next())
          ++count;
        return count;
    }

    /*
     * !!! There is a copy of this code in pollNode() !!!
     */
    public final T poll() {
        final Node<T> next = peekNode();
        if (next == null) return null;
        else {
            final T ret = next.value;
            next.value = null;
            Unsafe.instance.putOrderedObject(this, tailOffset, next);//将链头替换掉
            return ret;
        }
    }
    
    @SuppressWarnings("unchecked")
    public final Node<T> pollNode() {
      Node<T> tail;
      Node<T> next;
      for(;;) {
        tail = ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset));
        next = tail.next();
        if (next != null || get() == tail)
          break;
      }
      if (next == null) return null;
      else {
        tail.value = next.value;
        next.value = null;
        Unsafe.instance.putOrderedObject(this, tailOffset, next);
        return tail;
      }
    }

    private final static long tailOffset;

    static {
        try {
          tailOffset = Unsafe.instance.objectFieldOffset(AbstractNodeQueue.class.getDeclaredField("_tailDoNotCallMeDirectly"));
        } catch(Throwable t){
            throw new ExceptionInInitializerError(t);
        }
    }

    public static class Node<T> {
        public T value;
        @SuppressWarnings("unused")
        private volatile Node<T> _nextDoNotCallMeDirectly;//下一个节点。next,直接用Unsafe来进行操作

        public Node() {
            this(null);
        }

        public Node(final T value) {
            this.value = value;
        }

        @SuppressWarnings("unchecked")
        public final Node<T> next() {
            return (Node<T>)Unsafe.instance.getObjectVolatile(this, nextOffset);
        }

        protected final void setNext(final Node<T> newNext) {
          Unsafe.instance.putOrderedObject(this, nextOffset, newNext);
        }
        
        private final static long nextOffset;
        
        static {
            try {
                nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly"));
            } catch(Throwable t){
                throw new ExceptionInInitializerError(t);
            } 
        }
    } 
}


© 著作权归作者所有

共有 人打赏支持
lost_o0
粉丝 10
博文 37
码字总数 31139
作品 0
烟台
一个高性能无锁非阻塞链表队列

这个是一个用c++ 11标准实现的无锁非阻塞链表队列,通过增加一个dummy节点,解偶合链表头指针和尾指针。使得当只有一个生产者和一个消费者时,进队和出队都无需加锁,进队操作的是尾指针,出...

做个有思想的程序员
2016/05/06
2.6K
2
netty源码分析之揭开reactor线程的面纱(三)

上两篇博文(netty源码分析之揭开reactor线程的面纱(一),netty源码分析之揭开reactor线程的面纱(二)已经描述了netty的reactor线程前两个步骤所处理的工作,在这里,我们用这张图片来回顾...

闪电侠_
08/10
0
0
Linux平台上C语言实现异步队列的两种方法

Linux上目前有两种事件通知方式,一种是线程条件变量,一种是利用eventfd实现事件通知,下面介绍一下利用这两种方法实现异步队列的方法。 线程条件变量 相关函数介绍 pthreadcondinit:初始化...

frank21
2014/02/27
0
0
golang思考之运行速度之channel

golang channel测速程序: C语言有锁并发队列: C语言无锁并发队列: TCP: 速度: 测试暂时完成了,留下了很多问题: 1、为什么golang的channel比C语言快?如果C语言在内核态实现并发队列呢...

学习吧
2015/10/20
101
0
mingw无法编译多线程程序的问题及解决

在linux环境下,用gedit+gcc方式学习一段时间的c语言,发现自己越来越佩服linux的牛人了。 居然不用IDE也能开发代码。我做不到,所以有回到了windows的怀抱。 mingw是windows版本的gnu c/c++...

mfcai
2013/03/04
0
2

没有更多内容

加载失败,请刷新页面

加载更多

smart-doc特殊功能使用介绍

smart-doc从8月份底开始开源发布到目前为止已经迭代了几个版本。在这里非常感谢那些敢于用smart-doc去做尝试并积极提出建议的社区用户。因此决定在本博客中重要说明下smart-doc的功能,包括使...

上官胡闹
昨天
0
0
JavaEE——Junit

声明:本栏目所使用的素材都是凯哥学堂VIP学员所写,学员有权匿名,对文章有最终解释权;凯哥学堂旨在促进VIP学员互相学习的基础上公开笔记。 Junit Junit又名单元测试,Junit是用来测试Jav...

凯哥学堂
昨天
0
0
读《美丽新世界》

一、背景 十一国庆节从重庆回深圳的时候,做得绿皮车,路上看了两本书:李笑来的《韭菜的自我修养》和禁书《美丽新世界》。 上篇文章已经分享了 读《韭菜的自我修养》,这篇文章来记录一下《...

tiankonguse
昨天
0
0
archlinux下基于Jenkins,docker实现自动化部署(持续交互)

本文永久更新地址:https://my.oschina.net/bysu/blog/2250954 【若要到岸,请摇船:开源中国 不最醉不龟归】 -----------------------------------第一部分Jenkins的安装与使用-----------...

不最醉不龟归
昨天
0
0
Spark Parquet file split

在实际使用 spark + parquet 的时候, 遇到了两个不解的地方: 我们只有一个 parquet 文件(小于 hdfs block size), 但是 spark 在某个 stage 生成了4个 tasks 来处理. 4个 tasks 中只有一个 ta...

Aaaaaaaron
昨天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部