Disruptor
Disruptor是java圈子里著名的并发队列,它是一个基于生产者-消费者模型,并优化了cpu伪共享的高性能队列。要理解disruptor需要理解一下几个概念:RingBuffer,Sequence,Sequencer,SequenceBarrier。
RingBuffer
RingBuffer是disruptor中用来存数据的环形数组。Disruptor的基本数据结构就是一个循环队列。学习过数据结构的同学都知道循环队列是一个基于数组的队列,用一个变量来表示队头位置下标,另一个变量来表示队尾位置下标。当位置下标到达数组末尾的时候,下标的下一个位置就移动到数组开头,例如jdk中的java.util.concurrent.ArrayBlockingQueue,它用putIndex来表示队尾位置,用takeIndex来表示队头位置,当队头或队尾到达数组末尾的时候,被置为数组开头位置。下面是java.util.concurrent.ArrayBlockingQueue中的代码。
/** items index for next take, poll, peek or remove */
int takeIndex; 队头位置
/** items index for next put, offer, or add */
int putIndex; 队尾位置
private void enqueue(E x) { 入队方法
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x; 队尾入队数据
if (++putIndex == items.length) 如果已经到了数组的最后一个位置
putIndex = 0; 位置置于数组开头
count++;
notEmpty.signal();
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; 队头出队数据
items[takeIndex] = null; 如果已经到了数组的最后一个位置
if (++takeIndex == items.length) 位置置于数组开头
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
Sequence
Sequence是disruptor对队列中表示位置的下标位置的抽象。为什么要用一个类而不是一个整型来表示数组的下标位置呢?这是因为disruptor在试图解决cpu伪共享问题。CPU伪共享简单讲就是在不同cpu核的多个线程他们分别在自己的cache中缓存了同一个变量,当一个线程修改了这个变量将会使用MESI协议将别的线程缓存了相同变量的cache-line失效。如果多个线程高频修改一个变量可能会相互影响使得cpu缓存的作用大打折扣。那怎么才能尽量避免这种情况呢?disruptor的做法是让每个消费者都维护着自己的sequence,并且sequence做了cache-line填充,使得每个sequence将占用整个cache-line。一般来说一个cache-line是64个字节,用一个long来表示位置,那么就需要8个long。因此,disruptor在表示下标的long变量前后都放置了7个long,这样当读取value时,无论从哪个方向读取64个字节都能保证cache-line被填充。下面是com.lmax.disruptor.Sequence的代码。
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7; 填充
}
class Value extends LhsPadding
{
protected volatile long value; 真正的值
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15; 填充
}
Sequencer
Sequencer是用来协调生产者进度和消费者进度的。消费者不能跑到生产者前面去了,生产者也不能超过消费者一圈。AbstractSequencer有3个重要的参数,cursor表示的生产者的位置,gatingSequences表示的是末端消费者的位置,waitstrategy表示当没有数据给消费者时,消费者的等待行为。下面是com.lmax.disruptor.AbstractSequencer的代码。
protected final WaitStrategy waitStrategy; 等待策略
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); 生产者位置
protected volatile Sequence[] gatingSequences = new Sequence[0]; 消费者位置
生产者在生产数据的时候有2个步骤,第一步,获取新数据的位置;第二步,插入数据并发布,发布后的数据就可以被消费了。其中第一步对应着next方法,第二步对应着publish方法。依据生产者是单线程的还是多线程的,Sequencer被分为MultiProducerSequencer和SingleProducerSequencer,这2种Sequencer大体逻辑相似但又有差别。
MultiProducerSequencer的next方法中,首先获取生产者位置并加上n再减去buffersize,将他和消费者位置比较,如果大于则表明生产者超过了消费者一圈,这是不可行的,否则是可行的就用cas更新生产者位置。获取消费者位置的时候并不是从gatingSequences中直接获取最小的那个,而是通过一个gatingSequenceCache来获取的,这是因为sequence是一个频繁改变,被多个线程操作的对象,并且每次去获取都要去找最小值,为了减少不必要的获取,每次从gatingSequences中获取一次最小值时将其缓存起来,在生产者没有追到这个缓存的最小值前,可以不用去获取最新的最小值。当追上这个最小值的时候,就需要从gatingSequences中获取最小值,如果生产者还是超过了一圈那么就暂停一下,再重复以上操作,否则就将最小值赋值给gatingSequenceCache并重复以上操作。下面是com.lmax.disruptor.MultiProducerSequencer#next(int)的代码。
do
{
current = cursor.get(); 生产者当前位置
next = current + n; 插入n个新数据后的位置
long wrapPoint = next - bufferSize; 新位置减去ringbuffer长度
long cachedGatingSequence = gatingSequenceCache.get(); 消费者位置
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) wrapPoint > cachedGatingSequence表示生产者超过了消费者一圈
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current); 获取最新的消费者位置
if (wrapPoint > gatingSequence) 如果还是超过一圈则等待并重试
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence); 更新缓存的消费者位置
}
else if (cursor.compareAndSet(current, next)) 更新生产者位置
{
break;
}
}
while (true);
SingleProducerSequencer的next方法的逻辑和MultiProducerSequencer差不多。不同的是由于只有一个生产者线程,因此SingleProducerSequencer直接使用了一个long的nextValue来表示可生产数据的位置,一个long的cachedValue来表示消费者位置缓存。下面是com.lmax.disruptor.SingleProducerSequencer#next(int)的代码。
long nextValue = this.nextValue; 获取可生产数据的最小位置
long nextSequence = nextValue + n; 插入n个新数据后的位置
long wrapPoint = nextSequence - bufferSize; 新位置减去ringbuffer长度
long cachedGatingSequence = this.cachedValue; 消费者位置
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) wrapPoint > cachedGatingSequence表示生产者超过了消费者一圈
{
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) 获取消费者最新位置,如果还是超过一圈则等待并重试
{
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence; 保存消费者位置
}
this.nextValue = nextSequence; 保存生产者位置
2种Sequencer的获取新数据位置的逻辑相似,但是发布数据的逻辑却完全不一样。SingleProducerSequencer的发布逻辑较为简单,publish方法中直接更新生产者位置cursor,注意cursor和nextValue的差别,nextValue用来获取新数据的位置,而cursor是已经发布的数据的位置,对于消费者来说cursor才是真正的生产者位置。下面是com.lmax.disruptor.SingleProducerSequencer#publish(long)的代码。
@Override
public void publish(long sequence)
{
cursor.set(sequence); 更新生成者位置
waitStrategy.signalAllWhenBlocking(); 通知等待的消费者消费
}
这种方式用在MultiProducerSequencer上显然是不合适的,因为一个生产者发布可能会导致其他生产者也发布了。事实上,MultiProducerSequencer在next方法中就直接更新了cursor。MultiProducerSequencer用一个长度和ringbuffer相同的数组availableBuffer来跟踪数据的发布状态。下面是com.lmax.disruptor.MultiProducerSequencer发布相关的代码。
@Override
public void publish(final long sequence)
{
setAvailable(sequence); 设置改位置的状态
waitStrategy.signalAllWhenBlocking();
}
private void setAvailable(final long sequence)
{
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private void setAvailableBufferValue(int index, int flag)
{
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
可以看到使用cursor来获取消费者可消费的最大位置是不合适的了。在Sequencer中有个getHighestPublishedSequence(long lowerBound, long availableSequence)方法用来返回可以被消费的最大位置。对于SingleProducerSequencer由于是发布时更新cursor,因此可以直接返回availableSequence;对于MultiProducerSequencer是在availableBuffer的[lowerBound,availableSequence]区间上找到最小的已发布位置。
SequenceBarrier
SequenceBarrier是协调消费者的进度和它依赖的进度的。这里说依赖是因为消费者本身是有层级的,第一层的消费者依赖(不超过)生产者的进度,第二层的消费者依赖(不超过)第一层的消费进度。从构造方法可以看出当传入一个长度为0的dependentSequences数组时,该barrier的dependentSequence就是生产者的位置。如果大于0就用FixedSequenceGroup包装一下dependentSequences数组,FixedSequenceGroup的get方法返回的就是dependentSequences数组的最小值。下面是com.lmax.disruptor.ProcessingSequenceBarrier的构造函数。
ProcessingSequenceBarrier(
final Sequencer sequencer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence,
final Sequence[] dependentSequences)
{
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length) 如果dependentSequences长度为0,就依赖生产者进度
{
dependentSequence = cursorSequence;
}
else
{
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
SequenceBarrier的核心方法就是waitFor(final long sequence)。该方法是用来等待入参sequence变成可消费状态的。使用waitStrategy来等待并获取一个有效的sequence,在waitstrategy的所有实现中,这个返回值其实就是dependentSequence。最后通过Sequencer的getHighestPublishedSequence方法获取[sequence,dependentSequence]区间内可消费的最大位置。下面是com.lmax.disruptor.ProcessingSequenceBarrier#waitFor(final long sequence)的代码。
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
等待sequence有效并返回dependentSequence位置
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
{
return availableSequence;
}
返回[sequence,dependentSequence]最小已发布位置
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
WaitStrategy是消费者等待消费的动作。判断sequence是否有效的方法是和dependentSequence比较,当且仅当sequence小于等于dependentSequence时有效。 比如如下代码:
while ((availableSequence = dependentSequence.get()) < sequence)
{
等待
}
return availableSequence;
消费者
Disruptor的消费者有2种一种是WokerHandler一种是EventHandler。可以设置多个WokerHandler,多个WokerHandler会一起去处理所有的数据,也可以设置多个EventHandler,多个EventHandler会分别处理所有的数据。
WokerHandler是由com.lmax.disruptor.dsl.Disruptor#handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)方法用来创建。该方法会创建一个workPool,workPool里面有sequenceBarrier,除此外workPool里还有一个workSequence。每个workHandler会创建一个workProcessor,workSequence也会传入workProcessor的构造方法。下面是com.lmax.disruptor.WorkerPool部分代码。
private final AtomicBoolean started = new AtomicBoolean(false);
private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final RingBuffer<T> ringBuffer;
// WorkProcessors are created to wrap each of the provided WorkHandlers
private final WorkProcessor<?>[] workProcessors;
@SafeVarargs
public WorkerPool(
final RingBuffer<T> ringBuffer,
final SequenceBarrier sequenceBarrier,
final ExceptionHandler<? super T> exceptionHandler,
final WorkHandler<? super T>... workHandlers)
{
this.ringBuffer = ringBuffer;
final int numWorkers = workHandlers.length;
workProcessors = new WorkProcessor[numWorkers];
for (int i = 0; i < numWorkers; i++)
{
workProcessors[i] = new WorkProcessor<>( 创建WorkHandler执行器
ringBuffer,
sequenceBarrier, 协调消费进度
workHandlers[i],
exceptionHandler,
workSequence); 所有WorkHandler使用一个workSequence
}
}
WorkProcessor可以看成是workHandler的执行者,他的核心方法是run。run方法中使用CAS从workSequence中获取要消费的下标,可以看出workPool中的所有workProcessor是从同一个workSequence中获取的,因此一个workPool里的workHandler是共同消费数据的。当成功获取到需要nextSequence后,将其于cachedAvailableSequence比较,如果小于等于cachedAvailableSequence表示可以消费,否则使用sequenceBarrier等待并重新获取依赖(可以先理解为生产者)的最大可消费位置。这里cachedAvailableSequence和sequencer中提到了gatingSequenceCache思路是一样的,为了不用每次都去获取,每次获取后将其保存一起来,消费者还没消费到这个位置的时候,可以不用去获取,因为这时消费者一定没有超过依赖。
while (true)
{
try
{
省略注解
if (processedSequence)
{
processedSequence = false;
do
{
nextSequence = workSequence.get() + 1L; 从workSequence的下一个位置
sequence.set(nextSequence - 1L);
}
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); 竞争这个位置
}
if (cachedAvailableSequence >= nextSequence) 如果这个位置小于缓存的依赖的位置
{
event = ringBuffer.get(nextSequence); 获取这个位置的数据并消费
workHandler.onEvent(event);
processedSequence = true;
}
else
{
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); 等待并获取最大可消费位置
}
}
省略部分代码
}
EventHandler是由com.lmax.disruptor.dsl.Disruptor#handleEventsWith(final EventHandler<? super T>... handlers)方法创建的。每一个EventHandler会被创建为一个BatchEventProcessor。BatchEventProcessor的核心方法是processEvents方法。该方法中就是使用sequencerBarrier去获取了依赖的最新位置,然后从直接当前位置一直消费到依赖最新的位置。这和WorkProcessor是不同的,因为BatchEventProcessor中的sequence各自增长互不影响,而WorkProcessor的sequence都是从workSequence中去争抢,所以多个EventHandler是分别消费所有的数据。下面是com.lmax.disruptor.BatchEventProcessor#processEvents方法代码。
while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence); 直接获取最大可消费位置
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
while (nextSequence <= availableSequence) 直接消费到最大可消费位置
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence); 一匹消费完后才更新消费进度
}
Disruptor中handleEventsWit方法和handleEventsWithWorkerPool方法创建ProcessingSequenceBarrier时传入的dependentSequences都是长度为0的Sequence数组,这样创建的ProcessingSequenceBarrier的dependentSequences就是生产者的位置,这样创建出来的消费者就是依赖于生产者进度的。这2个方法都返回EventHandlerGroup,它包含了表示消费者进度的Sequence数组,当使用EventHandlerGroup创建消费者时就会使用该Sequence数组作为参数创建ProcessingSequenceBarrier,这样创建出来的消费者就会依赖前一个消费者的消费进度。Disruptor整体运行如下图所示:
其中生产者的位置是7,生产者的gatingSeq指向消费者依赖图中的最末端的消费者的seq,表示生产者不能超过最末端的消费者;workpool有2个workhandler分别在2,3,workpool的seqbarrier指向生产者,表示workpool不能超过生产者;eventhandler目前消费到位置是1,他的seqbarrier指向workpool表示其消费进度不能超过workpool。