文档章节

kafka命令行工具大全

爱宝贝丶
 爱宝贝丶
发布于 01/30 22:29
字数 1533
阅读 320
收藏 1
  • 创建主题
sh kafka-topics.sh --zookeeper <zookeeper connect> --create --topic <string> --replication-factor <integer> --partitions <integer> --if-not-exists

示例:

sh kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic my-topic --replication-factor 2 --partitions 2
  • 增加主题数量
sh kafka-topics.sh --zookeeper <zookeeper connect> --alter --topic <string> --partitions <integer>

示例:

sh kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic my-topic --partitions 3
  • 删除主题
sh kafka-topics.sh --zookeeper <zookeeper connect> --delete --topic <string>

示例:

sh kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic my-topic
  • 展示所有的主题
sh kafka-topics.sh --zookeeper <zookeeper connect> --list

示例:

sh kafka-topics.sh --zookeeper localhost:2181/kafka --list
  • 展示所有主题的详细信息
sh kafka-topics.sh --zookeeper <zookeeper connect> --describe
# 参数
--topics-with-overrides 列出所有覆盖了默认配置的主题
--under-replicated-partitions 列出所有处于不同步状态的主题
--unavailable-partitions 列出所有没有leader的分区,也就是处于不可用状态的分区

示例:

sh kafka-topics.sh --zookeeper localhost:2181/kafka --describe
  • 生产者吞吐量测试
sh kafka-producer-perf-test.sh --topic <topic> --num-records <integer> --record-size <integer> --throughput -1 --producer-props bootstrap.servers=<kafka connect> acks=-1

示例:

sh kafka-producer-perf-test.sh --topic test-topic --num-records 500000 --record-size 200 --throughput -1 --producer-props bootstrap.servers=localhost:9092,localhost:9093,localhost:9094 acks=-1
  • 消费者吞吐量测试
sh kafka-consumer-perf-test.sh --broker-list <kafka connect> --messages <integer> --topic <topic>

示例:

sh kafka-consumer-perf-test.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --messages 500000 --topic test-topic
  • 展示所有的group
# 旧版将group信息保存在zookeeper上
sh kafka-consumer-groups.sh --zookeeper <zookeeper connect> --list
# 新版将group信息保存在broker上
sh kafka-consumer-groups.sh --bootstrap-server <kafka connect> --list

示例:

sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  • 获取群组的详细信息
sh kafka-consumer-groups.sh --bootstrap-server <kafka connect> --describe --group <group>

示例:

sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
  • 删除群组
sh kafka-consumer-groups.sh --bootstrap-server <kafka connect> --delete --group <group>

示例:

sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group
  • 导出偏移量
sh kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect localhost:2181/kafka --group connect-dump-kafka-config --output-file offsets.log
  • 导入偏移量
sh kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect localhost:2181/kafka --input-file offsets.log
  • 动态添加可用配置
sh kafka-configs.sh --zookeeper <zookeeper connect> --alter --entity-type topics --entity-name <topic name> --add-config <key>=<value>[,<key>=<value>...]

示例:

sh kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name customerContacts --add-config retention.ms=3600000
  • 覆盖客户端默认配置
sh kafka-configs.sh --zookeeper <zookeeper connect> --alter --entity-type clients --entity-name <client ID> --add-config <key>=<value>[,<key>=<value>...]

说明:

# 这里的客户端配置现阶段只允许修改如下两个:
producer_bytes_rate 表示单个生产者每秒钟可以往单个broker上生成的消息字节数
consumer_bytes_rate 表示单个消费者每秒钟可以从单个broker上读取的消息字节数
  • 列出被覆盖的配置
sh kafka-configs.sh --zookeeper <zookeeper connect> --describe --entity-type topics --entity-name <topic name>
  • 移除被覆盖的配置
sh kafka-configs.sh --zookeeper <zookeeper connect> --alter --entity-type topics --entity-name <topic name> --delete-config <key>=<value>[,<key>=<value>...]
  • 首选首领选举
sh kafka-preferred-replica-election.sh --zookeeper <zookeeper connect>
  • 指定文件的方式进行首领选举
sh kafka-preferred-replica-election.sh --zookeeper <zookeeper connect> --path-to-json-file <partitions.json>
{
    "partitions": [
        {
            "partition": 1,
            "topic": "foo"
        },
        {
            "partition": 2,
            "topic": "foobar"
        }
    ]
}
  • 分区重新分配

    • 指定需要进行分区重新分配的topics,并且获取重分配方案
    # 执行该命令之后,kafka会返回指定topics的当前分区情况,和将要进行重分配的方案,这里在进行重分配之前
    # 尽量将现有的方案放在指定文件中进行保存,以方便重分配失败时可以回滚到现有方案
    sh kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --generate --topics-to-move-json-file topics.json --broker-list 0,1,2
    
    # topics.json 指定需要进行重新分配的topics
    {
      "topics": [
        {
          "topic": "customerContacts"
        },
        {
          "topic": "customerCountries"
        }
      ],
      "version": 1
    }
    
    • 进行分区重分配
    # 这里reassignment.json中保存了kafka返回的建议进行的重分配的方案
    sh kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --execute --reassignment-json-file reassign.json
    
    • 查看分区重分配进度
    sh kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --verify --reassignment-json-file reassign.json
    
  • 解码日志片段,并且显示消息的数据内容

# 示例中没有目录相关的信息,在实际使用时需要注意目录问题
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
  • 验证分区副本的一致性
