过气网红Disruptor源码分析

原创
2020/06/21 21:31
阅读数 1.8K

Disruptor

Disruptor是java圈子里著名的并发队列,它是一个基于生产者-消费者模型,并优化了cpu伪共享的高性能队列。要理解disruptor需要理解一下几个概念:RingBufferSequenceSequencerSequenceBarrierdisruptor

RingBuffer

RingBuffer是disruptor中用来存数据的环形数组。Disruptor的基本数据结构就是一个循环队列。学习过数据结构的同学都知道循环队列是一个基于数组的队列,用一个变量来表示队头位置下标,另一个变量来表示队尾位置下标。当位置下标到达数组末尾的时候,下标的下一个位置就移动到数组开头,例如jdk中的java.util.concurrent.ArrayBlockingQueue,它用putIndex来表示队尾位置,用takeIndex来表示队头位置,当队头或队尾到达数组末尾的时候,被置为数组开头位置。下面是java.util.concurrent.ArrayBlockingQueue中的代码。

/** items index for next take, poll, peek or remove */
int takeIndex;  队头位置
/** items index for next put, offer, or add */
int putIndex;   队尾位置

private void enqueue(E x) {                 入队方法
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;                   队尾入队数据
    if (++putIndex == items.length)        如果已经到了数组的最后一个位置
        putIndex = 0;                      位置置于数组开头
    count++;
    notEmpty.signal();
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];           队头出队数据
    items[takeIndex] = null;              如果已经到了数组的最后一个位置
    if (++takeIndex == items.length)      位置置于数组开头
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}


Sequence

Sequence是disruptor对队列中表示位置的下标位置的抽象。为什么要用一个类而不是一个整型来表示数组的下标位置呢?这是因为disruptor在试图解决cpu伪共享问题。CPU伪共享简单讲就是在不同cpu核的多个线程他们分别在自己的cache中缓存了同一个变量,当一个线程修改了这个变量将会使用MESI协议将别的线程缓存了相同变量的cache-line失效。如果多个线程高频修改一个变量可能会相互影响使得cpu缓存的作用大打折扣。那怎么才能尽量避免这种情况呢?disruptor的做法是让每个消费者都维护着自己的sequence,并且sequence做了cache-line填充,使得每个sequence将占用整个cache-line。一般来说一个cache-line是64个字节,用一个long来表示位置,那么就需要8个long。因此,disruptor在表示下标的long变量前后都放置了7个long,这样当读取value时,无论从哪个方向读取64个字节都能保证cache-line被填充。下面是com.lmax.disruptor.Sequence的代码。

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;            填充
}

class Value extends LhsPadding
{
    protected volatile long value;                        真正的值
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;      填充
}


Sequencer

Sequencer是用来协调生产者进度和消费者进度的。消费者不能跑到生产者前面去了,生产者也不能超过消费者一圈。AbstractSequencer有3个重要的参数,cursor表示的生产者的位置,gatingSequences表示的是末端消费者的位置,waitstrategy表示当没有数据给消费者时,消费者的等待行为。下面是com.lmax.disruptor.AbstractSequencer的代码。

protected final WaitStrategy waitStrategy;                                           等待策略
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);      生产者位置
protected volatile Sequence[] gatingSequences = new Sequence[0];                     消费者位置

生产者在生产数据的时候有2个步骤,第一步,获取新数据的位置;第二步,插入数据并发布,发布后的数据就可以被消费了。其中第一步对应着next方法,第二步对应着publish方法。依据生产者是单线程的还是多线程的,Sequencer被分为MultiProducerSequencer和SingleProducerSequencer,这2种Sequencer大体逻辑相似但又有差别。
MultiProducerSequencer的next方法中,首先获取生产者位置并加上n再减去buffersize,将他和消费者位置比较,如果大于则表明生产者超过了消费者一圈,这是不可行的,否则是可行的就用cas更新生产者位置。获取消费者位置的时候并不是从gatingSequences中直接获取最小的那个,而是通过一个gatingSequenceCache来获取的,这是因为sequence是一个频繁改变,被多个线程操作的对象,并且每次去获取都要去找最小值,为了减少不必要的获取,每次从gatingSequences中获取一次最小值时将其缓存起来,在生产者没有追到这个缓存的最小值前,可以不用去获取最新的最小值。当追上这个最小值的时候,就需要从gatingSequences中获取最小值,如果生产者还是超过了一圈那么就暂停一下,再重复以上操作,否则就将最小值赋值给gatingSequenceCache并重复以上操作。下面是com.lmax.disruptor.MultiProducerSequencer#next(int)的代码。

