文档章节

kafka tools中的管理工具

 张欢19933
发布于 2017/06/28 16:51
字数 1845
阅读 35
收藏 0
点赞 0
评论 0

Kafka内部提供了许多管理脚本,这些脚本都放在$KAFKA_HOME/bin目录下,而这些类的实现都是放在源码的kafka/core/src/main/scala/kafka/tools/路径下。

Consumer Offset Checker

Consumer Offset Checker主要是运行kafka.tools.ConsumerOffsetChecker类,对应的脚本是kafka-consumer-offset-checker.sh,会显示出Consumer的Group、Topic、分区ID、分区对应已经消费的Offset、logSize大小,Lag以及Owner等信息。

如果运行kafka-consumer-offset-checker.sh脚本的时候什么信息都不输入,那么会显示以下信息:

[iteblog@www.iteblog.com /]$ bin/kafka-consumer-offset-checker.sh
Check the offset of your consumers.
Option                                  Description                            
------                                  -----------                            
--broker-info                           Print broker info                      
--group                                 Consumer group.                        
--help                                  Print this message.                    
--retry.backoff.ms <Integer>            Retry back-off to use for failed       
                                          offset queries. (default: 3000)      
--socket.timeout.ms <Integer>           Socket timeout to use when querying    
                                          for offsets. (default: 6000)         
--topic                                 Comma-separated list of consumer       
                                          topics (all topics if absent).       
--zookeeper                             ZooKeeper connect string. (default:    
                                          localhost:2181)

我们根据提示,输入的命令如下:

[iteblog@www.iteblog.com /]$ bin/kafka-consumer-offset-checker.sh --zookeeper www.iteblog.com:2181 --topic test --group spark --broker-info
Group           Topic      Pid Offset          logSize         Lag             Owner
spark    test       0   34666914        34674392        7478            none
spark    test       1   34670481        34678029        7548            none
spark    test       2   34670547        34678002        7455            none
spark    test       3   34664512        34671961        7449            none
spark    test       4   34680143        34687562        7419            none
spark    test       5   34672309        34679823        7514            none
spark    test       6   34674660        34682220        7560            none
BROKER INFO
2 -> www.iteblog.com:9092
5 -> www.iteblog.com:9093
4 -> www.iteblog.com:9094
7 -> www.iteblog.com:9095
1 -> www.iteblog.com:9096
3 -> www.iteblog.com:9097
6 -> www.iteblog.com:9098

Dump Log Segment

有时候我们需要验证日志索引是否正确,或者仅仅想从log文件中直接打印消息,我们可以使用kafka.tools.DumpLogSegments类来实现,先来看看它需要的参数:

[iteblog@www.iteblog.com /]$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.
Option                                  Description                            
------                                  -----------                            
--deep-iteration                        if set, uses deep instead of shallow   
                                          iteration                            
--files <file1, file2, ...>             REQUIRED: The comma separated list of  
                                          data and index log files to be dumped
--key-decoder-class                     if set, used to deserialize the keys.  
                                          This class should implement kafka.   
                                          serializer.Decoder trait. Custom jar 
                                          should be available in kafka/libs   
                                          directory. (default: kafka.          
                                          serializer.StringDecoder)            
--max-message-size <Integer: size>      Size of largest message. (default:     
                                          5242880)                             
--print-data-log                        if set, printing the messages content  
                                          when dumping data logs               
--value-decoder-class                   if set, used to deserialize the        
                                          messages. This class should          
                                          implement kafka.serializer.Decoder   
                                          trait. Custom jar should be          
                                          available in kafka/libs directory.   
                                          (default: kafka.serializer.          
                                          StringDecoder)                       
--verify-index-only                     if set, just verify the index log      
                                          without printing its content

很明显,我们在使用kafka.tools.DumpLogSegments的时候必须输入--files,这个参数指的就是Kafka中Topic分区所在的绝对路径。分区所在的目录由config/server.properties文件中log.dirs参数决定。比如我们想看/home/q/kafka/kafka_2.10-0.8.2.1/data/test-4/00000000000034245135.log日志文件的相关情况可以 使用下面的命令:

