文档章节

Kafka配置及常用命令笔记

liddblog
 liddblog
发布于 02/22 11:10
字数 1998
阅读 107
收藏 1

一、kafka配置

1. 服务端 server.properties

#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址
zookeeper.connect=host1:2181,host2:2181,host3:2181

2.生产者常用配置

#kafka-producer配置,官网=>https://kafka.apache.org/documentation/#producerconfigs
#集群地址
spring.kafka.producer.bootstrap-servers=192.168.199.128:9092,192.168.199.128:9093,192.168.199.128:9094
#指定创建信息nio-buffer缓冲区大小约1M
spring.kafka.producer.buffer-memory=1024000
#累计约1M条就发发送,必须小于缓冲区大小,否则报错无法分配内存(减少IO次数,过大则延时高,瞬间IO大)
spring.kafka.producer.batch-size=1024000
#默认0ms立即发送,不修改则上两条规则相当于无效(这个属性时个map列表,producer的其它配置也配置在这里,详细↑官网,这些配置会注入给KafkaProperties这个配置bean中,供#spring自动配置kafkaTemplate这个对象时使用)
spring.kafka.producer.properties.linger.ms=1000
#kafkaTempalte可以发送对象类型的消息,系列化为json,一般使用默认的string序列化器(对象则手动手动转为json)
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
#发送确认机制:acks=all或-1:leader会等待所有ISR中的follower同步完成的ack才commit(保证ISR副本都有数据leader才commit,吞吐率降低),acks=0:partition leader不会等待任何ISR中副本的commit(可能会有数据丢失,吞吐高),acks=1 kafka会把这条消息写到本地日志文件中
spring.kafka.producer.acks=1
#发送失败重试次数
spring.kafka.producer.retries=3

3.消费者常用配置

#kafka-consumer配置,官网=>https://kafka.apache.org/documentation/#producerconfigs
spring.kafka.consumer.bootstrap-servers=192.168.199.128:9092,192.168.199.128:9093,192.168.199.128:9094
#消费组ID
spring.kafka.consumer.group-id=test-group
#当未初始化消费组偏移量或没找到是怎么办?自动偏移量重置到最早的earlist(这个值会导致新加入的消费者去消费较久以前最开始的大量信息),最新的latest(新消费者消费最新消息),还是none或其它值报错
spring.kafka.consumer.auto-offset-reset=latest
#一次最大拉取多少条消息,太多处理消息压力大,太少则IO过于频繁;这个配置需要额外培养一个批量工厂bean,并在@KafkaListener注解指定这个批量工厂{@Link https://www.jianshu.com/p/5370fff55cff}
spring.kafka.consumer.max-poll-records=10
#自动提交consumer已消费的消息offset周期,周期过大,重启后可能重复消费较多已消费但offset未提交的消息,kafka scala程序有个定时任务来提交offset
spring.kafka.consumer.auto-commit-interval=1000ms
#仅在不开启上述周期性自动确认,配置其他的ack-mode才有效,如下累计10次消费才进行一次commit以修改消息消费者在该partition的offset
spring.kafka.consumer.enable-auto-commit=false
#计数模式,还是计时模式,手动ack模式等,配合ack-count一起配置
spring.kafka.listener.ack-mode=count_time
spring.kafka.listener.ack-count=10
#消费者的其它属性,类似producer,也是一个map
#spring.kafka.consumer.properties.XXX=YYY

