文档章节

Flink 实战 : 统计网站PV,UV

moyiguke
 moyiguke
发布于 2018/06/27 16:16
字数 900
阅读 5382
收藏 11

Flink 实战:统计网站PV,UV

PV,UV

  • PV(Page View) : 页面点击次数
  • UV(User View): 独立用户访问次数

假定需求如下,每间隔1分钟,统计过去5分钟的UV,PV。很容易想到,通过数据库的count,以及count distinct可以得出正确结果。在大数据量下,传统数据库或者HADOOP(hbase...)的count效率都不高。如果数据是增量的,那么流式计算往往能提供更高的吞吐和更低的延时。

接下来通过使用Flink实现这个功能,并借这个案例描述一些Flink的基本概念。如果对其他流式计算框架有所了解,可以发现许多东西是互通的。

Window

很容易理解,在这个案例中,我们需要在内存中缓存5分钟的数据,时间往前推移到一分钟的时候,统计一次,并且清理数据。

Flink提供了多种窗口,可以按需选择。

Event Time

考虑到网络的延迟和数据的乱序,不能简单的使用Flink的系统时间做统计。例如14:25分的数据可能在14:27分才到系统中,如果直接按Flink系统时间,即会影响14:20~14:25这段时间的计算结果,同时也会影响14:25~14:30的计算结果。

在Flink中,有以下三种时间特征,查看详细说明

  • Processing time:Operator处理数据的时间。
  • Event time : 事件发生时间。
  • Ingestion time:被Flink摄入的时间。

在统计PV UV时,我们需要根据用户访问的时间,所以使用Event Time。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

接着,我们需要告知Flink记录的真实时间timestamp,以及触发window计算的watermark。在Flink中通过实现接口AssignerWithPeriodicWatermarks来完成。

考虑到数据可能乱序,选择BoundedOutOfOrdernessTimestampExtractor:

      long MAX_EVENT_DELAY = 3500;
      BoundedOutOfOrdernessTimestampExtractor<String> assigner = new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(MAX_EVENT_DELAY)) {
            @Override
            public long extractTimestamp(String element) {
                VisitEvent visitEvent = null;
                try {
                    visitEvent = objectMapper.readValue(element, VisitEvent.class);
                    return visitEvent.getVisitTime();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return Instant.now().toEpochMilli();
            }
        };

上面的一些代码主要用来做时间的处理,真实的计算通过window来完成,代码如下。

        int[] arr = {0,2};
        FlinkKafkaConsumerBase<String> consumerWithEventTime = myConsumer.assignTimestampsAndWatermarks(assigner);
        TypeInformation<Tuple3<String, VisitEvent, String>> typeInformation = TypeInformation.of(new TypeHint<Tuple3<String, VisitEvent, String>>() {});
        DataStreamSource<String> dataStreamByEventTime = env.addSource(consumerWithEventTime);
        SingleOutputStreamOperator<UrlVisitBy> uvCounter = dataStreamByEventTime
                .map(str->objectMapper.readValue(str,VisitEvent.class))
                .map(visitEvent-> new Tuple3<>(visitEvent.getVisitUrl(), visitEvent,visitEvent.getVisitUserId()))
                .returns(typeInformation)
                .keyBy(arr)
                .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1),Time.hours(-8)))
                .allowedLateness(Time.minutes(1))
                .process(new ProcessWindowFunction<Tuple3<String, VisitEvent, String>, UrlVisitBy, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple3<String, VisitEvent, String>> elements, Collector<UrlVisitBy> out) throws Exception {
                        long count = 0;
                        Tuple2<String,String> tuple2 = null;
                        if (tuple instanceof Tuple2){
                            tuple2 = (Tuple2) tuple;
                        }
                        for (Tuple3<String, VisitEvent, String> element : elements) {
                            count++;
                        };
                        TimeWindow window = context.window();
                        out.collect(new UrlVisitBy(window.getStart(),window.getEnd(),tuple2.f0,count,tuple2.f1));
                    }
                });
        uvCounter.print();

建议使用returns(typeInformation)

由于JDK默认的编译器在编译过程中会擦除泛型信息,这样Flink在执行的时候无法获取足够的信息来推断真实类型,那么可能会碰到这样的错误“The generic type parameters of 'XXX' are missing”。

