文档章节

Java并发框架Disruptor实现原理与源码分析(三) RingBuffer原理模型与源码分析

Norman.Dai
 Norman.Dai
发布于 2017/01/07 23:16
字数 3050
阅读 319
收藏 0

RingBuffer实现原理

我们在前面第一章节中就已经讲过RingBuffer是一个环形队列。但是今天我们要改变这样的看法,因为说它是环形队列只是找一种和它更像的数据结构去描述它。事实上它不是一个队列,因为它不具备队列的特性,比如FIFO等。RingBuffer就是一个数组,没有别的,之所以说它是环形队列是因为它通过算法维持了一个类似环形队列的数据结构。

图 1-1

图 1-1 (画图对于程序员来讲真的不是一件容易的事) 是我们对RingBuffer的一个抽象描述。比如我们现在假定我们有一个8个槽的RingBuffer,那事实上只是一个8个长度的数组。然后我们有一个从0一直递增的序列号,我们之前提到过。那么怎么样才能模拟一个环形队列?也就是通过序列号(sequence)来映射到给定大小的数组的元素上面去,如果你看一下图 1-2 你或许一下子就能明白他们之间的关系。从数学模型的角度讲其实是一个很简单的等式,index = sequence - ( arrayLenght * (round - 1 ) ),其中round是记录圈数的。但是用于计算机处理我们有一个更高效的处理方式 index = sequence & (array length-1)。我们下面通过代码来分析一下,此部分内容我们通过代码注释来描述。

