文档章节

Kafka配置参数详解

China_OS
 China_OS
发布于 2015/05/12 10:51
字数 2914
阅读 580
收藏 3

最近由于业务需要,生产部署了两套kafka集群,下面记录一下相关配置参数的意思:

Topic-level configurations
Property Default Description
broker.id 每个broker有一个唯一的整数ID,这个id就是broker的name,允许broker迁移到不同的host,只要id不变就不会影响consumers
log.dirs /tmp/kafka-logs 一个逗号分隔的目录列表,用来存储kafka的数据,每当创建一个partition的时候,都会被放进一个目录。
port 6667 服务监听的端口,用来接受客户端连接 
zookeeper.connect null 指定ZooKeeper的连接串,格式是hostname:port,如果是zookeeper集群,则多个zookeeper节点用逗号分隔开。
message.max.bytes 1000000 server可以接受的消息体的最大大小
num.network.threads 3 server用来接收network请求的线程数,基本上不需要更改。
num.io.threads 8 server用来执行请求的I/O线程数,该线程数应该和你的硬盘数一样多
background.threads 4 执行后台任务的线程数,你应该改变这个
queued.max.requests 500 在I/O线程队列中的请求达到多少多少个之后,network线程停止接收新的请求
host.name null

broker的主机名,如果被设置,则只会绑定到该地址,如果没有设置,则绑定到所有接口上。

advertised.host.name null

advertised的地址,如果设置,则会提供给produces、consumer及其他broker来连接。

advertised.port null

给producers, consumers, and other brokers来连接的端口,必须和server绑定的端口不同。 

socket.send.buffer.bytes 100 * 1024 socket的发送缓冲区,SO_SNDBUFF
socket.receive.buffer.bytes 100 * 1024 socket的接收缓冲区,SO_RCVBUFF 
socket.request.max.bytes 100 * 1024 * 1024 server允许的socket请求最大数目,防止server出现out of memory,该值应该小于Java heap size.
num.partitions 1 topic的默认partitions数目
log.segment.bytes 1024 * 1024 * 1024 topic的partition存储是以目录中的segment files存在的,该值控制segment file的最大大小,如果文件达到该值后,会重新生成一个新的日志文件。
log.roll.hours 24 * 7 强制Kafka在达到该时间后生成一个新的log文件,而不管log.segment.bytes是否达到指定的大小
log.cleanup.policy delete 日志清理,值可以是:delete or compactdelete:log segments文件达到指定大小或者时间限制后会被删除。compact:会设置 log compaction,清除废弃的记录。
log.retention.{minutes,hours} 7 days 数据存储的最大超时时间,默认的数据retention是对所有的topics生效的。如果log.retention.minutes和log.retention.bytes都被设置为delete,只要任意一个达到要求都会被执行。
log.retention.bytes -1 每个topic-partitions保持的数据大小,这是每个partition的大小,因此根据所有的partition数目可以得到topic保存的数据量大小,如果retention.hours和log.retention.bytes都被设置为delete,只要任意一个达到要求都会被执行,-1表示没有限制。
log.retention.check.interval.ms 5 minutes segment log的检查周期,以确定是否达到执行删除的策略 
log.cleaner.enable false 是否开启日志压缩
log.cleaner.threads 1 设置日志压缩中清除日志的线程数
log.cleaner.io.max.bytes.per.second None 日志压缩时候处理的最大大小
log.cleaner.dedupe.buffer.size 500*1024*1024 日志清除时为索引和去重设置的buffer大小,越大越好
log.cleaner.io.buffer.size 512*1024 日志清理的时候I/O块大小,一般不需要修改
log.cleaner.io.buffer.load.factor 0.9 日志清理中的hash表的扩大因子
log.cleaner.backoff.ms 15000 检查日志是否需要清理的时间间隔
log.cleaner.min.cleanable.ratio 0.5 控制日志清理的频率,.值越大意味着更高效的清理,同时会浪费一些空间By d be duplicates)
log.cleaner.delete.retention.ms 1 day 压缩的日志保留的最长时间,也是consumer消费消息的最长时间。
log.index.size.max.bytes 10 * 1024 * 1024 segment log索引的大小. 如果index增长到指定大小,即使没有达到log.segment.bytes的限制,也是会产生一个新的日志。
log.index.interval.bytes 4096 当执行一个fetch操作后,server需要做一次线性扫描以确认正确的position大小,所以该值越大,扫描的次数越少,一般情况下不需要改变。
log.flush.interval.messages None log文件sync到磁盘之前累积的消息条数,如果设置较小,会导致sync次数过多,对性能有一点影响。但是物理故障,将不会导致数据丢失。如果值较大,则每次fsync的间隔较长。
log.flush.scheduler.interval.ms 3000 log flusher检查log是否要flushed到磁盘的时间间隔 
log.flush.interval.ms None 两次fsync之间的最大时间间隔。如果和log.flush.interval.messages一起使用,只要任意一个达到要求,都会被执行。 
log.delete.delay.ms 60000 文件在内存中的segment index删除后,保留的时间。一般不需要修改。
log.flush.offset.checkpoint.interval.ms 60000 控制上次flush硬盘的时间点,以便数据恢复,一般不需要改动 
auto.create.topics.enable true 是否允许自动创建topic,当设置为true,试图produce, consume, or fetch metadata在一个不存在的topic的时候,会自动创建
controller.socket.timeout.ms 30000 partition管理控制点到replicas的超时时间.
controller.message.queue.size 10 controller-to-broker-channels的buffer大小
default.replication.factor 1 自动创建的topic的复制数目
replica.lag.time.max.ms 10000 replicas响应partition leader的最长等待时间,若是超过这个时间,则认为他是死的。
replica.lag.max.messages 4000 如果folower落后leader太多,将会认为此folloer已经失效
replica.socket.timeout.ms 30 * 1000 follower和leader之间的socket超时时间
replica.socket.receive.buffer.bytes 64 * 1024 leader复制的时候的socket缓存大小
replica.fetch.max.bytes 1024 * 1024 requests每次获取的数据量大小
replica.fetch.wait.max.ms 500 replicas和leader之间通讯的最大等待时间.
replica.fetch.min.bytes 1 每次fetch的最小数据尺寸,如果leader中尚未同步的额数据不足该值,则会一直阻塞到条件满足
num.replica.fetchers 1

