文档章节

MPSC lock free queue

lost_o0
 lost_o0
发布于 2015/03/02 10:33
字数 579
阅读 588
收藏 2
点赞 0
评论 0

[c实现的队列](http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue)

下面是akka实现的一个MPSC队列。

PS: 代码中注释对链头链尾判定的标准是添加的元素所在的位置为链尾,这和代码中的命名相冲突了

PPS: single customer 就不太需要考虑消费者的同时取的竞争状态

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.dispatch;

import akka.util.Unsafe;

import java.util.concurrent.atomic.AtomicReference;

/**
 * Lock-free MPSC linked queue implementation based on Dmitriy Vyukov's non-intrusive MPSC queue:
 * http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
 */
@SuppressWarnings("serial")
public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQueue.Node<T>> {
    // Extends AtomicReference for the "head" slot (which is the one that is appended to) since Unsafe does not expose XCHG operation intrinsically
    //AtomicReference的value对应队列的链尾Node
    @SuppressWarnings("unused")
    private volatile Node<T> _tailDoNotCallMeDirectly;

    protected AbstractNodeQueue() {
       final Node<T> n = new Node<T>();
       _tailDoNotCallMeDirectly = n;//初始化根节点,value=null,链头!
       set(n);//初始化链尾部Node
    }

    /*
     * !!! There is a copy of this code in pollNode() !!!
     */
    @SuppressWarnings("unchecked")
    protected final Node<T> peekNode() {//链头
        for(;;) {
          final Node<T> tail = ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset));
          final Node<T> next = tail.next();
          if (next != null || get() == tail)//next!=null表明它不是根节点且在链表中;get()==tail 表明整个链表中还没有Node,只能返回根节点了
            return next;
        }
    }

    public final T peek() {
        final Node<T> n = peekNode();
        return (n != null) ? n.value : null;
    }

    public final void add(final T value) {
        final Node<T> n = new Node<T>(value);
        getAndSet(n).setNext(n);//蛮经典的一句,将AtomicReference的value设置为最新的Node,并将n链接到链表尾部上去
    }
    
    public final void addNode(final Node<T> n) {
        n.setNext(null);
        getAndSet(n).setNext(n);
    }

    public final boolean isEmpty() {
        return peek() == null;
    }

    public final int count() {//count 是不准确的
        int count = 0;
        for(Node<T> n = peekNode();n != null; n = n.next())
          ++count;
        return count;
    }

    /*
     * !!! There is a copy of this code in pollNode() !!!
     */
    public final T poll() {
        final Node<T> next = peekNode();
        if (next == null) return null;
        else {
            final T ret = next.value;
            next.value = null;
            Unsafe.instance.putOrderedObject(this, tailOffset, next);//将链头替换掉
            return ret;
        }
    }
    
    @SuppressWarnings("unchecked")
    public final Node<T> pollNode() {
      Node<T> tail;
      Node<T> next;
      for(;;) {
        tail = ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset));
        next = tail.next();
        if (next != null || get() == tail)
          break;
      }
      if (next == null) return null;
      else {
        tail.value = next.value;
        next.value = null;
        Unsafe.instance.putOrderedObject(this, tailOffset, next);
        return tail;
      }
    }

    private final static long tailOffset;

    static {
        try {
          tailOffset = Unsafe.instance.objectFieldOffset(AbstractNodeQueue.class.getDeclaredField("_tailDoNotCallMeDirectly"));
        } catch(Throwable t){
            throw new ExceptionInInitializerError(t);
        }
    }

    public static class Node<T> {
        public T value;
        @SuppressWarnings("unused")
        private volatile Node<T> _nextDoNotCallMeDirectly;//下一个节点。next,直接用Unsafe来进行操作

        public Node() {
            this(null);
        }

        public Node(final T value) {
            this.value = value;
        }

        @SuppressWarnings("unchecked")
        public final Node<T> next() {
            return (Node<T>)Unsafe.instance.getObjectVolatile(this, nextOffset);
        }

        protected final void setNext(final Node<T> newNext) {
          Unsafe.instance.putOrderedObject(this, nextOffset, newNext);
        }
        
        private final static long nextOffset;
        
        static {
            try {
                nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly"));
            } catch(Throwable t){
                throw new ExceptionInInitializerError(t);
            } 
        }
    } 
}


© 著作权归作者所有

共有 人打赏支持
lost_o0
粉丝 10
博文 36
码字总数 31139
作品 0
烟台
一个高性能无锁非阻塞链表队列

这个是一个用c++ 11标准实现的无锁非阻塞链表队列,通过增加一个dummy节点,解偶合链表头指针和尾指针。使得当只有一个生产者和一个消费者时,进队和出队都无需加锁,进队操作的是尾指针,出...

做个有思想的程序员 ⋅ 2016/05/06 ⋅ 2

Linux平台上C语言实现异步队列的两种方法

Linux上目前有两种事件通知方式,一种是线程条件变量,一种是利用eventfd实现事件通知,下面介绍一下利用这两种方法实现异步队列的方法。 线程条件变量 相关函数介绍 pthreadcondinit:初始化...

frank21 ⋅ 2014/02/27 ⋅ 0

mingw无法编译多线程程序的问题及解决

在linux环境下,用gedit+gcc方式学习一段时间的c语言,发现自己越来越佩服linux的牛人了。 居然不用IDE也能开发代码。我做不到,所以有回到了windows的怀抱。 mingw是windows版本的gnu c/c++...

mfcai ⋅ 2013/03/04 ⋅ 2

golang思考之运行速度之channel

