文档章节

3-kafka0.10 生产者详解

土茯苓
 土茯苓
发布于 2016/09/12 13:55
字数 1133
阅读 66
收藏 1

Kafka在0.8.1版本的时候重写了Producer。在0.9版本中又重写了Consumer,纯Java,没有了对Scala和ZK的依赖。

一、消息的发送流程:

  • KafkaProducer:
    • 等待 topic meteData 数据的更新,序列化 key,value;
    • 根据 topic 的 partition 个数和 key 的值,计算该条消息所属的 partition,将消息 append 给 RecordAccumulator;
  • RecordAccumulator:
    • 使用Map类型的 batches ( ConcurrentMap<TopicPartition, Deque<RecordBatch>> ) 维护发向所有 topic 的消息数据;
    • append 方法内将发送的这条消息 tryAppend 进对应 Deque 最后一个 RecordBatch 中。如果空间不够,该 RecordBatch 就会 flip ByteBuffer,进入只读状态。空间不够或者失败则会在 Deque 末端尝试新起一个 RecordBatch;
  • Sender:
    • KafkaProducer 初始化的时候会启一个 KafkaThread 线程,运行 Runnable 的 Sender 对象,不停地发送 RecordAccumulator 内累积的消息;
    • 调用 RecordAccumulator 的 ready 方法收集到此次发送任务的目的地,即 Broker Leader 的列表,消息都是发送给所属 Partition 目前是 Leader 的那个 Broker 节点的;
    • 调用 NetworkClient 的 ready 方法,判断收集到的每个 Leader 节点是否是 connected 状态,否的话会被移除;
    • 调用 RecordAccumulator 的 drain 方法,获得发送给每个 Broker 节点的 RecordBatch 列表。将发往每个 Broker 节点的 RecordBatch 数据,封装成一个 ClientRequest,主要的消息内容由 RequestSend 内的 Struct 结构表示。Struct 内部已将消息按 topic 分开,并是按 kafka 消息的 schema 生成,具有如下的嵌套结构:{“acks”:1,”topic_data”:[{"topic": "xxx", "data": [{"partition": 1, "record_set": ByteBuffer}]}]}。ClientRequest 内还包括发完消息后的 CallBack 处理逻辑;
    • 遍历每个 ClientRequest,调用 NetworkClient 的 send 方法,将 RequestSend 放进 Selector.channels 内对应的 KafkaChannel 中;
    • 调用 NetworkClient 的 poll 方法,将 RequestSend 真正的发送给 Broker;
  • RecordAccumulator:
    • ready 方法中检查每个 Deque 的第一个 RecordBatch 是否是 ready 的状态,并把 RecordBatch 对应的 Broker Leader 节点收集起来好向它们发送消息。判断 RecordBatch 是否 ready 涉及到这个 bath 是否满了、距离上一次检查是否够久等。例如如果 RecordBatch 所在的 Deque 长度大于1,证明这个 RecordBatch 曾今被 append 的时候发现已经满了,现在是只读待发状态,是 ready 的。需要等待的时长受是否处在 backoff 时期,是否超过 linger 时长等影响;
    • drain 方法中遍历收集到的、 connected 状态的 Broker Leader 节点,根据每个节点下归属的 Partition 对应从 batches 中的 Deque 中取出第一个 RecordBatch,拼装成 Map<Integer, List<RecordBatch>> 的结构,key 是 Broker 节点的 id, value 是发给该节点的 RecordBatch 列表;
  • NetworkClient:
    • 使用 ClusterConnectionStates (Map<String, NodeConnectionState>) 维护着每个 Broker 节点的连接状态;
    • ready 方法中判断是否跟指定的 Broker 节点是 connected 的状态,否的话会通过 Selector 的 connect 方法初始化跟其的连接,建立 SocketChannel 并 register,KafkaChannel 会 attach 在 SelectionKey 上 ;
    • poll 方法中调用 Selector 的 poll 方法,处理 Selector 内的 completedSends,completedReceives等,处理 ClientResponse, 遍历 RecordBatch 内的List<Thunk>,完成回调逻辑的处理;
  • Selector:
    • 使用 channels (Map<String, KafkaChannel>) 维护着与每个 Broker 节点的 Channel;
    • 使用 completedSends (List<Send>)  维护着已经发送完毕的 RequestSend
    • 使用 completedReceives (List<NetworkReceive>)  维护着来自 Broker 的 response;
    • poll 方法中遍历 SelectionKey, 如果 KafkaChannel ready + SelectionKeywritable,那么就将 KafkaChannel 中的 RequestSend 发送,并维护更新 completedSends;如果 KafkaChannel ready + SelectionKey readable,那么就接受来自 Broker 的 NetworkReceive,并维护更新 completedReceives;