leader进行复制的线程数

replica.high.watermark.checkpoint.interval.ms 5000 每个replica检查是否将最高水位进行固话的频率
fetch.purgatory.purge.interval.requests 10000 清除数据请求的历史记录的时间间隔
producer.purgatory.purge.interval.requests 10000 清除数据产生的历史记录的时间间隔
zookeeper.session.timeout.ms 6000 zk的最大超时时间,也就是心跳间隔,若是超时,则认为该zk失效
zookeeper.connection.timeout.ms 6000 客户端连接zk的超时时间
zookeeper.sync.time.ms 2000 zk的leader和follower的同步时间
controlled.shutdown.enable false 是否允许控制器关闭broker,若是true,则会关闭这个broker上的所有leader,并转移到其他broker
controlled.shutdown.max.retries 3 控制器关闭broker的尝试次数
controlled.shutdown.retry.backoff.ms 5000 每次关闭尝试的时间间隔
auto.leader.rebalance.enable false 如果设置为true,则控制器会自动进行leadership 平衡
leader.imbalance.per.broker.percentage 10 leader的不平衡比例,若是超过这个数值,则会对分区进行平衡
leader.imbalance.check.interval.seconds 300 检查leader blance的时间间隔
offset.metadata.max.bytes 1024 允许客户端保存他们的offset信息的的最大metadata大小

    补充:delete.topic.enable  设置topic删除策略


consumer

Property Default Description
group.id 一个唯一的字符串,定义了consumer属于哪个group,定义相同的group id,则表示consumer属于同一个group。
zookeeper.connect 定义zookeeper集群的连接字符串,多台zookeeper用逗号隔开,hostname1:port1,hostname2:port2,hostname3:port3.
consumer.id null

consumer id,如果没有设置,则自动生成

