模型架构
相关概念
Producer生产者:发送消息的一方,生产者负责创建消息,然后将其投递到 Kafka 中。
Consumer消费者:接收消息的一方,消费者连接到 Kafka 上并接收消息。
Broker:服务代理节点。
Topic主题&Partition分区: Kafka 中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题,而消费者负责订阅主题并进行消费。
主题是一个逻辑概念,细分为多个分区,一个分区只属于单个主题。同一主题下的不同分区包含的消息是不同的。分区在存储层面可以看做是一个可追加的日志(Log)文件,消息在被追加到分区日志文件时都会分配一个偏移量(offset)。offset是消息在分区中的唯一标识, Kafka 通过它保证消息在分区内的顺序性,不过offset并不跨越分区,即 Kafka 保证的是分区有序而非主题有序。
Kafka 中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个 broker,以提供比单个 broker 更强大的性能。
分区数量可以在创建主题的时候指定,也可以在创建主题之后修改,通过增加分区数量可以实现水平扩展,从而避免单一分区带来的机器I/O性能瓶颈。
Replica多副本机制: Kafka 为分区引入了多副本机制,通过增加副本数量提升容灾能力。同一分区的不同副本中保存着相同的消息(在同一时刻,副本之间并非完全一致),副本之间是“一主多从”的关系,其中 leader 副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步。副本处于不同的 broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。 Kafka 通过多副本机制实现故障的自动转移。
分区中的所有副本统称为AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括leader)组成ISR(In-Sync Replicas)。消息会先发送到 leader 副本,然后 follower 副本从 leader 副本中拉取消息进行同步,同步期间内 follower 副本相对于 leader 副本而言会存在一定程度的滞后,前面所说的“一定程度同步”是指可忍受的之后范围,此范围可以通过参数进行设置。其中滞后过多的副本组成OSR(Out-of-Sync Replicas)。正常情况下,所有的 follower 副本都应与 leader 副本保持一定程度的同步,即AR=ISR,OSR集合为空。
HW高水位:ISR与HW和LEO也有紧密的关系。HW(High Watermark)标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。
如图,一个日志文件中有9条消息,第一条消息的 offset 为 0,最后一条消息的 offset 为 8,offset 为 9 的虚线框代表一条待写入的消息。日志文件的 HW 为 6,表示消费者只能拉取到 0 至 5 之间的消息,offset 为 6 的小时对于消费者而言是不可见的。
由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。Kafka 使用的这种 ISR 的方式有效的权衡了性能和数据可靠性之间的关系。
数据流转流程
生产者客户端由两个线程协调运行,分别是主线程和 Sender 线程(发送线程)。
①②③ 在主线程中,KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器之后,缓存到消息累加器(RecordAccumulator,也成为消息收集器)中。
Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。
④⑤ RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,从而减少网络传输的资源消耗以提升性能。RecordAccumulator 的缓存大小可以通过生产者端参数 buffer.memory 配置,默认 32MB。如果生产消息的速度超过发送消息的速度,会导致生产者空间不足,此时 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,这取决于参数 max.block.ms 的配置,默认值 60秒。
RecordAccumulator 内部为每个分区都维护了一个双端队列 Deque<ProducerBatch>,队列中的内容就是 ProducerBatch(指一个消息批次),消息写入缓存时,追加到队列尾部,Sender 获取消息时,从队列头部读取。ProducerRecord 是生产者创建的消息,被包含在 PruducerBatch 中,这样做可以减少网络请求次数以提升整体的吞吐量。
⑥ Sender 从RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区,Deque<PruducerBatch>>的数据结构转换成<Node,List<PruducerBatch>>的形式,其中 Node 表示 Kafka 集群的 broker 节点。之后 Sender 还会进一步将数据封装成 <Node,Request> 的形式,这样就可以将 Request 请求发往各个 Node 了。
⑦ 请求在从 Sender 线程发往 Kafka 之前还会保存到 InFligthRequests 中,InFligthRequests 保存对象的具体形式为 Map<NodeId,Deque<Request>>,缓存内容是已经发出去但还没有收到响应的请求(NodeId 是 String 类型,表示节点的 id 编号)。与此同时,InFligthRequests 可以通过设置参数 max.in.flight.requests.per.connection(默认值 5),来限制每个连接(客户端与Node之间)最多缓存的请求数,超过该值之后,不再向这个连接发送请求,直到有缓存的请求收到了响应。
生产者核心参数
acks
指定分区中必须要有多少个副本收到这条消息,才认为这条消息是成功投递的。这个参数涉及到消息的可靠性和吞吐量之间的权衡。
acks = 1:默认值。生产者发送消息后,只要分区的 leader 副本成功写入,就会收到来自服务端的成功响应。如果消息写入 leader 副本成功并且返回成功给生产者,此时在消息被其他 follower 拉取之前 leader 崩溃,则此消息会丢失。
acks = 0:生产者发送消息后,不等待任何服务端的响应。
acks = -1 或 acks = all:生产者发送消息后,需要等待 ISR 中所有副本都成功写入消息后才能收到来自服务端的成功响应。这样设置可以达到最高的可靠性。但并不意味着一定可靠,因为当 ISR 中只有 leader 副本,这样就退化成了 acks = 1 的情况。要获得更高的消息可靠性,需要配合 min.insync.replicas 等参数的联动。
max.request.size
设置生产者客户端能发送消息的最大值,默认 1MB。不建议修改,因为这个参数还涉及到与其他参数的联动,比如 broker 端的 message.max.size
retries 和 retry.backoff.ms
retries 配置生产者重试次数,默认 0,即在发生异常时不进行任何重试动作。
retry.backoff.ms 用来配置两次重试之间的时间间隔,默认 100 。
注意事项:
如果 retries 大于 0,并且 max.in.flight.requests.per.connection 参数配置大于 1,就会出现分区顺序错乱的问题。一般而言,在需要保证消息顺序性的场景中,建议把 max.in.flight.requests.per.connection 参数设置为 1,而不是把 retries 设置为 0,不过这样会影响整体的吞吐。
compression.type
指定消息的压缩方式,默认 “none”,即不压缩。选项有“gzip” “snappy” 和 “lz4”。对消息进行压缩可以极大减少网络传输量、降低网络 I/O,从而提高吞吐量。消息压缩是一种用时间换空间的优化方式,如果对延迟有一定的要求,则不推荐对消息进行压缩。
connections.max.idle.ms
指定多久之后关闭闲置链接,默认 9 分钟。
linger.ms
指定生产者发送 ProducerBatch 之前等待消息(ProducerRecord)加入 ProducerBatch 的时间,默认 0,生产者客户端会在 ProducerBatch 被填满或者等待时间超过 linger.ms 值时发送消息。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。
receive.buffer.bytes
设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认 32KB。如果设置为 -1,则使用操作系统默认值。
send.buffer.bytes
设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认 32KB。如果设置为 -1,则使用操作系统默认值。
request.timeout.ms
配置 Producer 等待请求响应的最长时间,默认 30秒。超时后可以选择重试。
注意事项:
这个参数要比 broker 端的 replica.lag.time.max.ms 参数值大,这样可以减少因客户端重试引起的消息重复的概率。
消息消费
Kafka 中的消费是基于拉(pull)模式的,在 Kafka 中消息消费是一个不断轮询的过程,即重复调用 poll() 方法,返回值是所订阅的主题(分区)上的一组消息(ConsumerRecords)。
消费者消费到的每条消息类型为 ConsumerRecord,与生产者发送的消息类型 ProducerRecord 类型对应
位移提交
在每次调用 poll() 方法时,返回的是还没有被消费过的消息集,要做到这一点,就需要记录上一次消费时的消费位移,并且这个消费位移需要持久化保存,避免消费者重启之后丢失偏移值。在旧版本的消费者客户端中,消费位移是存储在 ZooKeeper 中的。新版消费者客户端中,消费位移存储在 Kafka 内部主题 _consumer_offsets 中。
Kafka 默认的消费位移提交方式是自动提交,由消费者客户端参数 enable.auto.commit 配置,默认为 true。注意这个自动提交不是每消费一条消息就提交一次,而是定期提交,由参数 atuo.commit.interval.ms 配置,默认 5秒,此参数生效的前提是 enable.auto.commit 参数设置为 true。
Kafka 的手动提交有同步提交和异步提交两种方式:
同步提交:commitSync()、commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)
同步提交方法会根据 poll() 方法拉取的最新消息的位移来进行提交,只要没有发生不可恢复的错误,就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,可以将其捕获并做相应处理。
在实际应用中,为了提高性能,很少采用每消费一条消息就提交一次消费位移的方案,更多时候是按照分区粒度划分提交位移界限。
异步提交:commitAsync()、commitAsync(OffsetCommitCallback callback)、commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
与同步提交相反,异步提交的方式在执行的时候不会阻塞消费者线程。
commitAsync() 方法可以通过 callback 参数,提供一个异步提交的回调方法,当消费位移提交完成后会回调 OffsetCommitCallback 中的 omComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) 方法。
异步提交的时候同样会发生失败,但此时仅通过重试是不行的,比如某次异步提交的消费位移是 x,但是提交失败了,然后下一次异步提交了 x+y,这次成功了,如果此时进行对 x 的重试提交,并且重试成功,那么此时的消费位移又变成了 x,如果此时消费者发生异常(或者再均衡),那么恢复之后的消费者(或者新的消费者加入)就会从 x 开始消费,这样就发生了重复消费的问题。为此,可以自行维护一个递增的序号来保存异步提交的顺序,每次位移提交之后就增加序号,在位移提交失败需要重试的时候,检查所提交的位移和序号大小,如果位移小于序号,说明已经有更大的位移已经提交过了,不需要进行重试,如果两者相同,说明可以进行本次重试。
再均衡:Rebalance
再均衡指分区的所属权从一个消费者转移到另一个消费者的行为,为消费组具备高可用性和伸缩性提供保障。不过在再均衡发生期间,消费组内的消费是无法获取消息的,即变得不可用。此外,当一个分区被重新分配给另一个消费者时i,消费者当前状态也会丢失。比如消费者消费完消息时还没有来得及提交消费位移就发生了再均衡,之后新的消费者会重新把这部分消息重复消费,所以在一般情况下,应尽量避免不必要的再均衡。
再均衡监听器:ComsumerRebalanceListener
在 subscribe(Collection<String> topics, ConsumerRebalanceListener listener) 和 subscribe(Pattern pattern, ConsumerRebalanceListener listener) 方法中都有它的身影。
再均衡监听器用来设定发生再均衡动作前后的一些准备或收尾工作。比如在发生再均衡动作之前,执行消费位移提交方法,避免不必要的重复消费问题。
触发再均衡的情况:
- 有新的消费者加入消费组。
- 有消费者宕机下线。
- 有消费者主动退出消费组。比如客户端调用了 unsubscribel() 方法取消对某些主题的订阅。
- 消费者组对应 GroupCoorinator(Kafka服务端用于管理消费组的组件) 节点发生变更
- 消费者组内所订阅的任一主题的分区数量发生变化。
再均衡原理
第一阶段(FIND_COORINATOR):消费者需要确定所属消费组对应的 GroupCoorinator 所在的 broker,并与 broker 创建连接。如果消费者已经保存了相应的 GroupCoorinator 则进入第二阶段,否则,向集群中负载最小的节点发送 FindCoorinatorRequest 请求来查找对应的 GroupCoorinator。
第二阶段(JOIN_GROUP):此阶段的消费者向 GroupCoorinator 发送JoinGroupRequest 请求。
第三阶段(SYNC_GROUP):leader 消费者根据在第二阶段选举出的分区分配策略来实施具体的分区分配。在同步阶段,各个消费者会向 GroupCoorinator 发送 SyncGroupRequest 请求来同步分配方案。当消费者收到分配方案后,会调用 PartitionAssignor 中的 onAssignment() 方法,随后再调用 ConsumerRebalanceListener 中的 onPartitionAssigned() 方法。之后开启心跳任务,消费者向 GroupCoorinator 定期发送 HeartbeatRequest 请求来确定彼此在线。
第四阶段(HEARTBEAT):进入这个阶段后,消费组中的所有消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取消息的起始位置。
消息传输保障
Kafka从 0.11.0.0 版本开始引入了幂等和事务这两个特性,以此来实现 EOS (exactly once semantics,精确一次处理语义)。
幂等
Kafka 的幂等功能可以避免如生产者进行重试的时候有可能的重复写入消息。
通过配置客户端参数 enable.inempotence 设置为 true 即可,默认值 false。properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
不过要确保幂等性功能正常,还需要确保生产者客户端的 retries、acks、max.in.flight.requests.per.connection 这几个参数配置正确。实际上在使用幂等性的时候,不推荐配置这几个参数。
如果显式的设置了这几个参数,要求如下,否则会抛出 ConfigException:
- retries 必须大于 0
- max.in.flight.requests.per.connection 必须大于 5
- acks 必须为 -1(all)
实现原理
Kafka 引入了 producer id(PID)和 序列号(sequence number)这两个概念。每个生产者实例在初始化时都会被分配一个 PID ,对于每个 PID ,消息发送到的每一个分区都有对应的序列号(从 0 开始单调递增)。生产者每发送一条消息就会将 <PID, 分区> 的序列号加 1
broker 端在内存中为每一对 <PID, 分区> 维护一个序列号。对于收到的每条消息,只有当他的序列号(SN_new) 比 broker 端中维护的对应序列号 (SN_old) 大 1,即 SN_new = SN_old + 1 时,broker 才会接受它,如果小于,说明出现了重复写入, broker 可以直接将其丢弃。如果 SN_new > SN_old + 1 ,说明中间有数据尚未写入,出现了乱序,有可能有消息丢失,对应的生产者会抛出 OutOfOrderSequenceException。
注意事项:
幂等只是针对每一对 <PID, 分区> 而言,即只能保证单个生产者会话(session)中单分区的幂等。
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", "msg");
producer.send(record);
producer.send(record);
上面的示例中发送了两条内容相同的消息,但是对于 Kafka而言,这两条消息是不同的,会为这两条消息分配不同的序号。即不会保证消息内容的幂等。
事务
幂等性不能跨分区,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。
对流式应用(Stream Proccessing Applications)而言,一个典型的模式为“consum-teansform-produce”。在这种模式下,生产者和消费者共存:程序从主题中消费消息,经过转换后重新包装消息发布到另一个主题。其中消费和生产消息都有可能出现问题。Kafka 中的事务可以使应用程序将消息生产、消息消费、消费位移提交当做原子操作处理,即使跨分区。
开启事务的方式,是通过显示的设置客户端参数 teansational.id (必须唯一),并且开启幂等特性,即将参数 enable.inempotence 设置为 true,否则会报出 ConfigException。
transactionId 和 PID 一一对应,唯一不同的是 transactionId 由用户显示设置,而 PID 是 Kafka内部分配的。另外,每个生产者通过 transactionId 获取 PID 的同时,还会获取一个单调递增的 producer epoch。如果使用同一个 transactionId 开启两个生产者,那么前一个开启的生产者将会报错。
从生产者的角度看,事务可以实现:
- 跨生产者会话的消息幂等发送:具有相同 transactionId 的新生产者实例被创建且工作时,旧的生产者将不在工作。
- 跨生产者的会话可以从故障中恢复:当某个生产者宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交(Commit)要么被中止(Abort)。
从消费者的角度看,事务能保证的语义偏弱,由于以下原因,Kafka无法保证已提交的事务中的所有消息都能被消费:
- 对采用日志压缩策略的主题而言,事务中的某些消息可能被清理(相同 key 的消息,先写入的消息会被覆盖)
- 事务中的消息可能分布在同一个分区的多个分段(LogSegment)中,当老的分段被删除时,对应的消息会丢失
- 消费者可以通过 seek() 方法访问任意 offset 的消息,从而遗漏事务中的消息
- 消费者在消费时可能没有被分配到事务内的所有分区,如此也就不能读取事务中的所有消息
注意:消费者端参数 isolation.level 默认值 “read_uncommitted”,即消费端可以消费到未提交的事务,与事务有莫大的关联,此参数另一个选项为 “read_committed”,表示消费端消费不到未提交的事务内的消息。
实现原理
事务协调器(TransactionCoordinator),每个生产者都会被指派一个特定的 TransactionCoordinator,所有的事务逻辑包括分配 PID 等都是由 TransactionCoordinator 来负责。TransactionCoordinator 会将事务状态持久化到内部主题 _teansaction_state 中。
消息传输保障
-
多副本:建议副本数为 3 即可满足大部分场景的可靠性要求,而对可靠性要求高的场景可以增大副本数,比如国内部分银行在使用 Kafka 时会设置副本数为 5 。不过副本过多也会引起磁盘、网络带宽的消耗,以及整体性能下降。
- 生产者确认:生产者客户端参数 acks = -1 (all)配合 min.insync.replicas (默认 1)可以最大程度提高消息的可靠性。典型配置为:副本数配置为 3,min.insync.replicas 配置为 2 。注意 min.insync.replicas 参数在提高可靠性的同时可能降低可用性,比如当 ISR 中只有一个副本,那么起码还是可用的,但是如果设置了 min.insync.replicas > 1,则会使消息写入失败。
- 生产者重试:设置生产者端参数 retries (默认 0,但在 KafkaAdminClient 中默认为 5,可以作为参考) 以及 retry.backoff.ms (重试间隔)。建议在使用前,先评估可能的异常恢复时间来辅助设置重试间隔。注意:如果 retries 的值大于 0,可能带来一些负面影响,比如 max.in.flight.requests.per.connection 默认为 5,这样可能会影响消息顺序性,对此要么放弃使用重试功能,要么将 max.in.flight.requests.per.connection 设置为 1,但这样也会降低吞吐量。再比如对于延迟要求较高的场景,很多时候快速失败是更好的选择,如果 retries > 0 会增加客户端对异常的反馈时延。
- 禁用从非 ISR 集合选举新的 leader:参数 unclean.leader.election.enable (0.11.0.0 版本开始默认为 false)
- 消费者手动提交:消费者端参数 enbale.auto.commit(默认 true) 参数设置为 false。
- 兜底:回溯消息,通过消息回溯我们有机会对漏掉的消息进行回补,进一步提高可靠性。