文档章节

Kafka0.11发布--流式计算新玩法

moyiguke
 moyiguke
发布于 2017/08/19 23:17
字数 784
阅读 446
收藏 1

    Kafka streams的相关中文资料非常少,笔者希望借该代码讲述一下自己对kafka streams API的用法。

    kafka streams从0.10.0开始引入,现在已经更新到0.11.0。首先它的使用成本非常低廉,仅需在代码中依赖streams lib,编写计算逻辑,启动APP即可。其次它的负载均衡也非常简单暴力,增加或者减少运行实例就可以动态调整,无需人工干预。最后还有一个大杀器(0.11开始支持),提供 Exactly-once消息传递特性,它包含了producer幂等性,不会重复发送消息到broker;consumer exactly once,不会重复消费也不会丢失。在运算失败的时候,重启运算实例即可恢复。

    下面用demo讲解streams api用法。

    WordCount:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, textLinesTopic);

KStream<String, Long> wordCounts = textLines
      .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
      .map((key, word) -> new KeyValue<>(word, word))
      .groupByKey().count("counts").toStream();

wordCounts.to(stringSerde, longSerde, countTopic);

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

        这段非常简短的代码包括了consumer顶阅主题并消费、统计词频、producer写入到另一个Topic。跟踪代码可以发现flatMapValues,map函数本质上是处理单元processor,在函数调用时,API会创建特定的processor加入到拓扑中。

        这种消费单个主题进行运算的方式可以做一些日志的统计分析,例如网站的UV,PV等。如果需要处理更复杂的业务,那么关联操作不可避免。同样的,kafka streams 提供了join函数。

        

        KStreamBuilder streamBuilder = new KStreamBuilder();
        String userStore = "user_store";
        String driverStore = "driver_Store";
        KTable<String, UserOrder> userOrderKTable = streamBuilder.table(Serdes.String(),
                SerdeFactory.serdeFrom(UserOrder.class), TOPIC_USER_ORER, userStore);
        KTable<String, DriverOrder> driverOrderKTable= streamBuilder.table(Serdes.String(),
                SerdeFactory.serdeFrom(DriverOrder.class), TOPIC_DRIVER_ORDER, driverStore);

        userOrderKTable.leftJoin(driverOrderKTable,
                (userOrder,driverOrder)->join(userOrder,driverOrder))
                .toStream()
                .map((k,v)->new KeyValue<>(k,v))
                .to(Serdes.String(),SerdeFactory.serdeFrom(Travel.class),TOPIC_TRAVEL);

         join的语法本质上是join by partition and key。为了得到正确的Join结果,两个不同的topic需要再同一个运行实例中被消费到。假设,Topic1 和Topic2各有4个partition,有两个实例在运行,于是一个task仅消费Topic1和Topic2的两个,这样就需要保证,topic1中的两个partition的key值在另一个topic中能被找到。

       上面的逻辑看起来非常绕口,在实际开发的过程中,我们仅需保证两个topic拥有相同数量的partition,并且producer采用同样的Paritioner。如果该条件不满足,需要通过through函数完成。            


    KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, 
         StreamPartitioner<K, V> partitioner, String topic);

           假设在join过程中,我们需要最新的数据做聚合。kafka streams 提供了windowed函数,在时间窗口内,后续的记录会覆盖同一个Key的记录。窗口结束后,会触发后续的计算逻辑得到正确的结果。

      KTable<Windowed<String>, MultiUserOrder> userOrderKTable = streamBuilder.stream(Serdes.String(),
                SerdeFactory.serdeFrom(UserOrder.class),
                TOPIC_USER_ORER, userStore)
                .groupByKey()
                .aggregate(
                        new MultiUserOrder(), (k, v, map) -> {
                                map.setOrderId(k);
                                map.getOrders().add(v);
                                return map;
                        },
                        TimeWindows.of(6 * 1000),
                        SerdeFactory.serdeFrom(MultiUserOrder.class), userAggStore);


        KTable<Windowed<String>, MultiDriverOrder> driverOrderKTable = streamBuilder.stream(Serdes.String(),
                SerdeFactory.serdeFrom(DriverOrder.class),
                TOPIC_DRIVER_ORDER, driverStore)
                .groupByKey()
                .aggregate(new MultiDriverOrder(), (k, v, map) -> {
                            map.setOrderId(k);
                            map.getOrders().add(v);
                            return map;
                        }, TimeWindows.of(6 * 1000),
                        SerdeFactory.serdeFrom(MultiDriverOrder.class), driverAggStore);

        userOrderKTable.leftJoin(driverOrderKTable,
                (multiUserOrder,multiDriverOrder)->join(multiUserOrder,multiDriverOrder))
                .toStream()
                .map((k,v)->new KeyValue<>(k.key(),v))
                .to(Serdes.String(),SerdeFactory.serdeFrom(Travel.class),TOPIC_TRAVEL);

        

 

 

 

    