socket.timeout.ms 30 * 1000 socket请求的网络超时时间,实际的超时时间是max.fetch.wait + socket.timeout.ms.
socket.receive.buffer.bytes 64 * 1024 网络请求中socket的接收缓冲区大小 
fetch.message.max.bytes 1024 * 1024 每次从topic-partition中获取消息的最大值,数据会被读入内存中。
auto.commit.enable true 如果设置为true,则会周期性的提交已经消费消息的offset到zookeeper,一般用到当consumer失败后,新的consumer从哪个位置开始消费消息。
auto.commit.interval.ms 60 * 1000 comsumer的offset提交到zookeeper的频率
queued.max.message.chunks 10 消费消息的最大message chunks buffered, 每个chunk的大小可以和fetch.message.max.bytes相同
rebalance.max.retries 4 当有一个新的consumer加入consumer group的时候,会rebalance group中的每个consumer,如果consumer有改变,则会尝试进行rebalance,如果失败,则进行重新尝试的次数。
fetch.min.bytes 1 server每次发送到consumer的最小数据大小,如果数据不满足这个值,则会进行等待。
fetch.wait.max.ms 100 如果没有消息满足fetch.min.bytes指定的大小,则服务端在回复fetch请求前的阻塞时间。 
rebalance.backoff.ms 2000 每次rebalance的时间间隔
refresh.leader.backoff.ms 200 每次查看是否需要重新选举partition leader的时间间隔 
auto.offset.reset largest

当zookeeper中没有初始offset值的时候,或者offset超出范围时候的处理方式:
* smallest : 自动重置为最小值
* largest : 自动充值为最大值
* anything else: 给consumer抛出一个异常

consumer.timeout.ms -1 在指定的时间周期内如果没有消息到达,则抛出一个timeout exception
client.id group id value client id是用户定义的一个字符串,用来跟踪每个请求的调用
zookeeper.session.timeout.ms  6000 ZooKeeper session的超时时间,如果consumer在这个时间内没有给ziikeeprt报告心跳检测,则认为consumer已经dead,并开始进行rebalance 
zookeeper.connection.timeout.ms 6000 客户端连接zookeeper的等待超时时间 
zookeeper.sync.time.ms  2000 ZK的follower和ZK leader之间的同步时间

producer
Property Default Description
metadata.broker.list

producer获取topics, partitions and replicas的metadata,地址格式是host1:port1,host2:port2

request.required.acks 0

这些状态值反映了produce的request是否完成。特别是,有多少broker已经提交数据到日志并且已经反馈给leader。

  • 0,不保证消息到达,只负责发送,延迟低,但是会出现消息丢失。
  • 1,发送消息,leader收到数据进行复制后发送确认信息。具有一定的可靠性。
  • -1,发送消息,leader收到的所有数据复制完毕后发送确认消息,提供了最好的可靠性。可以确保没有消息丢失。
request.timeout.ms 10000 消息发送的最长等待时间
producer.type sync

produce发送消息的类型,async异步发送消息,sync同步发送消息。

serializer.class kafka.serializer.DefaultEncoder messages的序列化方式。 The default encoder takes a byte[] and returns the same byte[].
key.serializer.class keys的序列化方式,如果没有设置,则同serializer.class
partitioner.class kafka.producer.DefaultPartitioner 分隔在sub-topics中的消息。默认的分隔是基于key的hash
compression.codec none

允许你压缩producer产生的信息,值可以是"none", "gzip" and "snappy".

compressed.topics null

是否设置针对特定topic进行压缩

message.send.max.retries 3

消息发送失败后尝试的次数

retry.backoff.ms 100

producer刷新metadata或者相关topic查看是否有新的leader选举产生

topic.metadata.refresh.interval.ms 600 * 1000

当partition丢失或者leader不可达时,producer刷新topic metadata的时间间隔。它会定期的去刷新(default: every 10min so 600000ms),如果设置为负值,则只有在失败的时候才会刷新metadata,如果设置为0,则会在每条消息发送后刷新metadata,如果producer不发送消息,则永远不会刷新metadata

queue.buffering.max.ms 5000 在async模式下,buffer data的最大时间。如果设置为100,则会一次批量发送100ms内buffer的所有数据。增大该值,会增加吞吐量,但是会增加数据延迟。
queue.buffering.max.messages 10000 在async模式下,可以buffer在queued中的最大未发送消息数量,如果超出这个数量,则会阻塞或者丢弃。
queue.enqueue.timeout.ms -1