sh kafka-replica-verification.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic-white-list 'my-*'
  • 控制台消费者
# 使用控制台的方式消费消息
sh kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic my-topic

可选参数:

# 这里topic,whitelist和blacklist三个参数默认只能选择一个
--whitelist 指定白名单
--blacklist 指定黑名单

# 指定消费者的其他参数信息
--consumer.config CONFIGFILE 通过配置文件的方式指定其他参数信息
--consumer-property KEY=VALUE 通过键值对的形式传递参数,多个则以逗号隔开

# 消息格式化器除了默认的还有三种
# kafka.tools.LoggingMessageFormatter 将日志输出到日志中,日志级别为INFO
# kafka.tools.ChecksumMessageFormatter 只打印消息的校验和
# kafka.tools.NoOpMessageFromatter 读取消息但不进行任何打印
# 对于标准格式化器,其有一些非常有用的配置项,可以通过--property命令行参数传递给它
# print.timestamp 打印每个消息的时间戳
# print.key 如果被设为true,除了打印消息的值外,还会打印消息的键
# key.separator 打印消息的键和值所使用的分隔符
# line.separator 指定消息之间的分隔符
# key.deserializer 指定打印消息的键所使用的反序列化类名
# value.deserializer 指定打印消息的值所使用的反序列化类名
--formatter CLASSNAME 指定消息格式化器的类名,用于解码消息,默认为kafka.tools.DefaultFormatter

--from-beginning 指定从最旧的消息开始读取数据
--max-messages NUM 指定在退出之前最多读取NUM个消息
--partition NUM 指定只读取ID为NUM的分区
  • 读取偏移量主题
sh kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic __consumer_offsets --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter' --max-messages 1
  • 控制台生产者
# 在控制台发送消息
sh kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic my-topic

可选参数:

--key-serializer CLASSNAME 指定消息键的编码器类名
--value-serializer CLASSNAME 指定消息值的编码器类名
--compression-codec STRING 指定生成消息所使用的压缩类型,可以是none,gzip,snappy或lz4,默认为gzip
--sync 指定以同步的方式生成消息
# 默认的命令行消息读取器是kafka.tools.LineMessageReader,其有一些非常有用的参数,
# 可以通过--property将这些消息传给控制台生产者:
# ignore.error 如果被设置为false,那么在parse.key被设为true或者标准输入里没有包含键的分隔符时
#     就会抛出异常,默认为true
# parse.key 如果被设为false,那么生成消息的键总是null,默认为true
# key.separator 指定消息键和消息值之间的分隔符,默认是Tab字符

© 著作权归作者所有

上一篇: ByteBuffer详解
爱宝贝丶

爱宝贝丶

粉丝 340
博文 136
码字总数 456051
作品 0
武汉
程序员
私信 提问
kafka命令行工具

kafka命令行工具 四号程序员2017-12-204 阅读 kafkacliLinux 点赞 kafkacliLinux 作者:四号程序员 Keep It Simple And Stupid 原文地址:kafka命令行工具, 感谢原作者分享。 ←如何在shell...

四号程序员
2017/12/20
0
0
mysql 命令行补全工具 mycli

mysql 命令行补全工具 mycli 前言 我们在连接mysql数据库的时候,大多数情况下是使用gui图形界面的工具的。但是,有时候连接数据库还是命令行方便,所以,我们通常都需要掌握一点命令行操作数...

FungLeo
2017/05/10
0
0
史上最全的开发和设计资源大全

【导读】:GitHub 上的 Awesome 系列(资源大全系列),是一个汇总了优秀工具资源的大集合,并由 GitHub 社区用户持续维护和更新。初始的版本都是英文,伯乐在线组织整理了热门资源大全的中文...

Yomut
2016/08/07
192
0
flume,kafka区别、协同与详解

简介 socket模式 简单数据处理 开发公司 Flume 日志采集系统 (管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.) 可编写Interceptor,对数据进行拦截,对密码进行MD5加密...

flash胜龙
06/06
339
0
kafka manger 安装

0-项目介绍 Kafka在雅虎内部被很多团队使用,媒体团队用它做实时分析流水线,可以处理高达20Gbps(压缩数据)的峰值带宽。 为了简化开发者和服务工程师维护Kafka集群的工作,构建了一个叫做Kafka...

杨春炼
2016/06/23
646
0

没有更多内容

加载失败,请刷新页面

加载更多

同名依赖,多次引入导致的程序错误

表现: 本地测试正常,打包上线后报错找不到某个方法(缺少依赖),检测依赖发现,同名依赖有两个版本。 解决:删除一个,程序正常

避难所
25分钟前
3
0
在HTML中的下拉框中实现超连接

<!DOCTYPE html><html lang="zh-CN"><head> <meta charset="UTF-8"> <link rel="canonical" href="https://blog.csdn.net/weixin_34228617/article/details/86130280"/> ......

mickelfeng
30分钟前
3
0
Content7关闭防火墙命令

在外部访问CentOS中部署应用时,需要关闭防火墙。 关闭防火墙命令:systemctl stop firewalld.service 开启防火墙:systemctl start firewalld.service 关闭开机自启动:systemctl disable f...

无名氏的程序员
31分钟前
3
0
分布式存储原理:TiDB

浮躁的码农
44分钟前
6
0
CSS实现圆角边框的完美解决方案

css实现图片圆角,兼容所有浏览器: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 <style type= "text/css" > /*通用样式--容器宽度值*/ .s......

前端老手
58分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部