Flink+Disruptor踩坑记
记录一次Flink中使用disruptor的坑,背景是这样的:有一个flink job工作流程是从kafka读取日志数据,经过处理后写入es。写入es的sink采用了disruptor,sink将数据写入disruptor,disruptor消费者(workpool)负责将数据累积后写入es。该job在某地上线启动后,过一会儿就会出现checkpoint超时的情况,导致job长时间没有正常工作。此时,如果停止job会导致flink taskmanager进程shutdown。flink集群是standalone cluster,flink版本是1.7.2。
taskmanager怎么停了
首先来看看flink taskmanager进程shutdown的日志,从日志里可以看到,进程shutdown的原因是在停止job的时候该taskmanager上运行的某个task180多秒都没有响应停止指令,因此taskmanager自己把自己杀死了。好吧,首先看看task在taskmanager上是怎样运行的,又是怎样停止的?
org.apache.flink.runtime.taskmanager.Task - Task did not exit gracefully within 180 + seconds.
org.apache.flink.runtime.taskexecutor.TaskExecutor - Task did not exit gracefully within 180 + seconds.
org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Fatal error occurred while executing the TaskManager. Shutting it down...
WARN org.apache.flink.runtime.taskmanager.Task - Task 'XXXXX' did not react to cancelling signal for 30 seconds, but is stuck in method:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)
com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:136)
com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:105)
com.lmax.disruptor.RingBuffer.next(RingBuffer.java:263)
我们从org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask方法可以看出,job的算子最终被封装成了org.apache.flink.runtime.taskmanager.Task。Task实现了Runnable,因此核心方法就是run方法。从该方法入手,可以跟踪到org.apache.flink.streaming.runtime.tasks.OneInputStreamTask#run方法,该方法有一个while循环,Task就是在这个循环中不断调用org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput来处理流中的数据。综上我们可以得出结论,job的任务最终在taskmanager上被封装在一个线程中,并且在一个while循环中不断处理流中数据。 我们知道了任务最终被封装成了线程,那任务是怎样停止的呢?我们从org.apache.flink.runtime.taskexecutor.TaskExecutor#cancelTask跟踪到org.apache.flink.runtime.taskmanager.Task#cancelOrFailAndCancelInvokable方法,从该方法可以看到为了停止task创建了3个线程,分别是TaskCanceler,TaskInterrupter和TaskCancelerWatchDog。TaskCanceler做了3件事,第一,让处理流的while循环退出;第二,关闭资源;第三,中断task执行线程。TaskInterrupter做了1件事就是不断间歇性的中断task执行线程。TaskCancelerWatchDog就是不断检查task执行线程是否退出,如果超时未退出,就关闭当前的jvm。综上我们可以得出结论,Task的停止要做几个工作,第一,退出处理流的while循环;第二,关闭资源;第三,不断间歇性中断Task执行线程;第四,不断检查线程状态如果长时间未退出则关闭jvm。 综上,可以知道taskmanager退出的原因是task的执行线程在正常停止job的时候没有正常退出。我们从日志可以看出线程卡在了disruptor的生产数据时申请位置的next方法中,并且next方法没有处理线程中断。(我们知道中断线程只是改变了标志位,需要在业务代码里自己处理线程中断)
checkpoint为啥超时了
我们回到org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput方法,从这个方法我们可以看出它不仅处理了流中的业务数据,还处理watermark,streamStatus,latencyMarker,还在barrierHandler.getNextNonBlocked()中处理了CheckpointBarrier。也就是说Task处理业务数据,watermark和checkpoint都是用一个线程。我们从日志可以看出线程卡在了disruptor的插入方法,使得Task执行线程没有机会去处理checkpoint,所以checkpoint超时了。
什么,disruptor这么坑?
从上面的分析可以知道,一切都是由线程卡在disruptor写入时造成的。从com.lmax.disruptor.MultiProducerSequencer#next方法可以得知,该方法只有在成功申请了位置后才会返回。也就是说线程在这里长时间未能成功申请到位置,导致线程一直在这个方法中循环(更多disruptor内容请查看这里)。由此可以初步判断为消费者因为写 es慢导致disruptor队列满使得Task执行线程长时间无法插入成功。从flink日志中也确实发现了长达几百秒的es慢插入,但是从数量上来说在某一时间段内这种慢插入是远少于disruptor消费者,但是不管怎么样还是用jstack查看一下disruptor消费者线程的运行情况。不看不知道,一看吓一跳。从jstack信息来看除了1个消费者阻塞在了写es的代码上,其余消费者竟然都阻塞在了WaitStrategy上。这意味着,一方面生产者无法生产数据,似乎队列就像满了一样;另一方面消费者无法消费数据,似乎队列就像空的一样。这种看似矛盾的现象到底是怎样产生的呢?难道一个消费者的阻塞最终造成了整个队列无法正常使用?答案是的(什么,这么坑)。这一切都要从disruptor的消费者进度的保存位置说起。我们首先来看看java.util.concurrent.ArrayBlockingQueue,这是jdk实现的循环队列,当使用它作为java线程池的工作队列的话,如果一个线程在执行任务的时候阻塞了是不会影响队列的生产和消费的,这是因为ArrayBlockingQueue是在队列保存队首和队尾位置的,当一个线程阻塞了,消费者位置可由其他消费者继续推进。而disruptor不一样他的消费者位置是保存在消费者那里的,如果消费者线程卡在用户代码里,那么消费者就没有机会更新位置,这样的话,当生产者绕过一圈来到这个阻塞的消费者的时候就无法成功写入数据了。而其余非没有阻塞的消费者可以把队列里的数据消费完,此时由于生产者无法生产新数据,这样消费者就阻塞到WaitStrategy上。下面是com.lmax.disruptor.WorkProcessor#run
省略
while (true)
{
try
{
if (processedSequence)
{
processedSequence = false;
do
{
nextSequence = workSequence.get() + 1L;
sequence.set(nextSequence - 1L); 设置消费者位置
}
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
}
if (cachedAvailableSequence >= nextSequence)
{
event = ringBuffer.get(nextSequence);
workHandler.onEvent(event); 执行用户代码,如果这里线程被卡住了,那么消费者位置无法被更新
processedSequence = true;
}
省略
启发
- disruptor确实不适合消费者会有较长时间阻塞的场景。在本案例中disruptor其实并没有多大作用,后期已经被去掉。
- 在编写可能会阻塞的flink算子时,应该考虑job被cancle时的线程中断问题。例如,使用带超时的api,超时后线程可以返回并检查是否中断,如果中断了就跳出阻塞等待,使得task执行线程有机会处理cancle,避免taskmanager杀死自己。
- 在standalone cluster模式下taskmanager杀后,jobmanager是无法启动它的,这会导致其他job被重启,集群slot变少也可能会导致job无法启动。相比之下,flink on yarn会好一点,并且single-job模式相当于每个job都有一个集群,job间隔离比较好,job停止的时候taskmanager和jobmanager的进程都退出了。而standalone cluster模式进程是没有停止的,也就是说job如果开启了线程,在standalone cluster模式下一定要在job停止的时候终止这些线程,否则,线程不会随job停止而停止,他会一直存在,由于线程栈是gc root,线程引用的内存资源也无法释放,造成泄漏。