Kafka-Flink-Kafka End-to-End Exactly-Once

原创
2020/08/26 21:41
阅读数 1.9K

Kafka-Flink-Kafka End-to-End Exactly-Once

End-to-End Exactly-Once

我们知道Flink的checkpoint机制实现了Flink内部的exactly-once语义。而所谓的end-to-end exactly-once即数据从数据源被读出,经过Flink处理到结果被写入到最终存储,整个流程数据只被<u>有效处理一次</u>。举个简单的例子,Flink从Kafka读取一个数据经过处理后将结果写入Kafka,在成功写入的同时该数据的offset被提交,从此该数据不再被消费。如果failover,那么可以重新读取该offset的数据,这就是有效处理一次的原因。真正只处理一次是不可能的,这里的有效处理一次是从结果来看,数据只被处理了一次。当然这种处理一个数据提交一个offset的方式既不满足所有的计算场景(例如聚合),也不符合checkpoint机制。我们需要考虑怎样将保证了Flink内部exactly-once的checkpoint机制延伸到Flink外部系统。总而言之,Flink End-to-End Exactly-Once的关键在于Flink如何在checkpoint机制下与数据源和最终存储交互,保证数据被有效处理一次。

TwoPhaseCommitSink

实现End-to-End Exactly-Once的一个关键是在结果成功写入最终存储的同时,产生结果的所有数据的offset需要同时被提交。否则,在结果写入成功到offset提交成功这段时间内如果发生failover,那么数据将被重复消费。也就是说,offset的保存和结果的保存要放到一个事务里。我们可以在org.apache.kafka.clients.producer.KafkaProducer中可以看见一个有趣的方法sendOffsetsToTransaction--offset竟然是在producer中提交的。对于Flink job来说,offset的保存在flink job的source上,结果的保存在flink job的sink上。Flink的source和sink的并行度可以大于1的,也可以在不同的机器上,这似乎在说明我们实现的事务还是分布式事务。

分布式事务的经典解决方法是两阶段提交。第一阶段,事务协调者向事务参与者发送prepare请求,事务参与者收到prepare请求后执行该事务操作但并不提交执行结果并根据执行情况向事务协调者反馈ack,如果反馈no,代表事务参与者不能成功执行该事务;如果反馈yes,代表事务参与者能成功执行该事务并且事务参与者向事务协调者做出承诺--当事务协调者通知提交该事务时,该事务参与者一定能提交该事务。第二阶段,当事务协调者收到所有反馈后,如果不全部是yes,那么事务协调者向事务参与者发送abort请求,事务参与者收到abort请求后回滚该事务并释放资源;如果全部是yes,那么事务协调者向事务参与者发送commit请求,事务参与者收到commit请求后提交事务。

Flink将两阶段提交融入到checkpoint机制中,Flink checkpoints中的两阶段提交过程大致如下:CheckpointCoordinator向数据流中插入CheckpointBarrier被视为第一阶段,CheckpointBarrier被看作是prepare请求。当算子收到CheckpointBarrier后执行snapshotState,对于FlinkKafkaConsumer来说会将offset保存在状态中,而对于TwoPhaseCommitSink来说会在该方法中对事务进行preCommit并将snapshotState执行结果反馈给CheckpointCoordinator,可以视为第一阶段的ack反馈。CheckpointCoordinator收到所有checkpoints反馈后将metadata存入状态远端,这不仅表明了本次checkpoint成功完成,还表明了本次两阶段提交确定了决议即:提交本次事务。CheckpointCoordinator向算子发出checkpointComplete通知视为第二阶段提交,收到通知的算子在notifyCheckpointComplete方法中进行真正的事务提交。

Flink提供了org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSink将实现两阶段提交。TwoPhaseCommitSink提供了beginTransaction,preCommit,commit,abort等两阶段提交中关键步骤的抽象方法并在checkpoint相关回调方法中提供了checkpoint过程中两阶段提交的实现。

  • initializeState方法主要做2件事:第一,处理上一次运行保存在状态中的事务,将pendingCommitTransactions进行恢复并提交,将上一次停止时的currentTransaction进行恢复并中断。在恢复job时,会从指定或者最近一次完成的checkpoint或savepoint上恢复,而完成的checkpoint表明了需要提交,因此将该checkpoint之前的事务都进行提交,由于offset也是该checkpoint下保存的offset,所以本次checkpoint开启的事务应该被中断放弃。第二,开启本次运行的第一个事务。
  • snapshotState方法将currentTransaction进行preCommit并将其放入pendingCommitTransactions,之后开启一个新的事务并将事务相关信息存入状态。preCommit方法的成功执行不仅表示事务操作可以执行,最重要的是还要做出承诺:之后事务一定能被提交。
  • notifyCheckpointComplete方法将pendingCommitTransactions中该checkpoint及其之前的checkpoints进行commit并从中删除。

需要注意的是在checkpoint完成的时候,offset已经保存在状态中同时提交事务决议已定,但是还未真正的提交。因此,事务参与者做出的承诺或者说preCommit方法的实现是十分重要的。

Kafka-Flink-Kafka

org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer是TwoPhaseCommitSink的实现类,他实现了两阶段提交中的抽象类,其中我们重点关注的是:

  • beginTransaction方法根据构造函数传入的semantic来创建不同的KafkaProducer,如果是Semantic.EXACTLY_ONCE,将创建带事务的KafkaProducer,初始化并开启一个kafka事务。
  • preCommit方法在Semantic.EXACTLY_ONCE时会调用KafkaProducer的flush将数据刷入kafka。上文曾提到在两阶段提交中的第一次提交中,当事务参与者反馈yes的时候,需要做出一个承诺,为了实现该承诺需要将事务操作持久化,这就是调用flush的目的。
  • commit方法中调用KafkaProducer的commitTransaction提交kafka事务。

checkpoints将数据流分成了一段一段的,每一段都会对应一个KafkaProducer,每一个事务性的KafkaProducer都有一个transactionId,每个FlinkKafkaProducer子任务使用的transactionId由org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator产生。由于可以同时发起多个checkpoints因此需要同时使用多个KafkaProducer,最多的个数由kafkaProducersPoolSize决定,默认为5,如果不够将抛出异常,如下

String transactionalId = availableTransactionalIds.poll();
if (transactionalId == null) {
    throw new FlinkKafkaException(
        FlinkKafkaErrorCode.PRODUCERS_POOL_EMPTY,
        "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
}

在重启job恢复状态的时候,会将上次运行停止时的currentTransaction进行中断,但是如果上次运行停止时一个checkpoint都还没完成,那么状态里将没当时currentTransaction的信息,在FlinkKafkaProducer的initializeState方法会将处理这种情况,如下

nextTransactionalIdHint = new FlinkKafkaProducer.NextTransactionalIdHint(0, 0);
// this means that this is either:
// (1) the first execution of this application
// (2) previous execution has failed before first checkpoint completed
//
// in case of (2) we have to abort all previous transactions
abortTransactions(transactionalIdsGenerator.generateIdsToAbort());

但是由于重启后并行度可能改变,因此transactionalIdsGenerator使用safeScaleDownFactor扩大产生需要中断的transactionId范围,如下:

public Set<String> generateIdsToAbort() {
    Set<String> idsToAbort = new HashSet<>();
    for (int i = 0; i < safeScaleDownFactor; i++) {
        idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
    }
    return idsToAbort;
}
展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部