[iteblog@www.iteblog.com /]$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /iteblog/data/test-4/00000000000034245135.log
Dumping /home/q/kafka/kafka_2.10-0.8.2.1/data/test-4/00000000000034245135.log
Starting offset: 34245135
offset: 34245135 position: 0 isvalid: true payloadsize: 4213 magic: 0 compresscodec: NoCompressionCodec crc: 865449274 keysize: 4213
offset: 34245136 position: 8452 isvalid: true payloadsize: 4657 magic: 0 compresscodec: NoCompressionCodec crc: 4123037760 keysize: 4657
offset: 34245137 position: 17792 isvalid: true payloadsize: 3921 magic: 0 compresscodec: NoCompressionCodec crc: 541297511 keysize: 3921
offset: 34245138 position: 25660 isvalid: true payloadsize: 2290 magic: 0 compresscodec: NoCompressionCodec crc: 1346104996 keysize: 2290
offset: 34245139 position: 30266 isvalid: true payloadsize: 2284 magic: 0 compresscodec: NoCompressionCodec crc: 1930558677 keysize: 2284
offset: 34245140 position: 34860 isvalid: true payloadsize: 268 magic: 0 compresscodec: NoCompressionCodec crc: 57847488 keysize: 268
offset: 34245141 position: 35422 isvalid: true payloadsize: 263 magic: 0 compresscodec: NoCompressionCodec crc: 2964399224 keysize: 263
offset: 34245142 position: 35974 isvalid: true payloadsize: 1875 magic: 0 compresscodec: NoCompressionCodec crc: 647039113 keysize: 1875
offset: 34245143 position: 39750 isvalid: true payloadsize: 648 magic: 0 compresscodec: NoCompressionCodec crc: 865445580 keysize: 648
offset: 34245144 position: 41072 isvalid: true payloadsize: 556 magic: 0 compresscodec: NoCompressionCodec crc: 1174686061 keysize: 556
offset: 34245145 position: 42210 isvalid: true payloadsize: 4211 magic: 0 compresscodec: NoCompressionCodec crc: 3691302513 keysize: 4211
offset: 34245146 position: 50658 isvalid: true payloadsize: 2299 magic: 0 compresscodec: NoCompressionCodec crc: 2367114411 keysize: 2299
offset: 34245147 position: 55282 isvalid: true payloadsize: 642 magic: 0 compresscodec: NoCompressionCodec crc: 4122061921 keysize: 642
offset: 34245148 position: 56592 isvalid: true payloadsize: 4211 magic: 0 compresscodec: NoCompressionCodec crc: 3257991653 keysize: 4211
offset: 34245149 position: 65040 isvalid: true payloadsize: 2278 magic: 0 compresscodec: NoCompressionCodec crc: 2103489307 keysize: 2278
offset: 34245150 position: 69622 isvalid: true payloadsize: 269 magic: 0 compresscodec: NoCompressionCodec crc: 792857391 keysize: 269
offset: 34245151 position: 70186 isvalid: true payloadsize: 640 magic: 0 compresscodec: NoCompressionCodec crc: 791599616 keysize: 640

可以看出,这个命令将Kafka中Message中Header的相关信息和偏移量都显示出来了,但是没有看到日志的内容,我们可以通过--print-data-log来设置。如果需要查看多个日志文件,可以以逗号分割。

导出Zookeeper中Group相关的偏移量

有时候我们需要导出某个Consumer group各个分区的偏移量,我们可以通过使用Kafka的kafka.tools.ExportZkOffsets类来满足。来看看这个类需要的参数:

[iteblog@www.iteblog.com /]$ bin/kafka-run-class.sh kafka.tools.ExportZkOffsets
Export consumer offsets to an output file.
Option                                  Description                            
------                                  -----------                            
--group                                 Consumer group.                        
--help                                  Print this message.                    
--output-file                           Output file                           
--zkconnect                             ZooKeeper connect string. (default:    
                                          localhost:2181)