do
{
    current = cursor.get();                                                         生产者当前位置
    next = current + n;                                                             插入n个新数据后的位置

    long wrapPoint = next - bufferSize;                                             新位置减去ringbuffer长度
    long cachedGatingSequence = gatingSequenceCache.get();                          消费者位置

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)         wrapPoint > cachedGatingSequence表示生产者超过了消费者一圈
    {
        long gatingSequence = Util.getMinimumSequence(gatingSequences, current);    获取最新的消费者位置

        if (wrapPoint > gatingSequence)                                             如果还是超过一圈则等待并重试
        {
            LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
            continue;
        }

        gatingSequenceCache.set(gatingSequence);                                   更新缓存的消费者位置
    }
    else if (cursor.compareAndSet(current, next))                                  更新生产者位置
    {
        break;
    }
}
while (true);

SingleProducerSequencer的next方法的逻辑和MultiProducerSequencer差不多。不同的是由于只有一个生产者线程,因此SingleProducerSequencer直接使用了一个long的nextValue来表示可生产数据的位置,一个long的cachedValue来表示消费者位置缓存。下面是com.lmax.disruptor.SingleProducerSequencer#next(int)的代码。

long nextValue = this.nextValue;                                                  获取可生产数据的最小位置               

long nextSequence = nextValue + n;                                                插入n个新数据后的位置
long wrapPoint = nextSequence - bufferSize;                                       新位置减去ringbuffer长度
long cachedGatingSequence = this.cachedValue;                                     消费者位置

if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)         wrapPoint > cachedGatingSequence表示生产者超过了消费者一圈
{
    cursor.setVolatile(nextValue);  // StoreLoad fence

    long minSequence;
    while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))  获取消费者最新位置,如果还是超过一圈则等待并重试
    {
        LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
    }

    this.cachedValue = minSequence;                                              保存消费者位置
}

this.nextValue = nextSequence;                                                   保存生产者位置

2种Sequencer的获取新数据位置的逻辑相似,但是发布数据的逻辑却完全不一样。SingleProducerSequencer的发布逻辑较为简单,publish方法中直接更新生产者位置cursor,注意cursor和nextValue的差别,nextValue用来获取新数据的位置,而cursor是已经发布的数据的位置,对于消费者来说cursor才是真正的生产者位置。下面是com.lmax.disruptor.SingleProducerSequencer#publish(long)的代码。

 @Override
public void publish(long sequence)
{
    cursor.set(sequence);                  更新生成者位置
    waitStrategy.signalAllWhenBlocking();  通知等待的消费者消费
}

这种方式用在MultiProducerSequencer上显然是不合适的,因为一个生产者发布可能会导致其他生产者也发布了。事实上,MultiProducerSequencer在next方法中就直接更新了cursor。MultiProducerSequencer用一个长度和ringbuffer相同的数组availableBuffer来跟踪数据的发布状态。下面是com.lmax.disruptor.MultiProducerSequencer发布相关的代码。