二、延迟与吞吐量的问题:
Case1: Producer将消息一条接一条发送到 Broker,假设发送延迟是 2ms,那么 1s 可以发送 500 条消息;

Case2: Producer将消息延迟 8ms 发送,假设 8ms 内收集到 20 条消息,那么 1s 可以发送 2000 条消息;

两个重要的参数:
batch.size:  This is an upper limit of how many messages Kafka Producer will attempt to batch before sending, specified in bytes (default is 16K bytes). Kafka may send batches before this limit is, but will always send when this limit is reached. Therefore setting this limit too low will hurt throughput without improving latency. The main reason to set this low is lack of memory – Kafka will always allocate enough memory for the entire batch size, even if latency requirements cause it to send half-empty batches.

linger.ms:  How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch.  (default is 0). Sometimes we are willing to wait a bit longer in order to improve the overall throughput at the expense of a little higher latency.

三、总结:
Kafka 的 Producer 通过把将要发送的消息先放在 RecordAccumulator 的 batches 内累积一段时间,然后进行小批量提交给 Broker 的方式,减少网络往返的开销,牺牲一点latency 换取 throughput。

© 著作权归作者所有

共有 人打赏支持
土茯苓
粉丝 32
博文 174
码字总数 198211
作品 0
朝阳
高级程序员
私信 提问
4-kafka0.10 新消费者使用

Consumer Client 本节主要介绍Kafka从一些topic消费数据的示例。 配置 使用新版的Consumer,需要先在工程中添加kafka-clients依赖,添加的配置信息如下: 初始化与配置 Consumer的创建过程与...

李矮矮
2016/09/12
83
0
深入掌握大数据Kafka的使用(基于Python开发)-张明阳-专题视频课程

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/a2011480169/article/details/83583785 深入掌握大数据Kafka的使用(基于Python开发)—3人已学习 课程介绍 ...

安静的技术控
2018/10/29
0
0
RocketMQ详解-架构模块解析

RocketMQ是一个分布式开放消息中间件,底层基于队列模型来实现消息收发功能。RocketMQ集群中包含4个模块:Namesrv, Broker, Producer, Consumer。 Namesrv: 存储当前集群所有Brokers信息、T...

ericquan8
2016/12/17
1K
1
activeMq安装及原理分析

1.下载地址: activeMQ:window中的apache-activemq-5.15.8-bin.zip 2.安装 解压压缩包, 运行bin中的activemq.bat,qizh,根据本地环境决定运行win64还是win32,结果: 访问:http://127.0.0....

olv123
02/26
0
0
rabbitMq超详解

在此向前辈们致敬:http://blog.csdn.net/shatty/article/details/9529463 为什么要学rabbitMQ 在此之前,我们想来进行一个概念区分 threading queue :只能用于线程之间的消息传发 进程que...

眉间雪
2017/11/18
0
0

没有更多内容

加载失败,请刷新页面

加载更多

day11

architect刘源源
今天
6
0
论学好Linux系统的超级重要性

不知道各位在日常的工作生活中有没有接触过“rm -rf /*”这个命令,因为这个命令搞出来的事情可还不少呢!前段时间就在一个群里看到了有个小伙子,老板让他去维护一下服务器,这小伙也不太懂...

Linux就该这么学
昨天
6
0
git 使用

1,首先在github配置好信息和仓库,然后在本地进行操作 git init git config user.name 'zhangwuer' git config user.email '56789053@qq.com' 2,与远程分支建立连接 git checkout -b test......

天王盖地虎626
昨天
3
0
git checkout 命令详解

在日常的git操作中,git checkout——检出,是我们的常用命令。最为常用的两种情形是创建分支和切换分支。 在下面的命令中,使用了一些简写,在这里说明一下: git st # git statusgit ci ...

shzwork
昨天
10
0
【Nginx】Nginx多级代理,获取客户端真实请求IP以及每级代理IP

Nginx多级代理,获取客户端真实请求IP以及每级代理IP 如图所示,每一级nginx里的location配置里需要加上对应的配置,最后一级nginx是直接到应用,测试时为了方便,直接用echo模块去测试,打印...

薛定谔的旺
昨天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部