文档章节

Kafka0.11发布--流式计算新玩法

moyiguke
 moyiguke
发布于 2017/08/19 23:17
字数 784
阅读 16
收藏 1
点赞 0
评论 0

    Kafka streams的相关中文资料非常少,笔者希望借该代码讲述一下自己对kafka streams API的用法。

    kafka streams从0.10.0开始引入,现在已经更新到0.11.0。首先它的使用成本非常低廉,仅需在代码中依赖streams lib,编写计算逻辑,启动APP即可。其次它的负载均衡也非常简单暴力,增加或者减少运行实例就可以动态调整,无需人工干预。最后还有一个大杀器(0.11开始支持),提供 Exactly-once消息传递特性,它包含了producer幂等性,不会重复发送消息到broker;consumer exactly once,不会重复消费也不会丢失。在运算失败的时候,重启运算实例即可恢复。

    下面用demo讲解streams api用法。

    WordCount:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, textLinesTopic);

KStream<String, Long> wordCounts = textLines
      .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
      .map((key, word) -> new KeyValue<>(word, word))
      .groupByKey().count("counts").toStream();

wordCounts.to(stringSerde, longSerde, countTopic);

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

        这段非常简短的代码包括了consumer顶阅主题并消费、统计词频、producer写入到另一个Topic。跟踪代码可以发现flatMapValues,map函数本质上是处理单元processor,在函数调用时,API会创建特定的processor加入到拓扑中。

        这种消费单个主题进行运算的方式可以做一些日志的统计分析,例如网站的UV,PV等。如果需要处理更复杂的业务,那么关联操作不可避免。同样的,kafka streams 提供了join函数。

        

        KStreamBuilder streamBuilder = new KStreamBuilder();
        String userStore = "user_store";
        String driverStore = "driver_Store";
        KTable<String, UserOrder> userOrderKTable = streamBuilder.table(Serdes.String(),
                SerdeFactory.serdeFrom(UserOrder.class), TOPIC_USER_ORER, userStore);
        KTable<String, DriverOrder> driverOrderKTable= streamBuilder.table(Serdes.String(),
                SerdeFactory.serdeFrom(DriverOrder.class), TOPIC_DRIVER_ORDER, driverStore);

        userOrderKTable.leftJoin(driverOrderKTable,
                (userOrder,driverOrder)->join(userOrder,driverOrder))
                .toStream()
                .map((k,v)->new KeyValue<>(k,v))
                .to(Serdes.String(),SerdeFactory.serdeFrom(Travel.class),TOPIC_TRAVEL);

         join的语法本质上是join by partition and key。为了得到正确的Join结果,两个不同的topic需要再同一个运行实例中被消费到。假设,Topic1 和Topic2各有4个partition,有两个实例在运行,于是一个task仅消费Topic1和Topic2的两个,这样就需要保证,topic1中的两个partition的key值在另一个topic中能被找到。

       上面的逻辑看起来非常绕口,在实际开发的过程中,我们仅需保证两个topic拥有相同数量的partition,并且producer采用同样的Paritioner。如果该条件不满足,需要通过through函数完成。            


    KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, 
         StreamPartitioner<K, V> partitioner, String topic);

           假设在join过程中,我们需要最新的数据做聚合。kafka streams 提供了windowed函数,在时间窗口内,后续的记录会覆盖同一个Key的记录。窗口结束后,会触发后续的计算逻辑得到正确的结果。

      KTable<Windowed<String>, MultiUserOrder> userOrderKTable = streamBuilder.stream(Serdes.String(),
                SerdeFactory.serdeFrom(UserOrder.class),
                TOPIC_USER_ORER, userStore)
                .groupByKey()
                .aggregate(
                        new MultiUserOrder(), (k, v, map) -> {
                                map.setOrderId(k);
                                map.getOrders().add(v);
                                return map;
                        },
                        TimeWindows.of(6 * 1000),
                        SerdeFactory.serdeFrom(MultiUserOrder.class), userAggStore);


        KTable<Windowed<String>, MultiDriverOrder> driverOrderKTable = streamBuilder.stream(Serdes.String(),
                SerdeFactory.serdeFrom(DriverOrder.class),
                TOPIC_DRIVER_ORDER, driverStore)
                .groupByKey()
                .aggregate(new MultiDriverOrder(), (k, v, map) -> {
                            map.setOrderId(k);
                            map.getOrders().add(v);
                            return map;
                        }, TimeWindows.of(6 * 1000),
                        SerdeFactory.serdeFrom(MultiDriverOrder.class), driverAggStore);

        userOrderKTable.leftJoin(driverOrderKTable,
                (multiUserOrder,multiDriverOrder)->join(multiUserOrder,multiDriverOrder))
                .toStream()
                .map((k,v)->new KeyValue<>(k.key(),v))
                .to(Serdes.String(),SerdeFactory.serdeFrom(Travel.class),TOPIC_TRAVEL);

        

 

 

 

    