@Override
public void publish(final long sequence)
{
    setAvailable(sequence);                             设置改位置的状态
    waitStrategy.signalAllWhenBlocking();
}
private void setAvailable(final long sequence)
{
    setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private void setAvailableBufferValue(int index, int flag)
{
    long bufferAddress = (index * SCALE) + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

可以看到使用cursor来获取消费者可消费的最大位置是不合适的了。在Sequencer中有个getHighestPublishedSequence(long lowerBound, long availableSequence)方法用来返回可以被消费的最大位置。对于SingleProducerSequencer由于是发布时更新cursor,因此可以直接返回availableSequence;对于MultiProducerSequencer是在availableBuffer的[lowerBound,availableSequence]区间上找到最小的已发布位置。


SequenceBarrier

SequenceBarrier是协调消费者的进度和它依赖的进度的。这里说依赖是因为消费者本身是有层级的,第一层的消费者依赖(不超过)生产者的进度,第二层的消费者依赖(不超过)第一层的消费进度。从构造方法可以看出当传入一个长度为0的dependentSequences数组时,该barrier的dependentSequence就是生产者的位置。如果大于0就用FixedSequenceGroup包装一下dependentSequences数组,FixedSequenceGroup的get方法返回的就是dependentSequences数组的最小值。下面是com.lmax.disruptor.ProcessingSequenceBarrier的构造函数。

ProcessingSequenceBarrier(
    final Sequencer sequencer,
    final WaitStrategy waitStrategy,
    final Sequence cursorSequence,
    final Sequence[] dependentSequences)
{
    this.sequencer = sequencer;
    this.waitStrategy = waitStrategy;
    this.cursorSequence = cursorSequence;
    if (0 == dependentSequences.length)            如果dependentSequences长度为0,就依赖生产者进度
    {
        dependentSequence = cursorSequence;
    }
    else
    {
        dependentSequence = new FixedSequenceGroup(dependentSequences);
    }
}

SequenceBarrier的核心方法就是waitFor(final long sequence)。该方法是用来等待入参sequence变成可消费状态的。使用waitStrategy来等待并获取一个有效的sequence,在waitstrategy的所有实现中,这个返回值其实就是dependentSequence。最后通过Sequencer的getHighestPublishedSequence方法获取[sequence,dependentSequence]区间内可消费的最大位置。下面是com.lmax.disruptor.ProcessingSequenceBarrier#waitFor(final long sequence)的代码。

public long waitFor(final long sequence)
    throws AlertException, InterruptedException, TimeoutException
{
    checkAlert();
    等待sequence有效并返回dependentSequence位置
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);        

    if (availableSequence < sequence)
    {
        return availableSequence;
    }
    返回[sequence,dependentSequence]最小已发布位置
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

WaitStrategy是消费者等待消费的动作。判断sequence是否有效的方法是和dependentSequence比较,当且仅当sequence小于等于dependentSequence时有效。 比如如下代码:

while ((availableSequence = dependentSequence.get()) < sequence)
{
    等待
}
return availableSequence;


消费者

Disruptor的消费者有2种一种是WokerHandler一种是EventHandler。可以设置多个WokerHandler,多个WokerHandler会一起去处理所有的数据,也可以设置多个EventHandler,多个EventHandler会分别处理所有的数据。
WokerHandler是由com.lmax.disruptor.dsl.Disruptor#handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)方法用来创建。该方法会创建一个workPool,workPool里面有sequenceBarrier,除此外workPool里还有一个workSequence。每个workHandler会创建一个workProcessor,workSequence也会传入workProcessor的构造方法。下面是com.lmax.disruptor.WorkerPool部分代码。

private final AtomicBoolean started = new AtomicBoolean(false);
private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final RingBuffer<T> ringBuffer;
// WorkProcessors are created to wrap each of the provided WorkHandlers
private final WorkProcessor<?>[] workProcessors;
@SafeVarargs
public WorkerPool(
    final RingBuffer<T> ringBuffer,
    final SequenceBarrier sequenceBarrier,
    final ExceptionHandler<? super T> exceptionHandler,
    final WorkHandler<? super T>... workHandlers)
{
    this.ringBuffer = ringBuffer;
    final int numWorkers = workHandlers.length;
    workProcessors = new WorkProcessor[numWorkers];

    for (int i = 0; i < numWorkers; i++)
    {
        workProcessors[i] = new WorkProcessor<>(            创建WorkHandler执行器
            ringBuffer,
            sequenceBarrier,                                协调消费进度
            workHandlers[i],
            exceptionHandler,
            workSequence);                                  所有WorkHandler使用一个workSequence
    }
}

WorkProcessor可以看成是workHandler的执行者,他的核心方法是run。run方法中使用CAS从workSequence中获取要消费的下标,可以看出workPool中的所有workProcessor是从同一个workSequence中获取的,因此一个workPool里的workHandler是共同消费数据的。当成功获取到需要nextSequence后,将其于cachedAvailableSequence比较,如果小于等于cachedAvailableSequence表示可以消费,否则使用sequenceBarrier等待并重新获取依赖(可以先理解为生产者)的最大可消费位置。这里cachedAvailableSequence和sequencer中提到了gatingSequenceCache思路是一样的,为了不用每次都去获取,每次获取后将其保存一起来,消费者还没消费到这个位置的时候,可以不用去获取,因为这时消费者一定没有超过依赖。

while (true)
{
    try
    {
        省略注解
        if (processedSequence)
        {
            processedSequence = false;
            do
            {
                nextSequence = workSequence.get() + 1L;                            从workSequence的下一个位置
                sequence.set(nextSequence - 1L);
            }
            while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));  竞争这个位置
        }
                                                                           
        if (cachedAvailableSequence >= nextSequence)                               如果这个位置小于缓存的依赖的位置
        {
            event = ringBuffer.get(nextSequence);                                  获取这个位置的数据并消费
            workHandler.onEvent(event);
            processedSequence = true;
        }
        else
        {
            cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);      等待并获取最大可消费位置
        }
    }
    省略部分代码
}