图 1-2


    
    /*
    	RingBufferPad 作为RingBufferFields的超类他的作用只是通过7个long变量去填充缓存行
    	RingBufferFields 是RingBuffer的超类,也是其中很重要的组成部分,它里面维护了一个Event对象的环形数组
    */
    abstract class RingBufferPad
    {
    protected long p1, p2, p3, p4, p5, p6, p7;
    }
    
    abstract class RingBufferFields<E> extends RingBufferPad
    {
    private static final int BUFFER_PAD;
    private static final long REF_ARRAY_BASE;
    private static final int REF_ELEMENT_SHIFT;
    private static final Unsafe UNSAFE = Util.getUnsafe();
    
    static
    {
    		/*
    			UNSAFE.arrayIndexScale 是获取一个数组在内存中的scale,也就是每个数组元素在内存中的大小
    			因为我们的event是任意一个对象,所以在这里用一个Object的数组class来求scale
    		*/
    final int scale = UNSAFE.arrayIndexScale(Object[].class);
    		//不同的JVM设置,它的指针大小是不一样的
    if (4 == scale)
    {
    REF_ELEMENT_SHIFT = 2;
    }
    else if (8 == scale)
    {
    REF_ELEMENT_SHIFT = 3;
    }
    else
    {
    throw new IllegalStateException("Unknown pointer size");
    }
    BUFFER_PAD = 128 / scale;
    // 获取数组在内存中的偏移量,也就是第一个元素的内存偏移量
    REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
    }
    
    private final long indexMask;
    private final Object[] entries;//申明一个对象数组
    protected final int bufferSize;
    protected final Sequencer sequencer;
    
    RingBufferFields(
    EventFactory<E> eventFactory,
    Sequencer sequencer)
    {
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();
    
    if (bufferSize < 1)
    {
    throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1)
    {
    throw new IllegalArgumentException("bufferSize must be a power of 2");
    }
    
    this.indexMask = bufferSize - 1;//indexMask 就是数组的最大下标
    		//可以看出在创建数组的时候预留了两个单位的缓存填充
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];//这个地方我认为有BUG存在,请大神解答
    		/*
    			一次性填充慢整个数组,这样做又是一个比较有技巧的做法
    			他通过填充慢数组,在运行时改变对象的值来达到防止Java垃圾回收(GC)产生的系统开销
    			换句话说就是它不需要垃圾回收
    		*/
    fill(eventFactory);
    }
    
    private void fill(EventFactory<E> eventFactory)
    {
    for (int i = 0; i < bufferSize; i++)
    {
    entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
    }
    	
    	/*
    		获取指定序列号对应的数组元素,也就是event
    	*/
    @SuppressWarnings("unchecked")
    protected final E elementAt(long sequence)
    {	
    		//这个地方就是我们讲的通过递增序列号获取与序列号对应的数组元素
    return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
    }

原理技巧总结

通过上面的分析,我们可以看出在实现RingBufferFields这个对象的时候有很多非常取巧的地方*(取巧不一定是贬义,也是创新的表现,最恨那句投机取巧,扼杀了多少孩子的创新*)。

  1. 缓存行填充:这个前面已经讲过
  2. 位操作:这个虽然不是取巧的方法,但是加快了系统的计算速度,通过递增系列获取元素下标模拟一个环的算法真的很有技巧
  3. 数组预填充:这个又是一个很取巧的方法,避免了垃圾回收代来的系统开销

最后一个就是使用数组+系列号的这种方法最大限度的提高了速度。因为如果使用传统的队列的话在多线程环境下对队列头和队列尾的锁竞争是一种很大的系统开销*(这个后面讲)*

RingBuffer 概览##

RingBuffer 官方的说法是保存事件生产者与事件操作者之间的数据载体,下面是官方原版

Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.

其次RingBuffer还实现了三个接口,分别是 Cursored、EventSequencer 和 EventSink,以及继承了 RingBufferFields,也就是说在RingBuffer中可以直接操作事件对象数组。 图1-4 是RingBuffer中方法的一部分,从它的方法我们可以看出RingBuffer主要的作用是用于发布事件和创建生产者,至于它是如何发布事件以及怎么样处理事件我们会在后面详细阐述。

图 1- 4

Sequence相关实现

为了更好的去观察整个RingBuffer的时间发布与事件处理机制我们先需要分析一下和时间发布密切相关的sequence 的一些类和方法。

###Sequence Sequence在RingBuffer中是一个很重要的类,它就是指我们用来发布事件的递增系列,只不过为了更好的使用Disruptor对其做了进一步的封装。下面我们看一下它的代码*(老规矩代码解释在代码中进行)*

  class LhsPadding
   {
   protected long p1, p2, p3, p4, p5, p6, p7;
   }
   
   class Value extends LhsPadding
   {
   protected volatile long value;//volatile 语义
   }
   
   class RhsPadding extends Value
   {
   protected long p9, p10, p11, p12, p13, p14, p15;
   }
   
   /**
    * Concurrent sequence class used for tracking the progress of
    * the ring buffer and event processors.  Support a number
    * of concurrent operations including CAS and order writes.
    *
    * Also attempts to be more efficient with regards to false
    * sharing by adding padding around the volatile field.
   上面这段是官方解释,意思就是说为事件的并发操作提供一个原子的数字,同时通过缓存行填充来解决假共享来提高运行效率
    
    */
   public class Sequence extends RhsPadding
   {
   static final long INITIAL_VALUE = -1L;
   private static final Unsafe UNSAFE;
   private static final long VALUE_OFFSET;
   
   static
   {
   UNSAFE = Util.getUnsafe();
   try
   {	//这是一段内存操作,用来获取超类中value字段的内存偏移地址
   VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
   }
   catch (final Exception e)
   {
   throw new RuntimeException(e);
   }
   }
   
   /**
    * 这个是一个初始化方法,默认系列号是-1
    */
   public Sequence()
   {
   this(INITIAL_VALUE);
   }
   
   /**
   		初始化方法,通过unsafe的putOrderedLong方法将value立即回写到主内存中以保证其对其他线程的可见性
   		我们在前面讲过,一个volatile语义的变量在它的写之前,编译器会加一个Store/Store内存屏障
   		关于这部分内容可以查看之前的博客,或者详细了解volatile语言的实现原理
    */
   public Sequence(final long initialValue)
   {
   UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
   }
   
   /**
   	  获取当前保存的系列值
    */
   public long get()
   {
   return value;
   }
   
   /**
   		这个和前面的那个初始化方法是一致的,只不过用于在运行期进行值操作
    */
   public void set(final long value)
   {
   UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
   }
   
   /**
   		这是一个实现了Volatile语义的写方法(变量必须实现Volatile语义),和上面的方法一样,它也会在写操作前面加内存屏障
    */
   public void setVolatile(final long value)
   {
   UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
   }
   
   /**
   		这个就是我们前面讲过的CAS原子操作,用来原子的给某个变量赋值,
    */
   public boolean compareAndSet(final long expectedValue, final long newValue)
   {
   return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
   }
   
   /**
   		获取一个+1后的long值,值得注意的是这个方法是原子的,请看下面
    */
   public long incrementAndGet()
   {
   return addAndGet(1L);
   }
   
   /**
   		它是通过CAS操作来实现原子性
    */
   public long addAndGet(final long increment)
   {
   long currentValue;
   long newValue;
   		//从这里可以看出,通过一个循环来进行值的原子操作,直到操作成功
   		//循环的原因就是,当其他线程对这个值进行操作的时候会对缓存行加锁,所以这里需要不断的进行CAS操作
   		//(这个地方是系统中相对比较耗费的地方)
   do
   {
   currentValue = get();
   newValue = currentValue + increment;
   }
   while (!compareAndSet(currentValue, newValue));
   
   return newValue;
   }
   
   @Override
   public String toString()
   {
   return Long.toString(get());
   }
   }

###Sequencer 序列跟踪器

Sequencer 是一个序列跟踪与处理的接口,它继承了Cursored与Sequenced两个接口用于对序列的处理。 我们首先看Cursored这个接口,Cursored只提供了一个方法getCursor(),用于获取当前的游标值,这个游标值就是当前事件的发布序列号。Sequenced 接口是一个序列操作接口,用于支持对运行时的序列进行管理,下面图1-5是它的主要方法。

图 1-5

Sequencer有一个最直接的子类AbstractSequencer,从它的名称上就可以看出这是一个抽象类,它实现了Sequencer的一些方法,它是MultiProducerSequencer和 SingleProducerSequencer类的父类,后面我们会分析这两个类,下面我们先看AbstractSequencer这个类。


    public abstract class AbstractSequencer implements Sequencer
    {	
    	//AtomicReferenceFieldUpdater 是Java并发包中的一个原子操作类,它的newUpdater用于对volatile语义的对象进行原子更新
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
    AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    
    protected final int bufferSize;//数组的大小,也就是RingBuffer环的大小
    protected final WaitStrategy waitStrategy;//等待策略,这个后面分析
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);//当前发布的事件的游标,它也是一个序列
    protected volatile Sequence[] gatingSequences = new Sequence[0];//事件序列,用于做事件控制
    
    /**
    		初始化方法,一个简单的初始化,这里就不过多赘述
     */
    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
    {
    if (bufferSize < 1)
    {
    throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1)
    {
    throw new IllegalArgumentException("bufferSize must be a power of 2");
    }
    
    this.bufferSize = bufferSize;
    this.waitStrategy = waitStrategy;
    }
    
    /**
     * 返回当前的游标
     */
    @Override
    public final long getCursor()
    {
    return cursor.get();
    }
    
    /**
     * 获取bufferSize
     */
    @Override
    public final int getBufferSize()
    {
    return bufferSize;
    }
    
    /**
     * 这个方法我们在后面分析SequenceGroups的时候讲
     */
    @Override
    public final void addGatingSequences(Sequence... gatingSequences)
    {
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }
    
    @Override
    public boolean removeGatingSequence(Sequence sequence)
    {
    return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
    }
    
    /**
     * 这个方法用于返回当前游标与给定序列中的最小值
    	   用于做事件的控制,比如当前处理速度最慢的游标
     */
    @Override
    public long getMinimumSequence()
    {
    return Util.getMinimumSequence(gatingSequences, cursor.get());
    }
    
    /**
     * SequenceBarrier 会在后面讲
     */
    @Override
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
    return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }
    
    /**
    		创建一个事件轮询器
     */
    @Override
    public <T> EventPoller<T> newPoller(DataProvider<T> dataProvider, Sequence... gatingSequences)
    {
    return EventPoller.newInstance(dataProvider, this, new Sequence(), cursor, gatingSequences);
    }
    }

