文档章节

kafka学习笔记:知识点整理(二)

 成长tree
发布于 2017/05/23 10:55
字数 2898
阅读 7
收藏 0
点赞 0
评论 0

三、kafka HA

3.1 replication

如图.1所示,同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N)。没有 replica 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入replication 之后,同一个 partition 可能会有多个 replica,而这时需要在这些 replica 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从 leader 中复制数据。

Kafka 分配 Replica 的算法如下:

1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序 2. 将第 i 个 partition 分配到第(i mod n)个 broker 上 3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上

3.2 leader failover

当 partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader。在选举新leader时,一个基本的原则是,新的 leader 必须拥有旧 leader commit 过的所有消息。

kafka 在 zookeeper 中(/brokers/.../state)动态维护了一个 ISR(in-sync replicas),由3.3节的写入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成员才能选为 leader。对于 f+1 个 replica,一个 partition 可以在容忍 f 个 replica 失效的情况下保证消息不丢失。

当所有 replica 都不工作时,有两种可行的方案:

1. 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。 2. 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。

kafka 0.8.* 使用第二种方式。

kafka 通过 Controller 来选举 leader,流程请参考5.3节。

3.3 broker failover

kafka broker failover 序列图如下所示:

图.3

流程说明: 

 

1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch 2. controller 从 /brokers/ids 节点读取可用broker 3. controller决定set_p,该集合包含宕机 broker 上的所有 partition 4. 对 set_p 中的每一个 partition  4.1 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR  4.2 决定新 leader(如4.3节所描述)  4.3 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点 5. 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令

 

3.4 controller failover

 当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。

当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作:

 

1. 读取并增加 Controller Epoch。 2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。 3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。 4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。 5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。 6. 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。 7. 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。 8. 启动 replicaStateMachine 和 partitionStateMachine。 9. 将 brokerState 状态设置为 RunningAsController。 10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。 11. 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。 12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

 

 

四. consumer 消费消息

4.1 consumer API

kafka 提供了两套 consumer API:

1. The high-level Consumer API 2. The SimpleConsumer API

 其中 high-level consumer API 提供了一个从 kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多地关注细节。

4.1.1 The high-level consumer API

high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。

使用 high-level consumer API 可以是多线程的应用,应当注意:

1. 如果消费线程大于 patition 数量,则有些线程将收不到消息 2. 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息 3. 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的

4.1.2 The SimpleConsumer API

如果你想要对 patition 有更多的控制权,那就应该使用 SimpleConsumer API,比如:

1. 多次读取一个消息 2. 只消费一个 patition 中的部分消息 3. 使用事务来保证一个消息仅被消费一次

 但是使用此 API 时,partition、offset、broker、leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作:

1. 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息 2. 应用程序需要通过程序获知每个 Partition 的 leader 是谁 3. 需要处理 leader 的变更

 使用 SimpleConsumer API 的一般流程如下:

 

1. 查找到一个“活着”的 broker,并且找出每个 partition 的 leader 2. 找出每个 partition 的 follower 3. 定义好请求,该请求应该能描述应用程序需要哪些数据 4. fetch 数据 5. 识别 leader 的变化,并对之作出必要的响应

以下针对 high-level Consumer API 进行说明。

4.2 consumer group

如 2.2 节所说, kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。

kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 spark/Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 consumer group。如下图所示:

 

图.4

4.3 消费方式

consumer 采用 pull 模式从 broker 中读取数据。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

4.4 consumer delivery guarantee

如果将 consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit。如果只讨论这一读取消息的过程,那 Kafka 确保了 Exactly once。

但实际使用中应用程序并非在 consumer 读取完数据就结束了,而是要进行进一步处理,而数据处理与 commit 的顺序在很大程度上决定了consumer delivery guarantee:

 

1.读完消息先 commit 再处理消息。 这种模式下,如果 consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于 At most once 2.读完消息先处理再 commit。 这种模式下,如果在处理完消息之后 commit 之前 consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。 3.如果一定要做到 Exactly once,就需要协调 offset 和实际操作的输出。 精典的做法是引入两阶段提交。如果能让 offset 和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer 拿到数据后可能把数据放到 HDFS,如果把最新的 offset 和数据本身一起写到 HDFS,那就可以保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,无法存于HDFS,而SimpleConsuemr API的 offset 是由自己去维护的,可以将之存于 HDFS 中)

 

