文档章节

MPSC lock free queue

lost_o0
 lost_o0
发布于 2015/03/02 10:33
字数 579
阅读 647
收藏 2

[c实现的队列](http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue)

下面是akka实现的一个MPSC队列。

PS: 代码中注释对链头链尾判定的标准是添加的元素所在的位置为链尾,这和代码中的命名相冲突了

PPS: single customer 就不太需要考虑消费者的同时取的竞争状态

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.dispatch;

import akka.util.Unsafe;

import java.util.concurrent.atomic.AtomicReference;

/**
 * Lock-free MPSC linked queue implementation based on Dmitriy Vyukov's non-intrusive MPSC queue:
 * http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
 */
@SuppressWarnings("serial")
public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQueue.Node<T>> {
    // Extends AtomicReference for the "head" slot (which is the one that is appended to) since Unsafe does not expose XCHG operation intrinsically
    //AtomicReference的value对应队列的链尾Node
    @SuppressWarnings("unused")
    private volatile Node<T> _tailDoNotCallMeDirectly;

    protected AbstractNodeQueue() {
       final Node<T> n = new Node<T>();
       _tailDoNotCallMeDirectly = n;//初始化根节点,value=null,链头!
       set(n);//初始化链尾部Node
    }

    /*
     * !!! There is a copy of this code in pollNode() !!!
     */
    @SuppressWarnings("unchecked")
    protected final Node<T> peekNode() {//链头
        for(;;) {
          final Node<T> tail = ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset));
          final Node<T> next = tail.next();
          if (next != null || get() == tail)//next!=null表明它不是根节点且在链表中;get()==tail 表明整个链表中还没有Node,只能返回根节点了
            return next;
        }
    }

    public final T peek() {
        final Node<T> n = peekNode();
        return (n != null) ? n.value : null;
    }

    public final void add(final T value) {
        final Node<T> n = new Node<T>(value);
        getAndSet(n).setNext(n);//蛮经典的一句,将AtomicReference的value设置为最新的Node,并将n链接到链表尾部上去
    }
    
    public final void addNode(final Node<T> n) {
        n.setNext(null);
        getAndSet(n).setNext(n);
    }

    public final boolean isEmpty() {
        return peek() == null;
    }

    public final int count() {//count 是不准确的
        int count = 0;
        for(Node<T> n = peekNode();n != null; n = n.next())
          ++count;
        return count;
    }

    /*
     * !!! There is a copy of this code in pollNode() !!!
     */
    public final T poll() {
        final Node<T> next = peekNode();
        if (next == null) return null;
        else {
            final T ret = next.value;
            next.value = null;
            Unsafe.instance.putOrderedObject(this, tailOffset, next);//将链头替换掉
            return ret;
        }
    }
    
    @SuppressWarnings("unchecked")
    public final Node<T> pollNode() {
      Node<T> tail;
      Node<T> next;
      for(;;) {
        tail = ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset));
        next = tail.next();
        if (next != null || get() == tail)
          break;
      }
      if (next == null) return null;
      else {
        tail.value = next.value;
        next.value = null;
        Unsafe.instance.putOrderedObject(this, tailOffset, next);
        return tail;
      }
    }

    private final static long tailOffset;

    static {
        try {
          tailOffset = Unsafe.instance.objectFieldOffset(AbstractNodeQueue.class.getDeclaredField("_tailDoNotCallMeDirectly"));
        } catch(Throwable t){
            throw new ExceptionInInitializerError(t);
        }
    }

    public static class Node<T> {
        public T value;
        @SuppressWarnings("unused")
        private volatile Node<T> _nextDoNotCallMeDirectly;//下一个节点。next,直接用Unsafe来进行操作

        public Node() {
            this(null);
        }

        public Node(final T value) {
            this.value = value;
        }

        @SuppressWarnings("unchecked")
        public final Node<T> next() {
            return (Node<T>)Unsafe.instance.getObjectVolatile(this, nextOffset);
        }

        protected final void setNext(final Node<T> newNext) {
          Unsafe.instance.putOrderedObject(this, nextOffset, newNext);
        }
        
        private final static long nextOffset;
        
        static {
            try {
                nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly"));
            } catch(Throwable t){
                throw new ExceptionInInitializerError(t);
            } 
        }
    } 
}