下面我们要分析的是RingBuffer中很重要的一部分,就是事件轮询机制的实现。

    //这部分的代码是事件轮询的具体实现
    	public PollState poll(final Handler<T> eventHandler) throws Exception
    {
    final long currentSequence = sequence.get();
    long nextSequence = currentSequence + 1;
    final long availableSequence = sequencer.getHighestPublishedSequence(nextSequence, gatingSequence.get());
    		//从下面这段代码我们可以看出,它的作用就是防止事件发布的时候当前发布的事件将正在处理的事件覆盖掉
    		//为了防止超过正在处理的事件它会在这个位置进行等待,知道可以发布为止
    		//所以从这段代码我们得出一个结论,就是在事件处理方法中不要发生阻塞,如果阻塞它会影响整个系统运行
    		//发生阻塞会使得整个系统都阻塞在一个地方
    if (nextSequence <= availableSequence)
    {
    boolean processNextEvent;
    long processedSequence = currentSequence;
    
    try
    {
    do
    {
    final T event = dataProvider.get(nextSequence);
    processNextEvent = eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    processedSequence = nextSequence;
    nextSequence++;
    
    }
    while (nextSequence <= availableSequence & processNextEvent);
    }
    finally
    {
    sequence.set(processedSequence);
    }
    
    return PollState.PROCESSING;
    }
    else if (sequencer.getCursor() >= nextSequence)
    {
    return PollState.GATING;
    }
    else
    {
    return PollState.IDLE;
    }
    }