总之,Kafka 默认保证 At least once,并且允许通过设置 producer 异步提交来实现 At most once(见文章《kafka consumer防止数据丢失》)。而 Exactly once 要求与外部存储系统协作,幸运的是 kafka 提供的 offset 可以非常直接非常容易得使用这种方式。

更多关于 kafka 传输语义的信息请参考《Message Delivery Semantics》。

4.5 consumer rebalance

当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance。consumer rebalance算法如下:

 

1. 将目标 topic 下的所有 partirtion 排序,存于PT 2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci 3. N=size(PT)/size(CG),向上取整 4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始) 5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci

 

在 0.8.*版本,每个 consumer 都只负责调整自己所消费的 partition,为了保证整个consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它所有其它 consumer 也应该同时触发 rebalance。这会导致以下几个问题:

 

1.Herd effect   任何 broker 或者 consumer 的增减都会触发所有的 consumer 的 rebalance 2.Split Brain   每个 consumer 分别单独通过 zookeeper 判断哪些 broker 和 consumer 宕机了,那么不同 consumer 在同一时刻从 zookeeper 看到的 view 就可能不一样,这是由 zookeeper 的特性决定的,这就会造成不正确的 reblance 尝试。 3. 调整结果不可控   所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,这可能会导致 kafka 工作在一个不正确的状态。

 

基于以上问题,kafka 设计者考虑在0.9.*版本开始使用中心 coordinator 来控制 consumer rebalance,然后又从简便性和验证要求两方面考虑,计划在 consumer 客户端实现分配方案。(见文章《Kafka Detailed Consumer Coordinator Design》和《Kafka Client-side Assignment Proposal》),此处不再赘述。

 

愿意了解更多的技术知识分享可关注:mingli.com

朋友需要请加球球:二零四二八四九二三七

 

五、注意事项

5.1 producer 无法发送消息的问题

最开始在本机搭建了kafka伪集群,本地 producer 客户端成功发布消息至 broker。随后在服务器上搭建了 kafka 集群,在本机连接该集群,producer 却无法发布消息到 broker(奇怪也没有抛错)。最开始怀疑是 iptables 没开放,于是开放端口,结果还不行(又开始是代码问题、版本问题等等,倒腾了很久)。最后没办法,一项一项查看 server.properties 配置,发现以下两个配置:

 

# The address the socket server listens on. It will get the value returned from  # java.net.InetAddress.getCanonicalHostName() if not configured. #   FORMAT: #     listeners = security_protocol://host_name:port #   EXAMPLE: #     listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://:9092

 # Hostname and port the broker will advertise to producers and consumers. If not set, 
 # it uses the value for "listeners" if configured. Otherwise, it will use the value
 # returned from java.net.InetAddress.getCanonicalHostName().
 #advertised.listeners=PLAINTEXT://your.host.name:9092

© 著作权归作者所有

共有 人打赏支持
粉丝 1
博文 16
码字总数 24644
作品 0
惠州
kafka学习笔记:知识点整理(一)

一、kafka 架构 1.1 拓扑结构 如下图: 图.1 1.2 相关概念 如图.1中,kafka 相关名词解释如下: 1.producer:  消息生产者,发布消息到 kafka 集群的终端或服务。2.broker:  kafka 集群...

愉快的鱼儿
2017/06/05
0
0
kafka学习笔记:知识点整理(二)

三、kafka HA 3.1 replication 如图.1所示,同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N)。没有 replica 的情况下,一旦 brok...

愉快的鱼儿
2017/06/05
0
0
更换地址中................

闲谈 前一段在熟悉业务,一直没有更新博客,虽然一直有在更新云笔记,逐渐发现云笔记真的很好用,越来越多的知识点存在了笔记中,但是逐渐越发担忧起来..... 苦恼问题:备份和同步 初期我只在...

奋斗的阿Q
2017/06/17
0
0
Kafka笔记整理(三):消费形式验证与性能测试

[TOC] Kafka笔记整理(三):消费形式验证与性能测试 Kafka消费形式验证 前面的《Kafka笔记整理(一)》中有提到消费者的消费形式,说明如下: 下面就来验证Kafka的消费形式,不过需要说明的...

xpleaf
03/25
0
0
一点感悟:《Node.js学习笔记》star数突破1000+

写作背景 笔者前年开始撰写的《Node.js学习笔记》 github star 数突破了1000,算是个里程碑吧。 从第一次提交(2016.11.03)到现在,1年半过去了。突然有些感慨,想要写点东西,谈谈这期间的...