EventHandler是由com.lmax.disruptor.dsl.Disruptor#handleEventsWith(final EventHandler<? super T>... handlers)方法创建的。每一个EventHandler会被创建为一个BatchEventProcessor。BatchEventProcessor的核心方法是processEvents方法。该方法中就是使用sequencerBarrier去获取了依赖的最新位置,然后从直接当前位置一直消费到依赖最新的位置。这和WorkProcessor是不同的,因为BatchEventProcessor中的sequence各自增长互不影响,而WorkProcessor的sequence都是从workSequence中去争抢,所以多个EventHandler是分别消费所有的数据。下面是com.lmax.disruptor.BatchEventProcessor#processEvents方法代码。

while (true)
{
    try
    {
        final long availableSequence = sequenceBarrier.waitFor(nextSequence);                直接获取最大可消费位置
        if (batchStartAware != null)
        {
            batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
        }

        while (nextSequence <= availableSequence)                                           直接消费到最大可消费位置
        {
            event = dataProvider.get(nextSequence);
            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }

        sequence.set(availableSequence);                                                   一匹消费完后才更新消费进度
    }

Disruptor中handleEventsWit方法和handleEventsWithWorkerPool方法创建ProcessingSequenceBarrier时传入的dependentSequences都是长度为0的Sequence数组,这样创建的ProcessingSequenceBarrier的dependentSequences就是生产者的位置,这样创建出来的消费者就是依赖于生产者进度的。这2个方法都返回EventHandlerGroup,它包含了表示消费者进度的Sequence数组,当使用EventHandlerGroup创建消费者时就会使用该Sequence数组作为参数创建ProcessingSequenceBarrier,这样创建出来的消费者就会依赖前一个消费者的消费进度。Disruptor整体运行如下图所示:
disruptor
其中生产者的位置是7,生产者的gatingSeq指向消费者依赖图中的最末端的消费者的seq,表示生产者不能超过最末端的消费者;workpool有2个workhandler分别在2,3,workpool的seqbarrier指向生产者,表示workpool不能超过生产者;eventhandler目前消费到位置是1,他的seqbarrier指向workpool表示其消费进度不能超过workpool。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部