输入图片说明

上面这幅图是RingBuffer的事件处理模型,也就是一个类似环的数组,从目前我们分析过的代码来看,Disruptor处理事件的时候依然有自己的缺陷,比如对事件的处理过程中不能发生阻塞,一旦发生阻塞就会使得整个处理系统都发生阻塞,系统将没有办法重新发布事件。

© 著作权归作者所有

Norman.Dai
粉丝 0
博文 5
码字总数 7384
作品 2
西安
程序员
私信 提问
牛逼哄哄的Dubbo框架,底层到底是什么原理?

搞了N年Java,仍有不少朋友困惑:用了很多年Dubbo,觉得自己挺厉害,跳槽面试时一问RPC,一问底层通讯,一问NIO和AIO,就一脸懵逼,到底该怎么办? (大家有没有这样的感触?Dubbo用得很熟,...

Java猫
03/27
0
0
并发框架Disruptor译文

Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处...

石头哥哥
2013/06/30
827
0
三流程序员与一流程序员之间的区别,看看你是属于哪一类?

源码系列 手写spring mvc框架 基于Spring JDBC手写ORM框架 实现自己的MyBatis Spring AOP实战之源码分析 Spring IOC高级特性应用分析 ORM框架底层实现原理剖析 手写Spring MVC框架实现 手把手...

茶轴的青春
2018/04/17
34
0
disruptor框架原理及使用

disruptor是LMAX出品的一个高效的无锁并发框架,它高效核心在于其无锁队列RingBuffer的独特设计。 它的特点在于: 1、队列使用数组结构实现环形队列,而不是使用传统的链表结构,寻址更快; ...

winworld
2013/12/26
376
0
BATJ等大厂最全经典面试题分享

金九银十,又到了面试求职高峰期,最近有很多网友都在求大厂面试题。正好我之前电脑里面有这方面的整理,于是就发上来分享给大家。 这些题目是网友去百度、蚂蚁金服、小米、乐视、美团、58、...

老道士
2018/09/26
158
0

没有更多内容

加载失败,请刷新页面

加载更多

浅谈java过滤器Filter

一、简介 Servlet中的过滤器Filter是实现了javax.servlet.Filter接口的服务器端程序,主要的用途是过滤字符编码、做一些业务逻辑判断如是否有权限访问页面等。其工作原理是,只要你在web.xml...

青衣霓裳
17分钟前
3
0
技术产品研发型企业的发展困境

目前中小企业想要发展自己的业务,必须使用到底层架构技术,于是企业多采用三种技术方案,一、招相关技术人员,并为之长期投入技术成本;二、使用相关开源技术框架,并培养相关业务人员;三、...

数蚕
40分钟前
3
0
《2019网络安全行业人才发展研究报告》出炉

随着大安全时代的到来,网络安全已经从虚拟空间延伸到现实空间。当今网络战愈演愈烈,网络军备赛即将来临。网络空间领域的战争归根到底还是人才的竞争。面对新形势,建立高效的网络安全人才培...

360网络安全大学
43分钟前
1
0
酒德利商城开发程序逻辑

<textarea name="Content" rows="2" cols="20" id="Content" class="t14" style="font-family:宋体;font-size:10.5pt;"></textarea> 酒德利商城开发zhao·I5OI3I5I74O微/电,酒德利批发零售,......

洋哥说
53分钟前
2
0
十款强大的IDEA插件-Java开发者的利器

xl_echo编辑整理,欢迎转载,转载请声明文章来源。欢迎添加echo微信(微信号:t2421499075)交流学习。 百战不败,依不自称常胜,百败不颓,依能奋力前行。——这才是真正的堪称强大!! 插件一...

xlecho
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部