在async模式下,消息进入队列在丢弃消息前的等待时间,如果设置为0,则会立即进入队列,如果队列满了,则会丢弃掉。如果设置为-1,则producer进行无限期的阻塞,并不会drop掉。 

batch.num.messages 200 当使用async模式的时候,一次批处理发送消息的数量,producer会等待要发送的消息达到这个数目,或者queue.buffer.max.ms达到指定值才会进行发送。
send.buffer.bytes 100 * 1024 Socket的写缓冲大小
client.id "" 客户指定的client id,用来跟踪消息记录 










© 著作权归作者所有

上一篇: Kafka介绍
下一篇: Flume-ng生产配置
China_OS
粉丝 422
博文 460
码字总数 513987
作品 0
静安
技术主管
私信 提问
apache kafka技术分享系列(目录索引)

目录索引: Kafka使用场景 1.为何使用消息系统 2.我们为何需要搭建ApacheKafka分布式系统 3.消息队列中点对点与发布订阅区别 kafka开发与管理: 1)apachekafka消息服务 2)kafak安装与使用 ...

dannyhe
2015/09/06
453
1
深入掌握大数据Kafka的使用(基于Python开发)-张明阳-专题视频课程

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/a2011480169/article/details/83583785 深入掌握大数据Kafka的使用(基于Python开发)—3人已学习 课程介绍 ...

安静的技术控
2018/10/29
0
0
kafka_2.11-0.11.0.1集群搭建

本文搭建一个三节点的Kafka集群。该集群有三个Zookeeper、三个Kafka Broker。 一、准备工作 解压kafka安装包,获得如下目录: Kafka目录结构: 展示如下内容: 二、搭建Zookeeper集群 将目录...

周立_itmuch
2017/11/15
0
0
logstash通过kafka传输nginx日志

logstash通过kafka传输nginx日志(三)   单个进程 logstash 可以实现对数据的读取、解析和输出处理。但是在生产环境中,从每台应用服务器运行 logstash 进程并将数据直接发送到 Elastics...

yixinsiyu
2018/08/27
0
0
快学Spark 2.0(新特性、含真实项目、纯Scala语言开发、CDH5.7)

快学Spark 2.0(新特性、含真实项目、纯Scala语言开发、CDH5.7) 分享下载地址——https://pan.baidu.com/s/1jHVvIAi 密码: sirk 本课程从基础开始,紧抓重点、言简意赅、深入浅出的讲解Spark...

羞答答的
2017/10/17
0
0

没有更多内容

加载失败,请刷新页面

加载更多

使用kubeadm 搭建K8s集群

1. 参考官网 https://kubernetes.io/docs/setup/independent/install-kubeadm/

whhbb
39分钟前
2
0
Dubbo 3.0 !提升不止一点点!

Dubbo 自 2011 年 10 月 27 日开源后,已被许多非阿里系的公司使用,其中既有当当网、网易考拉等互联网公司,也不乏中国人寿、青岛海尔等大型传统企业。 自去年 12 月开始,Dubbo 3.0 便已正...

编程SHA
40分钟前
2
0
提升不止一点点,Dubbo 3.0 预览版详细解读

Dubbo 自 2011 年 10 月 27 日开源后,已被许多非阿里系的公司使用,其中既有当当网、网易考拉等互联网公司,也不乏中国人寿、青岛海尔等大型传统企业。更多用户信息,可以访问Dubbo @GitHub...

阿里云云栖社区
43分钟前
32
0
HanLP Analysis for Elasticsearch

基于 HanLP 的 Elasticsearch 中文分词插件,核心功能: 兼容 ES 5.x-7.x; 内置词典,无需额外配置即可使用; 支持用户自定义词典; 支持远程词典热更新(待开发); 内置多种分词模式,适合...

左手的倒影
今天
1
0
spark部署之yarn模式

spark部署之yarn模式 hadoop-3.0.0集群搭建 配置相应环境 java环境 scala(可配可不配) hadoop环境 从官网下载spark 解压 配置 /conf/spark-env.sh export JAVA_HOME=/usr/java/jdk1.8.0_4...

jackmanwu
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部