© 著作权归作者所有

共有 人打赏支持
moyiguke
粉丝 6
博文 9
码字总数 15701
作品 0
杭州
程序员
2016 | 大数据平台类产品资讯汇总

InfoSphere Streams 平台支持流数据的实时处理,支持不断更新持续查询的结果,可在移动的数据流中检测洞察。 InfoSphere Streams——实时大数据分析平台 Streams V4.2新特性:支持使用 Pyth...

勿忘初心321 ⋅ 2016/11/25 ⋅ 0

效率提升50倍,轻松处理大数据

阿里云流计算正式上线——新用户免费试用一个月 日前,阿里云宣布流计算(Aliyun StreamCompute,Powered by Blink)正式发布商业化版本。阿里云流计算是运行在阿里云平台上的流式大数据分析平...

若有-若无 ⋅ 06/21 ⋅ 0

收藏 | 100+篇大数据学习资讯,带你玩转大数据分析!

深度解析如何挑选适合自己的Hadoop平台 什么是Hadoop,怎样学习Hadoop 分布式文件系统HDFS解析 Hadoop开发人员基础课程之初识MapReduce HBase基础知识,面向列的实时分布式数据库 完全分布式...

勿忘初心321 ⋅ 2016/11/22 ⋅ 0

谷歌弃用 MapReduce, 推出替代品 Cloud Dataflow

谷歌在周三发布了 Cloud Dataflow,一个用来既可以使用流式处理又可以使用批处理模式的大数据分析服务。 这个消息是在旧金山举行的谷歌 I/O 大会上公布的。它帮助完成了搜索巨头的云计算格局...

oschina ⋅ 2014/06/27 ⋅ 19

Sloth:网易流计算服务化平台架构实践

网易经历了多年的发展,众多业务线沉淀了丰富的数据。大数据平台除了满足各业务线的数据存储、计算需求,还负责集团数据整合,提供全方位的大数据服务。 本文PPT重点分享网易大数据基于Flink...

b6ecl1k7bs8o ⋅ 2017/11/24 ⋅ 0

2017杭州云栖大会100位大咖视频+讲义全分享 大数据

摘要: “如果我看得更远一点的话,是因为我站在巨人的肩膀上。”2017杭州云栖大会资料全部整理完毕,首批100位大咖视频+讲义分享给大家。 杭州云栖大会是阿里集团一年一度的全生态科技盛会。...

qq_40954115 ⋅ 2017/11/06 ⋅ 0

1月12日云栖精选夜读:阿里云新推出 HiTSDB + IoT套件 物联网设备上云步入快车道

阿里云针对物联网企业遇到的5大痛点,提供了HiTSDB +IoT 套件的一体化解决方案,能够支持物联设备快速上云,高效设备管理,数据安全,低成本海量数据存储,实时掌握设备状态,快速发现数据价...

yq传送门 ⋅ 01/12 ⋅ 0