二、常用命令

  1. 启动(以守护者线程)

    bin/kafka-server-start.sh  -daemon config/server.properties
  2. 关闭

    bin/kafka-server-stop.sh stop
  3. 集群启动关闭脚本(zookeeper集群启动脚本类似)

    #!/bin/bash
    case $1 in
        "start"){
            for i in host1 host2 host3
            do
                echo "**************$i**************"
                ssh $i "source /etc/profile;/soft/kafka/bin/kafka-server-start.sh -daemon /soft/kafka/config/server.properties"
            done
        };;
    
        "stop"){
            for i in host1 host2 host3
            do
                echo "**************$i**************"
                ssh $i "source /etc/profile;/soft/kafka/bin/kafka-server-stop.sh /soft/kafka/config/server.properties"
            done
        };;
    esac

    注意:此脚本保存后也要进行授权 chmod 777 文件全名

     

  4. 查看当前kafka中所有topic

    bin/kafka-topics.sh --list --zookeeper host1:2181
  5. 创建 topic

    bin/kafka-topics.sh --create --zookeeper host1:2181 --topic topicname --partitions 2 --replication-factor 3  

    选项说明 --topic 定义 topic 名 --replication-factor 定义副本数 --partitions 定义分区数

     

  6. topic详情描述

    /bin/kafka-topics.sh --describe --topic topicname --zookeeper host1:2181

     

  7. 删除 topic

    bin/kafka-topics.sh --delete --zookeeper host1:2181 --topic topicname

    说明:需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。

  8. 发送消息

    bin/kafka-console-producer.sh --broker-list hsot1:9092 --topic first
  9. 消费消息

    bin/kafka-console-consumer.sh \ --zookeeper hadoop102:2181 --topic first

    注意:旧版本由于kafka的某些元数据放于zookeeper,所以用zookeeper连接,0.11版本之后可以直接用bootstrap-server连接

     

    bin/kafka-console-consumer.sh \ --bootstrap-server hadoop102:9092 --topic first
    bin/kafka-console-consumer.sh \ --bootstrap-server hadoop102:9092 --from-beginning --topic first

    说明:--from-beginning:会把主题中以往所有的数据都读取出来。

     

  10. 查看某个 Topic 的详情

    bin/kafka-topics.sh  --zookeeper host1:2181 --describe --topic first
  11. 修改分区数

    bin/kafka-topics.sh  --zookeeper host1:2181 --alter --topic first --partitions 6

三、集群脚本

1.集群分发脚本

说明:在一个集群中,若需要修改某中间件如kafka配置,则按照常规操作需要分别连接三台服务器各自修改成内容一样的,若使用该脚本,则只需要在一台机器上修改完后,使用脚本命令分发到其他机器上即可,脚本里内容请自行修改ip地址或者主机名。

使用方法:xsync /***/filename

#!/bin/bash
#1 获取输入参数个数,如果没有参数,直接退出
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi

#2 获取文件名称
p1=$1
fname=`basename $p1`
echo fname=$fname

#3 获取上级目录到绝对路径
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir

#4 获取当前用户名称
user=`whoami`

#5 循环
for((host=103; host<105; host++)); do
        echo ------------------- hadoop$host --------------
        rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
done

说明:在 /usr/bin 这个目录下存放的脚本,用户可以在系统任何地方直接执行。

/usr/bin目录下创建xsync文件,文件内容为以上脚本,或者在在其他目录也可以,需要将此目录配置到环境变量。 以上脚本若自己使用,需要修改hodoop为自己电脑的hostname,查看自己电脑hostname命令:hostname;修改hostname命令:hostnamectl set-hostname 名字

注意:要修改权限才能执行 chmod 777 xsync

2.集群批量处理脚本

说明:在集群中,我们需要查看某个组件的状态信息,如zookeeper的状态,常规操作是在每台机器上输入命令./zkServer.sh status,若使用该脚本则可以在一台机器上使用一次命令查看该集群所有的信息, 该脚本用于在所有主机上同时执行相同的命令。

使用方法: 进入/usr/bin目录下,输入vim xcall,向里面添加下面脚本,放到此目录的原因是在此目录下,任何地方都可以直接使用该命令,原理同分发脚本。如:xcall.sh jps 可以查看集群中kafka和zookeeper的启动情况。

#!/bin/bash
 params=$@
 i=1
 
 for((i=1 ;i <=2 ;i=$i+1 ));do
 echo ==========node$i $params==========
 ssh node$i "source /etc/profile;$params"
 done

说明node 这里对应自己主机名,i 作为节点的编号,需要做相应修改。另外,for循环中的 i 的边界值由自己的主机编号决定。 比如,集群主机分别为 hadoop101hadoop102hadoop103,则 node 对应 hadoopi 对应 101,102,103

四、Kafka监控(Kafka Eagle)

修改 kafka-server-start.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

修改为:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m
-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -
XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

注意:修改之后在启动 Kafka 之前要分发之其他节点。

