Kafka Get Started

原创
2017/04/21 12:15
阅读数 84

简介

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消费。

特点

分布式消息系统 基于发布/订阅模式 可分区/复制 容错 高吞吐 低延迟 支持水平扩展

基本概念

  • Broker:Kafka集群包含一个或多个节点,一般称节点为broker。
  • Topic:客户端通过Topic发布/消费数据,物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处。
  • Partition:一个Topic包含一个或多个Partition。
  • Producer:消息发布进程
  • Consumer:消息消费进程
  • Consumer Group:每个 Consumer 属于一个特定的 Consumer Group Kafka 以
  • Group 为订阅单位

kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响.

安装

cd /opt/beh/core
tar zxf kafka_2.10-0.8.2.2.tar.gz
ln -s kafka_2.10-0.8.2.2 kafka
mkdir -p /opt/beh/data/kafka       #kafka数据日志存储目录
mkdir -p /opt/beh/logs/kafka       #kafka服务日志存储目录

配置

cd /opt/beh/core/kafka/conf

修改服务参数文件server.properties

log.dirs=/opt/beh/data/kafka  #kafka数据日志
zookeeper地址
zookeeper.connect=localhost:2181/kafkazk
delete.topic.enable=true

修改日志参数文件log4j.properties

kafka.logs.dir=/opt/beh/logs/kafka  #kafka服务日志

服务启停

启动前台运行

bin/kafka-server-start.sh config/server.properties

启动后台运行

bin/kafka-server-start.sh -daemon config/server.properties

关闭 bin/kafka-server-stop.sh

topic管理

topic操作

创建topic:test

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

显示所有的topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

删除topic:test

bin/kafka-topics.sh --delete --topic test --zookeeper localhost:2181

描述topic:test

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

测试程序producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
my test message 1
my test message 2

测试程序consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test

查询topic的消费情况

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --topic test --group group1
Group           Topic                          Pid Offset          logSize         Lag             Owner
group1          test                           0   11              11              0               group1_breath-1492743300338-779e9fdb-0

topic offset操作

查询topic的offset的范围

用下面命令可以查询到topic:test broker:localhost:9092的offset的最小值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic test --time -2
test:0:0

用下面命令可以查询到topic:test broker:localhost:9092的offset的最大值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic test --time -1
test:0:11

从上面的输出可以看出topic:test只有一个partition:0 offset范围为:[0,11]

设置topic的offset

用下面命令可以修改consumergroup:group1 topic:test partition:0的offset为5:

zkCli.sh --server localhost:2181
set /consumers/group1/offsets/test/0 5

重启应用消费将从offset:5开始

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部