© 著作权归作者所有

共有 人打赏支持
lost_o0
粉丝 10
博文 37
码字总数 31139
作品 0
烟台
私信 提问
一个高性能无锁非阻塞链表队列

这个是一个用c++ 11标准实现的无锁非阻塞链表队列,通过增加一个dummy节点,解偶合链表头指针和尾指针。使得当只有一个生产者和一个消费者时,进队和出队都无需加锁,进队操作的是尾指针,出...

做个有思想的程序员
2016/05/06
2.6K
2
Linux平台上C语言实现异步队列的两种方法

Linux上目前有两种事件通知方式,一种是线程条件变量,一种是利用eventfd实现事件通知,下面介绍一下利用这两种方法实现异步队列的方法。 线程条件变量 相关函数介绍 pthreadcondinit:初始化...

frank21
2014/02/27
0
0
netty源码分析之揭开reactor线程的面纱(三)

上两篇博文(netty源码分析之揭开reactor线程的面纱(一),netty源码分析之揭开reactor线程的面纱(二)已经描述了netty的reactor线程前两个步骤所处理的工作,在这里,我们用这张图片来回顾...

闪电侠_
08/10
0
0
golang思考之运行速度之channel

golang channel测速程序: C语言有锁并发队列: C语言无锁并发队列: TCP: 速度: 测试暂时完成了,留下了很多问题: 1、为什么golang的channel比C语言快?如果C语言在内核态实现并发队列呢...

学习吧
2015/10/20
101
0
mingw无法编译多线程程序的问题及解决

在linux环境下,用gedit+gcc方式学习一段时间的c语言,发现自己越来越佩服linux的牛人了。 居然不用IDE也能开发代码。我做不到,所以有回到了windows的怀抱。 mingw是windows版本的gnu c/c++...

mfcai
2013/03/04
0
2

没有更多内容

加载失败,请刷新页面

加载更多

Ubuntu18.04 安装MySQL

1.安装MySQL sudo apt-get install mysql-server 2.配置MySQL sudo mysql_secure_installation 3.设置MySQL非root用户 设置原因:配置过程为系统root权限,在构建MySQL连接时出现错误:ERROR...

AI_SKI
今天
3
0
3.6 rc脚本(start方法) 3.7 rc脚本(stop和status方法) 3.8 rc脚本(以daemon方式启动)

3.6-3.7 rc脚本(start、stop和status方法) #!/usr/bin/env python# -*- coding: utf-8 -*-# [@Version](https://my.oschina.net/u/931210) : python 2.7# [@Time](https://my.oschina.......

隐匿的蚂蚁
今天
3
0
Cnn学习相关博客

CNN卷积神经网络原理讲解+图片识别应用(附源码) 笨方法学习CNN图像识别系列 深度学习图像识别项目(中):Keras和卷积神经网络(CNN) 卷积神经网络模型部署到移动设备 使用CNN神经网络进行...

-九天-
昨天
5
0
flutter 底部输入框 聊天输入框 Flexible

想在页面底部放个输入框,结果键盘一直遮住了,原来是布局问题 Widget build(BuildContext context) { return Scaffold( appBar: AppBar( title: Text("评论"), ...

大灰狼wow
昨天
4
0
Kernel I2C子系统

备注:所有图片来源于网络 1,I2C协议: 物理拓扑: I2C总线由两根信号线组成,一条是时钟信号线SCL,一条是数据信号线SDA。一条I2C总线可以接多个设备,每个设备都接入I2C总线的SCL和SDA。I...

yepanl
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部