© 著作权归作者所有

moyiguke
粉丝 21
博文 16
码字总数 22706
作品 0
杭州
程序员
私信 提问
加载中

评论(0)

2016 | 大数据平台类产品资讯汇总

InfoSphere Streams 平台支持流数据的实时处理,支持不断更新持续查询的结果,可在移动的数据流中检测洞察。 InfoSphere Streams——实时大数据分析平台 Streams V4.2新特性:支持使用 Pyth...

勿忘初心321
2016/11/25
37
0
Kafka kSQL sql查询

背景 kafka早期作为一个日志消息系统,很受运维欢迎的,配合ELK玩起来很happy,在kafka慢慢的转向流式平台的过程中,开发也慢慢介入了,一些业务系统也开始和kafka对接起来了,也还是很受大家...

osc_uwpj27el
2019/09/09
26
0
Java 11 正式发布,这 8 个逆天新特性教你写出更牛逼的代码

美国时间 09 月 25 日,Oralce 正式发布了 Java 11,这是据 Java 8 以后支持的首个长期版本。 为什么说是长期版本,看下面的官方发布的支持路线图表。 可以看出 Java 8 扩展支持到 2025 年,...

Java技术栈
2018/09/27
0
0
效率提升50倍,轻松处理大数据

阿里云流计算正式上线——新用户免费试用一个月 日前,阿里云宣布流计算(Aliyun StreamCompute,Powered by Blink)正式发布商业化版本。阿里云流计算是运行在阿里云平台上的流式大数据分析平...

若有-若无
2018/06/21
0
0
收藏 | 100+篇大数据学习资讯,带你玩转大数据分析!

深度解析如何挑选适合自己的Hadoop平台 什么是Hadoop,怎样学习Hadoop 分布式文件系统HDFS解析 Hadoop开发人员基础课程之初识MapReduce HBase基础知识,面向列的实时分布式数据库 完全分布式...

勿忘初心321
2016/11/22
56
0

没有更多内容

加载失败,请刷新页面

加载更多

直接显示StackOverflow的答题日期, 增加评论区回复的时间显示 ,修改时间显示到小时分。

// ==UserScript==// @name 直接显示StackOverflow的答题日期, 增加评论区回复的时间显示 ,修改时间显示到小时分。// @namespace http://tampermonkey.net/// @version ...

FalconChen
今天
36
0
Shader笔记_005 纹理

纹理最初的目的就是使用一张图片来控制模型的外观,通过纹理映射技术 我们可以把一张图粘贴在物体表面,逐纹素的控制模型的颜色。 通常美术建模的时候也会在软件里利用纹理展开技术把纹理展开成...

STONE-CITY
今天
12
0
iOS MVVM 与RAC结合使用

MVVM配合 RAC 更能发挥的淋漓尽致。 我们把 MVVM 第一篇的例子 KVO 的事件 替换成 配合RAC 框架使用, OC的话直接导入 : pod 'ReactiveObjC' Swift 直接用 RXSwift就可以。 把 ViewModel里加...

T型人才追梦者
今天
22
1
OSChina 周一乱弹 —— 影响心情的三座大山

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @薛定谔的兄弟 :分享洛神有语创建的歌单「我喜欢的音乐」: 《浮生(inst.)》- 忘乡 / 墨凡悦 手机党少年们想听歌,请使劲儿戳(这里) @凝小紫...

小小编辑
今天
66
1
Unity中头发渲染

头发与普通PBR 材质最大的区别是 头发是各项异性的高光, 参考实现主要为下面文章 http://web.engr.oregonstate.edu/~mjb/cs519/Projects/Papers/HairRendering.pdf 头发包含 2个高光,以及高...

liyong2
今天
20
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部