速度惊人!网易云信力推全球首套直播竞答解决方案

  近日,直播答题App掀起一轮热潮。几乎为零的参与门槛、邀请码式的病毒传播、简单粗暴的奖金诱惑以及闯关答题的紧张刺激,在这些因素的加持下,   近日,直播答题App掀起一轮热潮。几乎...

镁客网 ⋅ 01/08 ⋅ 0

Spark设计理念与基本架构

《深入理解Spark:核心思想与源码分析》一书前言的内容请看链接《深入理解SPARK:核心思想与源码分析》一书正式出版上市 《深入理解Spark:核心思想与源码分析》一书第一章的内容请看链接《第...

beliefer ⋅ 2016/01/22 ⋅ 0

2018年web新玩法

摘要:web更新的速度都能赶得上火车的速度,稍有不慎就会掉队,在这里6年前端狗带你分析一下2018年前端的一些新趋势,新变化。 一、typeScript TypeScript 是一种由微软开发的自由和开源的编...

技术金三胖 ⋅ 01/13 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

zblog2.3版本的asp系统是否可以超越卢松松博客的流量[图]

最近访问zblog官网,发现zlbog-asp2.3版本已经进入测试阶段了,虽然正式版还没有发布,想必也不久了。那么作为aps纵横江湖十多年的今天,blog2.2版本应该已经成熟了,为什么还要发布这个2.3...

原创小博客 ⋅ 今天 ⋅ 0

聊聊spring cloud的HystrixCircuitBreakerConfiguration

序 本文主要研究一下spring cloud的HystrixCircuitBreakerConfiguration HystrixCircuitBreakerConfiguration spring-cloud-netflix-core-2.0.0.RELEASE-sources.jar!/org/springframework/......

go4it ⋅ 今天 ⋅ 0

二分查找

二分查找,也称折半查找、二分搜索,是一种在有序数组中查找某一特定元素的搜索算法。搜素过程从数组的中间元素开始,如果中间元素正好是要查找的元素,则搜素过程结束;如果某一特定元素大于...

人觉非常君 ⋅ 今天 ⋅ 0

VS中使用X64汇编

需要注意的是,在X86项目中,可以使用__asm{}来嵌入汇编代码,但是在X64项目中,再也不能使用__asm{}来编写嵌入式汇编程序了,必须使用专门的.asm汇编文件来编写相应的汇编代码,然后在其它地...

simpower ⋅ 今天 ⋅ 0

ThreadPoolExecutor

ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, ......

4rnold ⋅ 昨天 ⋅ 0

Java正无穷大、负无穷大以及NaN

问题来源:用Java代码写了一个计算公式,包含除法和对数和取反,在页面上出现了-infinity,不知道这是什么问题,网上找答案才明白意思是负的无穷大。 思考:为什么会出现这种情况呢?这是哪里...

young_chen ⋅ 昨天 ⋅ 0

前台对中文编码,后台解码

前台:encodeURI(sbzt) 后台:String param = URLDecoder.decode(sbzt,"UTF-8");

west_coast ⋅ 昨天 ⋅ 0

实验楼—MySQL基础课程-挑战3实验报告

按照文档要求创建数据库 sudo sercice mysql startwget http://labfile.oss.aliyuncs.com/courses/9/createdb2.sqlvim /home/shiyanlou/createdb2.sql#查看下数据库代码 代码创建了grade......

zhangjin7 ⋅ 昨天 ⋅ 0

一起读书《深入浅出nodejs》-node模块机制

node 模块机制 前言 说到node,就不免得提到JavaScript。JavaScript自诞生以来,经历了工具类库、组件库、前端框架、前端应用的变迁。通过无数开发人员的努力,JavaScript不断被类聚和抽象,...

小草先森 ⋅ 昨天 ⋅ 0

Java桌球小游戏

其实算不上一个游戏,就是两张图片,不停的重画,改变ball图片的位置。一个左右直线碰撞的,一个有角度碰撞的。 左右直线碰撞 package com.bjsxt.test;import javax.swing.*;import j...

森林之下 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部