我们需要输入Consumer group,Zookeeper的地址以及保存文件路径:

[iteblog@www.iteblog.com /]$ bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --group spark --zkconnect www.iteblog.com:2181 --output-file ~/offset
 
[iteblog@www.iteblog.com /]$ vim ~/offset
/consumers/spark/offsets/test/3:34846274
/consumers/spark/offsets/test/2:34852378
/consumers/spark/offsets/test/1:34852360
/consumers/spark/offsets/test/0:34848170
/consumers/spark/offsets/test/6:34857010
/consumers/spark/offsets/test/5:34854268
/consumers/spark/offsets/test/4:34861572

注意,--output-file参数必须在指定,否则会出错。

通过JMX获取metrics信息

我们可以通过kafka.tools.JmxTool类打印出Kafka相关的metrics信息。

[iteblog@www.iteblog.com /]$ bin/kafka-run-class.sh kafka.tools.JmxTool
Dump JMX values to standard output.
Option                                  Description                            
------                                  -----------                            
--attributes <name>                     The whitelist of attributes to query.  
                                          This is a comma-separated list. If   
                                          no attributes are specified all      
                                          objects will be queried.             
--date-format <format>                  The date format to use for formatting  
                                          the time field. See java.text.       
                                          SimpleDateFormat for options.        
--help                                  Print usage information.               
--jmx-url <service-url>                 The url to connect to to poll JMX      
                                          data. See Oracle javadoc for        
                                          JMXServiceURL for details. (default: 
                                          service:jmx:rmi:///jndi/rmi://:      
                                          9999/jmxrmi)                         
--object-name <name>                    A JMX object name to use as a query.   
                                          This can contain wild cards, and     
                                          this option can be given multiple    
                                          times to specify more than one       
                                          query. If no objects are specified   
                                          all objects will be queried.         
--reporting-interval <Integer: ms>      Interval in MS with which to poll jmx  
                                          stats. (default: 2000) 

可以这么使用

[iteblog@www.iteblog.com /]$ bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://www.iteblog.com:1099/jmxrmi

运行上面命令前提是在启动kafka集群的时候指定export JMX_PORT=,这样才会开启JMX。然后就可以通过上面命令打印出Kafka所有的metrics信息。

Kafka数据迁移工具