golang channel测速程序: C语言有锁并发队列: C语言无锁并发队列: TCP: 速度: 测试暂时完成了,留下了很多问题: 1、为什么golang的channel比C语言快?如果C语言在内核态实现并发队列呢...

学习吧 ⋅ 2015/10/20 ⋅ 0

free buffer waits等待事件

Problem This will happen if there is no free buffer in the buffer cache for the session to read a new block into. All buffer gets have been suspended. This will frequently occur......

科技小能手 ⋅ 2017/11/12 ⋅ 0

C++并发数据结构算法库--CDS

CDS是一个 C++ 模板库,包含 lock-free and fine-grained 算法。 包含一系列并发数据结构的实现: 顺序支持的原子操作 SMR算法 数据结构: 栈 队列: Michael & Scott lock-free 和 read/wri...

匿名 ⋅ 2011/09/14 ⋅ 0

原理剖析(第 012 篇)Netty之无锁队列MpscUnboundedArrayQueue原理分析

原理剖析(第 012 篇)Netty之无锁队列MpscUnboundedArrayQueue原理分析 - 一、大致介绍 二、回顾预习 2.1 构造队列 2.2 何为JCTools? 2.3 常用重要的成员属性及方法 2.4 数据结构 三、源码...

HMILYYLIMH ⋅ 03/31 ⋅ 0

davinci sd卡驱动学习笔记(五)

跟着程序的流程我们来到 if (!mmccardpresent(card) && !mmccarddead(card)) { if (mmcregistercard(card)) 来看 /* * Internal function. Register a new MMC card with the driver model.......

曾永刚 ⋅ 2011/05/14 ⋅ 0

设计实现ringbuffer无锁队列(lock free queue)

前两天跟同事交流了 无锁队列的问题,发现不少人对这个lock free的概念有些匮乏,这里就简单写个ringbuffer实现lock free的设计理念。 这里拿python举例子,ringbuffer其实就是个数组,单纯的...

rfyiamcool ⋅ 2017/06/24 ⋅ 0

常用的几种内存池技术

几乎所有应用程序中都会有内存的分配和释放,而频繁的分配和释放内存无疑会产生内存碎片,降低系统性能,尤其对性能要求较高的程序比较明显。下面介绍几种常见的内存池技术。 一 环形缓存 环...

nothingfinal ⋅ 2013/04/22 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

JavaScript零基础入门——(八)JavaScript的数组

JavaScript零基础入门——(八)JavaScript的数组 欢迎大家回到我们的JavaScript零基础入门,上一节课我们讲了有关JavaScript正则表达式的相关知识点,便于大家更好的对字符串进行处理。这一...

JandenMa ⋅ 今天 ⋅ 0

sbt网络问题解决方案

转自:http://dblab.xmu.edu.cn/blog/maven-network-problem/ cd ~/.sbt/launchers/0.13.9unzip -q ./sbt-launch.jar 修改 vi sbt/sbt.boot.properties 增加一个oschina库地址: [reposit......

狐狸老侠 ⋅ 今天 ⋅ 0

大数据,必须掌握的10项顶级安全技术

我们看到越来越多的数据泄漏事故、勒索软件和其他类型的网络攻击,这使得安全成为一个热门话题。 去年,企业IT面临的威胁仍然处于非常高的水平,每天都会看到媒体报道大量数据泄漏事故和攻击...

p柯西 ⋅ 今天 ⋅ 0

Linux下安装配置Hadoop2.7.6

前提 安装jdk 下载 wget http://mirrors.hust.edu.cn/apache/hadoop/common/hadoop-2.7.6/hadoop-2.7.6.tar.gz 解压 配置 vim /etc/profile # 配置java环境变量 export JAVA_HOME=/opt/jdk1......

晨猫 ⋅ 今天 ⋅ 0

crontab工具介绍

crontab crontab 是一个用于设置周期性被执行的任务工具。 周期性执行的任务列表称为Cron Table crontab(选项)(参数) -e:编辑该用户的计时器设置; -l:列出该用户的计时器设置; -r:删除该...

Linux学习笔记 ⋅ 今天 ⋅ 0

深入Java多线程——Java内存模型深入(2)

5. final域的内存语义 5.1 final域的重排序规则 1.对于final域,编译器和处理器要遵守两个重排序规则: (1)在构造函数内对一个final域的写入,与随后把这个被构造对象的引用赋值给一个引用...

江左煤郎 ⋅ 今天 ⋅ 0

面试-正向代理和反向代理

面试-正向代理和反向代理 Nginx 是一个高性能的反向代理服务器,但同时也支持正向代理方式的配置。

秋日芒草 ⋅ 今天 ⋅ 0

Spring 依赖注入(DI)

1、Setter方法注入: 通过设置方法注入依赖。这种方法既简单又常用。 类中定义set()方法: public class HelloWorldOutput{ HelloWorld helloWorld; public void setHelloWorld...

霍淇滨 ⋅ 昨天 ⋅ 0

马氏距离与欧氏距离

马氏距离 马氏距离也可以定义为两个服从同一分布并且其协方差矩阵为Σ的随机变量之间的差异程度。 如果协方差矩阵为单位矩阵,那么马氏距离就简化为欧氏距离,如果协方差矩阵为对角阵,则其也...

漫步当下 ⋅ 昨天 ⋅ 0

聊聊spring cloud的RequestRateLimiterGatewayFilter

序 本文主要研究一下spring cloud的RequestRateLimiterGatewayFilter GatewayAutoConfiguration @Configuration@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMi......

go4it ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部