程序猿小卡_casper
06/06
0
0
Java后端工程师学习大纲

之前自己总结过的Java后端工程师技能树,其涵盖的技术点比较全面,并非一朝一夕能够全部覆盖到的。对于一些还没有入门或者刚刚入门的Java后端工程师,如果一下子需要学习如此多的知识,想必很...

JackFace
2016/07/08
567
0
程序员最佳学习方法(干货总结)

前言 这里筑梦师,是一名正在努力学习的iOS开发工程师,目前致力于全栈方向的学习,希望可以和大家一起交流技术,共同进步,用简书记录下自己的学习历程. 环境 一个程序员在萌芽之中,唯一的天敌并...

筑梦师Winston
06/18
0
0
程序员必备,快速学习 Python 的全套14张思维导图(附高清版下载)

后台回复关键词 思维导图 可获取本文中的高清思维导图(PDF版) ML & AI∣一个有用的公众号 长按,识别二维码,加关注 获取更多精彩文章

micf435p6d221ssdld2
05/23
0
0

hjimce算法类博文目录 个人博客:http://blog.csdn.net/hjimce 个人qq:1393852684 知乎:https://www.zhihu.com/people/huang-jin-chi-28/activities 一、深度学习 深度学习(七十)darknet...

hjimce
2016/01/24
0
0
吴恩达(Andrew Ng)机器学习公开课中文笔记

课程地址 https://www.coursera.org/learn/machine-learning (吴恩达老师在 Coursera 上的机器学习公开课) 前言趣闻 去年的这个时间学完了这门非常赞的入门课程,最近由于项目需要,就复习...

scruel
01/10
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Centos7通过yum安装nginx

添加源地址(直接install可能不是最新版本的) sudo rpm -Uvh http://nginx.org/packages/centos/7/noarch/RPMS/nginx-release-centos-7-0.el7.ngx.noarch.rpm 安装 sudo yum install -y ng......

iplusx
7分钟前
0
0
ef .core Dapper Helper

using System; using System.Collections.Generic; using System.Configuration; using System.Data; using System.Data.SqlClient; using System.Threading.Tasks; using Dapper; using Dap......

Lytf
8分钟前
0
0
iOS 小笔记

1.以下代码打印什么     __block int val = 10;    void (^blk)(void) = ^{        printf("val=%d\n",val);        };       val = 2;    blk(); /...

风了个1
10分钟前
0
0
【Spring Boot 系列 Spring Boot示例程序】

入门程序步骤,创建一个Maven项目。继承Spring Boot官方提供的父工程。再引入一个Web的应用启动器。 1、选择一个合适的IDEA工具 创建一个Maven工程,并添加如下配置 <parent> <...

HansonReal
12分钟前
0
0
217. Contains Duplicate - LeetCode

Question 217. Contains Duplicate Solution 题目大意:判断数组中是否有重复元素 思路:构造一个set,不重复就加进去,重复返回true,如果数据量大的话,可以用布隆过滤器 Java实现: publ...

yysue
16分钟前
0
0
istio 处理失败 (理论)

Envoy提供了一套开箱即用的选择加入故障恢复功能,可以通过应用程序中的服务进行利用。功能包括: 超时 具有超时预算和重试之间的可变抖动的有界重试 限制并发连接数和对上游服务的请求 对负...

xiaomin0322
17分钟前
0
0
eclipse解决git冲突举例

本地修改了两个文件,提交时提示有冲突,想来应该是没有从远程仓库下载最新代码导致的。通过右击项目 -> Team -> Sychronized WorkSpace,比较本地仓库和远程仓库的异同:   此时没有更好的...

Code辉
25分钟前
0
0
运行.jar后缀的文件

前提必须安装了jdk,正确配置环境变量。 在dos窗口执行以下命令即可。 java -jar C:\Users\10492\Desktop\turn.jar

haha360
28分钟前
0
0
Java程序员如何做代码压力测试?【JWordPress前台项目实战】

代码 pom.xml文件引入包 <dependency><groupId>com.taobao.stresstester</groupId><artifactId>stresstester</artifactId><version>1.0</version></dependency> 编写测试代码 /**......

迷你芊宝宝
33分钟前
0
0
面试宝典-什么是缓存穿透?

缓存穿透是说收到了一个请求,但是该请求缓存里没有,只能去数据库里查询,然后放进缓存。 这里面有两个风险,一个是同时有好多请求访问同一个数据,然后业务系统把这些请求全发到了数据库;...

suyain
39分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部