现在只有Eclipse JDT compiler在编译后可以保留足够的信息,但是它限制了开发者只能使用Eclipse编译以及调试。另外由于兼容性问题,Eclipse对Flink的支持并不友好。官方推荐使用Intelij idea。

为了摆脱编译器的限制,Flink采用了TypeInfomation告知Flink真实类型。

© 著作权归作者所有

moyiguke
粉丝 21
博文 16
码字总数 22706
作品 0
杭州
程序员
私信 提问
加载中

评论(3)

h
henry9308
你好,后续这个怎么落库呢,redis?对于落库可能产生重复该如何解决呢(比如某台机器挂掉,数据写入一半就被停止,恢复后从上一个点重新计算落库)
moyiguke
moyiguke 博主

引用来自“idoz”的评论

这样写的话是不是会反序列化两次,一次是visitEvent.getVisitTime(); 另一次是: .map(str->objectMapper.readValue(str,VisitEvent.class))
是的,将DataStream先转化成DataStream会更好一些。
idoz
idoz
这样写的话是不是会反序列化两次,一次是visitEvent.getVisitTime(); 另一次是: .map(str->objectMapper.readValue(str,VisitEvent.class))
Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

上周六在深圳分享了《Flink SQL 1.9.0 技术内幕和最佳实践》,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码。希望对于 Flink ...

阿里云官方博客
2019/09/04
242
0
Flink 1.4.2 版本踩过的坑

0x1 摘要 最近业务要实时统计半小时维度的UV、PV数据,经过调研准备用Flink时间窗来实现,主要是Flink对的支持,可以做到更精准的统计,由于第一次尝试使用Flink,所以过程中遇到不少问题,记...

joseph525
2018/06/07
0
0
Apache Flink实战(一) - 简介

1 功能 2 用户 国际 国内 3 特点 ◆ 结合Java、Scala两种语言 ◆ 从基础到实战 ◆ 系统学习Flink的核心知识 ◆ 快速完成从入门到上手企业开发的能力提升 4 安排 ◆ 初识Flink ◆ 编程模型及核...

javaedge
2019/04/26
0
0
Flink实战:消费Wikipedia实时消息

版权声明:欢迎转载,请注明出处,谢谢。 https://blog.csdn.net/boling_cavalry/article/details/85205622 关于Wikipedia Edit Stream Wikipedia Edit Stream是Flink官网提供的一个经典dem...

博陵精骑
2018/12/22
0
0
Apache Flink SQL概览

本篇核心目标是让大家概要了解一个完整的Apache Flink SQL Job的组成部分,以及Apache Flink SQL所提供的核心算子的语义,最后会应用Tumble Window编写一个End-to-End的页面访问的统计示例。...

Ververica
2019/04/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

分布式搜索引擎的架构是怎么设计的?

业内目前来说事实上的一个标准,就是分布式搜索引擎一般大家都用elasticsearch (1) es的分布式架构原理能说一下么(es是如何实现分布式的啊) 面试官心里分析 在搜索这块,lucene是最流行的搜...

Star永恒
20分钟前
97
0
集合转数组

ArrayList提供了一个将List转为数组的一个非常方便的方法toArray。toArray有两个重载的方法:   (1)list.toArray(); // 转换为Object类型的数组Object[] arr = list.toArray();for (...

那个猩猩很亮
23分钟前
78
0
使用canvas在前端添加水印

<!DOCTYPE html><html lang="en"><head> <meta charset="UTF-8"> <title>Title</title> <meta name="renderer" content="webkit"> <meta http-equiv="X-UA-Compatible" cont......

liyoungs
23分钟前
18
0
喜马拉雅音频下载工具 支持免费音频/VIP音频 文尾有彩蛋

最近流行喜马拉雅,趁你有会员,教你下载其资源 .如果没有会员可以找别人的会员获取cookie也可以.也没有别人的会员可以用此工具下载免费的音频也是不错的选择. 下载地址:https://www.90pan....

xiaogg
32分钟前
61
0
zookeeper安装(基础篇)

Zookeeper三种部署模式 1、Standalone模式,单节点部署,适合测试环境。2、伪cluster模式,单节点多实例部署,适合测试环境。3、Cluster模式,多节点集群部署,适合生产环境。 安装前准备...

丁小屁
36分钟前
35
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部