文档章节

Kafka修炼日志(三):Streams简明使用教程

鬼谷半桃
 鬼谷半桃
发布于 2017/04/22 13:48
字数 546
阅读 200
收藏 0

       Streams是Kafka 10版本新增的功能,用于实时处理存储与Kafka服务器的数据,并将处理后的结果推送至指定的Topic中,供后续使用者使用。

      下面结合官方教程详述如何使用Streams实时分析处理数据,教程的Demo是一个单词计数器:

(1)首先使用Kafka Topic创建命令创建一个用于生产消息的Topic:streams-file-input。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication 3 --partitions 1 --topic streams-file-input

 编写一个消息测试文件file-input.txt,作为消息生产的输入。

[root@localhost kafka_2.12-0.10.2.0]# echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

 (2)在Console上启动一个消息生产者,将file-input的消息生产到Topic:streams-file-input。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt &

(3)使用示例Demo:Word Count,分析处理消息,并将处理后的消息推送至Topic:streams-wordcount-output。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

(4)在Console中启动一个消费者,验证消息的处理结果,输出为所有单词的计数列表。 

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

(5)消费Topic:streams-wordcount-output中的消息,输出结果,这里计数是双倍,是因为我将消息的生产、处理分别运行了两次,这里是累计结果。

[root@localhost kafka_2.12-0.10.2.0]# all       2
lead    2
to      2
hello   2
streams 4
join    2
kafka   6
summit  2

Kafka Streams处理原理

    说明下Kafka Streams的处理过程,以上面的Word Count为例,首先看下图(图片来自Kafka官方网站),KTable为单词计数的实时状态,KStream为每次更新的单词,将更新状态输入到KTable,并推送至消息处理结果指定的Topic。

    下图(图片来自Kafka官方网站)展示了每输入一句单词,已经存在的单词会累计计数,同时KTable会更新不存在的单词。

本文属作者原创,转贴请声明!

© 著作权归作者所有

鬼谷半桃
粉丝 0
博文 3
码字总数 2959
作品 0
嘉兴
程序员
私信 提问
初探Kafka Streams

Kafka在0.10版本推出了Stream API,提供了对存储在Kafka内的数据进行流式处理和分析的能力。 本文将从流式计算出发,之后介绍Kafka Streams的特点,最后探究Kafka Streams的架构。 什么是流式...

丞一
2018/01/06
0
0
【kafka】Centos7安装kafka

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/kisscatforever/article/details/86091136 一、前言 前一段时间在通知系统中,用到了kafka,刚开始的时候是通...

AresCarry
01/08
0
0
深入剖析 Redis5.0 全新数据结构 Streams(消息队列的新选择)

点击上方“芋道源码”,选择“置顶公众号” 技术文章第一时间送达! 来源:阿飞的博客 Redis 5.0 全新的数据类型:streams,官方把它定义为:以更抽象的方式建模日志的数据结构。Redis的str...

芋道源码
2018/10/24
0
0
Kafka Streams 剖析

1.概述   Kafka Streams 是一个用来处理流式数据的库,属于Java类库,它并不是一个流处理框架,和Storm,Spark Streaming这类流处理框架是明显不一样的。那这样一个库是做什么的,能应用到...

smartloli
2017/09/14
0
0
Apache Kafka 0.11.0.0 发布,分布式消息发布订阅系统

Apache Kafka 0.11.0.0 发布了。kafka 是一种高吞吐量的分布式发布订阅消息系统。 New Feature [KAFKA-3487] - KIP-146: Support per-connector/per-task classloaders in Connect [KAFKA-42......

君枫
2017/06/29
1K
5

没有更多内容

加载失败,请刷新页面

加载更多

OpenStack 简介和几种安装方式总结

OpenStack :是一个由NASA和Rackspace合作研发并发起的,以Apache许可证授权的自由软件和开放源代码项目。项目目标是提供实施简单、可大规模扩展、丰富、标准统一的云计算管理平台。OpenSta...

小海bug
昨天
6
0
DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
昨天
6
0
数据库中间件MyCat

什么是MyCat? 查看官网的介绍是这样说的 一个彻底开源的,面向企业应用开发的大数据库集群 支持事务、ACID、可以替代MySQL的加强版数据库 一个可以视为MySQL集群的企业级数据库,用来替代昂贵...

沉浮_
昨天
7
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
昨天
9
0
常用物流快递单号查询接口种类及对接方法

目前快递查询接口有两种方式可以对接,一是和顺丰、圆通、中通、天天、韵达、德邦这些快递公司一一对接接口,二是和快递鸟这样第三方集成接口一次性对接多家常用快递。第一种耗费时间长,但是...

程序的小猿
昨天
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部