这个工具主要有两个:kafka.tools.KafkaMigrationTool和kafka.tools.MirrorMaker。第一个主要是用于将Kafka 0.7上面的数据迁移到Kafka 0.8(https://cwiki.apache.org/confluence/display/KAFKA/Migrating+from+0.7+to+0.8);而后者可以同步两个Kafka集群的数据(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330)。都是从原端消费Messages,然后发布到目标端。

[iteblog@www.iteblog.com /]$ bin/kafka-run-class.sh kafka.tools.KafkaMigrationTool --kafka.07.jar kafka-0.7.19.jar --zkclient.01.jar zkclient-0.2.0.jar --num.producers 16 --consumer.config=sourceCluster2Consumer.config --producer.config=targetClusterProducer.config --whitelist=.*
 
[iteblog@www.iteblog.com /]$ bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"

日志重放工具

这个工具主要作用是从一个Kafka集群里面读取指定Topic的消息,并将这些消息发送到其他集群的指定topic中:

[iteblog@www.iteblog.com /]$ bin/kafka-replay-log-producer.sh 
Missing required argument "[broker-list]"
Option                                  Description                            
------                                  -----------                            
--broker-list <hostname:port>           REQUIRED: the broker list must be      
                                          specified.                           
--inputtopic <input-topic>              REQUIRED: The topic to consume from.   
--messages <Integer: count>             The number of messages to send.        
                                          (default: -1)                        
--outputtopic <output-topic>            REQUIRED: The topic to produce to      
--property <producer properties>        A mechanism to pass properties in the  
                                          form key=value to the producer. This 
                                          allows the user to override producer 
                                          properties that are not exposed by   
                                          the existing command line arguments  
--reporting-interval <Integer: size>    Interval at which to print progress    
                                          info. (default: 5000)                
--sync                                  If set message send requests to the    
                                          brokers are synchronously, one at a  
                                          time as they arrive.                 
--threads <Integer: threads>            Number of sending threads. (default: 1)
--zookeeper <zookeeper url>             REQUIRED: The connection string for   
                                          the zookeeper connection in the form 
                                          host:port. Multiple URLS can be      
                                          given to allow fail-over. (default:  
                                          127.0.0.1:2181)

Simple Consume脚本

kafka-simple-consumer-shell.sh工具主要是使用Simple Consumer API从指定Topic的分区读取数据并打印在终端:

bin/kafka-simple-consumer-shell.sh --broker-list www.iteblog.com:9092 --topic test --partition 0

更新Zookeeper中的偏移量

kafka.tools.UpdateOffsetsInZK工具可以更新Zookeeper中指定Topic所有分区的偏移量,可以指定成 earliest或者latest:

[iteblog@www.iteblog.com /]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

需要指定是更新成earliest或者latest,consumer.properties文件的路径以及topic的名称

consumer.properties文件配置内容如下:
zookeeper.connect=www.iteblog.com:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#consumer group id
group.id=group

当然我们也可以说手动在zk里修改。

本文转载自:https://www.iteblog.com/archives/1605.html

共有 人打赏支持
粉丝 30
博文 425
码字总数 212727
作品 0
海淀
【Kafka源码】Kafka代码模块

Kafka源码依赖于Scala环境,首先需要安装scala,这块请自行百度进行安装。 传送门 当然,我们要分析源码,需要下载源码,请自行从github上面下载。 说明:本文使用的kafka版本为0.10.0.1,这...

端木轩
2017/10/23
0
0
Kafka 源码分析1 : 基础搭建和项目结构介绍

原文出处:刘正阳 背景 从kafka也算有两年了,用它做了不少项目,但是之前对它的认识也仅仅停留在一些从其他地方听到的概念和官方文档的documentation上在遇到一些问题时往往不知道其原理只能...

刘正阳
05/16
0
0
Metamorphosis 1.4.2 发布,分布式消息中间件

Metamorphosis是一个高性能、高可用、可扩展的分布式消息中间件,思路起源于LinkedIn的Kafka,但并不是Kafka的一个 Copy。具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞...

鉴客
2012/05/10
751
1
kafka管理监控插件

kafka 监控插件 1.KafkaOffsetMonitor 项目地址:https://github.com/quantifind/KafkaOffsetMonitor KafkaOffsetMonitor是用来实时监控Kafka集群的consumers以及它们在partition中的offset(......

Ryan-瑞恩
2015/09/23
514
0
apache kafka技术分享系列(目录索引)

目录索引: Kafka使用场景 1.为何使用消息系统 2.我们为何需要搭建ApacheKafka分布式系统 3.消息队列中点对点与发布订阅区别 kafka开发与管理: 1)apachekafka消息服务 2)kafak安装与使用 ...

dannyhe
2015/09/06
453
1
kafka manger 安装

0-项目介绍 Kafka在雅虎内部被很多团队使用,媒体团队用它做实时分析流水线,可以处理高达20Gbps(压缩数据)的峰值带宽。 为了简化开发者和服务工程师维护Kafka集群的工作,构建了一个叫做Kafka...

杨春炼
2016/06/23
545
0
Kafka-manager部署与测试(完整)

Kafka-manager部署 一、概念 概念百度了一下,可以根据相关资料进行理解。 1.1 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 Broker Kaf...

KaliArch
2017/06/12
0
0
zhuan/shop-dubbox

** 参考基本分布式框架 ** http://git.oschina.net/catshen/cat ** 项目迁移 https://git.oschina.net/JiaGou-XiaoGe/payshop ** 学习资料和视频 https://pan.baidu.com/s/1jIoLf9O 演示地址......