修改配置文件:

######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181
######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false
######################################
# kafka jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&ch
aracterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=000000

启动

bin/ke.sh start

注意:启动之前需要先启动 Zookeeper 以及 Kafka

登录

http://hostip:8048/ke
 

© 著作权归作者所有

liddblog
粉丝 4
博文 74
码字总数 70480
作品 0
海淀
程序员
私信 提问
加载中

评论(0)

解决默认的__consumer_offsets这个topic默认副本数为1而存在的单点故障问题

解决kafka集群由于默认的consumer_offsets这个topic的默认的副本数为1而存在的单点故障问题 抛出问题: consumer_offsets这个topic是由kafka自动创建的,默认50个,但是都存在一台kafka服务器...

Mr_zebra
2018/06/15
61
0
flume,kafka区别、协同与详解

简介 socket模式 简单数据处理 开发公司 Flume 日志采集系统 (管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.) 可编写Interceptor,对数据进行拦截,对密码进行MD5加密...

flash胜龙
2019/06/06
2.1K
0
大数据高可用集群环境安装与配置(10)——安装Kafka高可用集群

1. 获取安装包下载链接 访问https://kafka.apache.org/downloads 找到kafka对应版本 需要与服务器安装的scala版本一致(运行spark-shell可以看到当前安装的scala版本) 2. 执行命令下载并安装...

AllEmpty
2019/12/28
0
0
Linux系统Kafka集群搭建与简单测试

Kafka安装 Zookeeper集群搭建,可参考Linux系统Zookeeper集群配置 上传kafka安装包并解压 编辑配置文件 这个是server.properties文件内容 增加集群的配置文件server1.properties 修改配置 增...

Listen_ing
2016/06/02
895
0
解决kafka集群由于默认的__consumer_offsets这个topic的默认的副本数为1而存在的单点故障问题

解决kafka集群由于默认的consumeroffsets这个topic的默认的副本数为1而存在的单点故障问题 抛出问题: consumer_offsets这个topic是由kafka自动创建的,默认50个,但是都存在一台kafka服务器...

五维空间s
2018/06/14
0
0

没有更多内容

加载失败,请刷新页面

加载更多

大刘海终于收窄?iPhone 12系列设计细节曝光:三摄+雷达更占空间

苹果第一次给屏幕加上刘海,是在2017年的iPhone X。此后,尽管安卓阵营已经先后发展出了水滴屏、升降屏、打孔屏等形态各异的更高屏占比的设计方案,苹果一直将宽大的刘海保留在其后iPhone XS...

osc_p0v6j6lt
23分钟前
24
0
爆单也没能救它 瑞幸咖啡股价一度下跌20%再创新低

瑞幸咖啡(NASDAQ:LK)股价周一继续下挫,跌破上周四曝光营收造假的低点4.90美元,一度报4.27美元,下跌超20%,市值不足11亿美元。这一价格也创造了瑞幸上市后的历史最低。 据报道,高盛称,瑞...

osc_6kj0kt57
24分钟前
16
0
疫情蔓延放缓 早盘美股道指大涨逾千点

北京时间6日晚,美股周一早盘继续上扬,道指大涨逾千点。投资者对全球疫情蔓延局势的判断有所改善,风险情绪随之受到提振。美国总统特朗普称迹象显示美国疫情已开始趋于稳定,并表示他仍在考...

osc_jo2m8l1r
26分钟前
24
0
4.3万元!赛博朋克2077定制版NIVIDIA显卡成功卖出

虽然《赛博朋克2077》游戏跳票了,但是2月份NVIDIA依然推出了一款赛博朋克限量版显卡——GeForce RTX 2080 Ti“ Cyberpunk 2077 Edition”,全球限量200块,官方售价1100美元。 访问购买页面...

osc_9mctux05
27分钟前
26
0
高盛:瑞幸咖啡股东发生违约 7635万股ADS强制出售

据国外媒体报道,高盛今日发布报告称,在瑞幸咖啡股东Haode Investment公司(借款人)发生违约之后,根据一项5.18亿美元的保证金贷款安排,贷款人组成的银团已指示作为担保受托人的瑞士信贷新...

osc_t6qz550e
28分钟前
19
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部