zhuan
2017/07/19
0
0
kafka connect分布式安装

kafka connect分布式部署 Apache Kafka Schema Registry Kafka Connect Kafka Rest Proxy Kafka Clients 说明:以上服务除Apache kafka由Linkedin始创并开源,其他组件皆由Confluent公司开发...

-九天-
2017/12/10
0
3
除 Apache Spark 外的三种新兴开源数据分析工具

在数据分析方面,影响深远的变化正在酝酿之中,而开源工具在引领许多变化。当然,你可能已熟悉这个领域的一些明星开源项目,比如Hadoop和Apache Spark,不过现在出现了强烈的要求,需要全面完...

oschina
2016/06/20
3.1K
8

没有更多内容

加载失败,请刷新页面

加载更多

下一页

回想过往,分析当下,着眼未来

好久没有真正的在纸质笔记本上写过东西了,感觉都快不会写字了,笔画都不知道怎么写了。接下来就说说咱们的正事。 2018年7月22日,我做了一个决定,那就是去参加安全培训(可能是我职业生涯中...

yeahlife
40分钟前
1
0
关于工作中的人际交往

关于工作中的人际交往 Intro 写了篇发泄情绪的博客,但不会发布出来。 大概就是,要么忍,要么滚。 以及一些不那么符合社会主义核心价值观,不满于大资本家与小资本家剥削的废话。

uniqptr
45分钟前
0
0
springMVC的流程

1.用户发送请求至前端控制器DispatcherServlet 2.DispatcherServlet收到请求调用HandlerMapping处理器映射器。 3.处理器映射器根据请求url找到具体的处理器,生成处理器对象及处理器拦截器(...

JavaSon712
今天
0
0
大数据教程(3.2):Linux系统软件安装之自动化脚本

博主前面文章有介绍过软件的安装,可以帮助IT人员顺利的完成功能软件安装;但是,对于我们运维人员或者需要管理软件安装的项目经理来说,有些应用一次行需要搭建很多台相同的软件环境(如tom...

em_aaron
今天
0
1
Spring Boot 2.0.3 JDBC整合Oracle 12

整合步骤 1. Oracle驱动引入 Oracle驱动一般不能通过maven仓库直接下载得到,需自行下载并导入到项目的lib目录下,建议通过如下pom依赖引入下载的Oracle驱动 <!-- Oracle 驱动 -->...

OSC_fly
今天
0
0
java 8 并行流 - 1

下面创建一个并行流,与顺序流 //顺序流Stream.iterate(0L, i -> i + 1) .limit(Integer.MAX_VALUE) .reduce(0L, Long::sum);//并行流Stream.iterate(0L, i -> i......

Canaan_
今天
0
0
数据结构与算法5

二分法采用向下取整的方法 使用有序数组的好处是查找的速度比无序数组快的多,不好的方面是因为要将所有靠后的数据移开,所以速度较慢,有序数组和无序数组的删除操作都很慢。 有序数组在查找...

沉迷于编程的小菜菜
昨天
1
1
SpringBoot | 第十一章:Redis的集成和简单使用

前言 上几节讲了利用Mybatis-Plus这个第三方的ORM框架进行数据库访问,在实际工作中,在存储一些非结构化或者缓存一些临时数据及热点数据时,一般上都会用上mongodb和redis进行这方面的需求。...

oKong
昨天
5
0
对基于深度神经网络的Auto Encoder用于异常检测的一些思考

一、前言 现实中,大部分数据都是无标签的,人和动物多数情况下都是通过无监督学习获取概念,故而无监督学习拥有广阔的业务场景。举几个场景:网络流量是正常流量还是攻击流量、视频中的人的...

冷血狂魔
昨天
0
0
并发设计之A系统调用B系统

A-->B A在发送请求之前,用乐观锁,减少对B的重复调用,这样一定程度上是幂等性。 比如A系统支付功能,要调用B系统进行支付操作,但是前端对"支付"按钮不进行控制,即用户会不断多次点击支付...

汉斯-冯-拉特
昨天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部