Flink 实战 : 统计网站PV,UV

原创
2018/06/27 16:16
阅读数 2.5W

Flink 实战:统计网站PV,UV

PV,UV

  • PV(Page View) : 页面点击次数
  • UV(User View): 独立用户访问次数

假定需求如下,每间隔1分钟,统计过去5分钟的UV,PV。很容易想到,通过数据库的count,以及count distinct可以得出正确结果。在大数据量下,传统数据库或者HADOOP(hbase...)的count效率都不高。如果数据是增量的,那么流式计算往往能提供更高的吞吐和更低的延时。

接下来通过使用Flink实现这个功能,并借这个案例描述一些Flink的基本概念。如果对其他流式计算框架有所了解,可以发现许多东西是互通的。

Window

很容易理解,在这个案例中,我们需要在内存中缓存5分钟的数据,时间往前推移到一分钟的时候,统计一次,并且清理数据。

Flink提供了多种窗口,可以按需选择。

Event Time

考虑到网络的延迟和数据的乱序,不能简单的使用Flink的系统时间做统计。例如14:25分的数据可能在14:27分才到系统中,如果直接按Flink系统时间,即会影响14:20~14:25这段时间的计算结果,同时也会影响14:25~14:30的计算结果。

在Flink中,有以下三种时间特征,查看详细说明

  • Processing time:Operator处理数据的时间。
  • Event time : 事件发生时间。
  • Ingestion time:被Flink摄入的时间。

在统计PV UV时,我们需要根据用户访问的时间,所以使用Event Time。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

接着,我们需要告知Flink记录的真实时间timestamp,以及触发window计算的watermark。在Flink中通过实现接口AssignerWithPeriodicWatermarks来完成。

考虑到数据可能乱序,选择BoundedOutOfOrdernessTimestampExtractor:

      long MAX_EVENT_DELAY = 3500;
      BoundedOutOfOrdernessTimestampExtractor<String> assigner = new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(MAX_EVENT_DELAY)) {
            @Override
            public long extractTimestamp(String element) {
                VisitEvent visitEvent = null;
                try {
                    visitEvent = objectMapper.readValue(element, VisitEvent.class);
                    return visitEvent.getVisitTime();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return Instant.now().toEpochMilli();
            }
        };

上面的一些代码主要用来做时间的处理,真实的计算通过window来完成,代码如下。

        int[] arr = {0,2};
        FlinkKafkaConsumerBase<String> consumerWithEventTime = myConsumer.assignTimestampsAndWatermarks(assigner);
        TypeInformation<Tuple3<String, VisitEvent, String>> typeInformation = TypeInformation.of(new TypeHint<Tuple3<String, VisitEvent, String>>() {});
        DataStreamSource<String> dataStreamByEventTime = env.addSource(consumerWithEventTime);
        SingleOutputStreamOperator<UrlVisitBy> uvCounter = dataStreamByEventTime
                .map(str->objectMapper.readValue(str,VisitEvent.class))
                .map(visitEvent-> new Tuple3<>(visitEvent.getVisitUrl(), visitEvent,visitEvent.getVisitUserId()))
                .returns(typeInformation)
                .keyBy(arr)
                .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1),Time.hours(-8)))
                .allowedLateness(Time.minutes(1))
                .process(new ProcessWindowFunction<Tuple3<String, VisitEvent, String>, UrlVisitBy, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple3<String, VisitEvent, String>> elements, Collector<UrlVisitBy> out) throws Exception {
                        long count = 0;
                        Tuple2<String,String> tuple2 = null;
                        if (tuple instanceof Tuple2){
                            tuple2 = (Tuple2) tuple;
                        }
                        for (Tuple3<String, VisitEvent, String> element : elements) {
                            count++;
                        };
                        TimeWindow window = context.window();
                        out.collect(new UrlVisitBy(window.getStart(),window.getEnd(),tuple2.f0,count,tuple2.f1));
                    }
                });
        uvCounter.print();

建议使用returns(typeInformation)

由于JDK默认的编译器在编译过程中会擦除泛型信息,这样Flink在执行的时候无法获取足够的信息来推断真实类型,那么可能会碰到这样的错误“The generic type parameters of 'XXX' are missing”。

现在只有Eclipse JDT compiler在编译后可以保留足够的信息,但是它限制了开发者只能使用Eclipse编译以及调试。另外由于兼容性问题,Eclipse对Flink的支持并不友好。官方推荐使用Intelij idea。

为了摆脱编译器的限制,Flink采用了TypeInfomation告知Flink真实类型。

展开阅读全文
打赏
1
11 收藏
分享
加载中
你好,后续这个怎么落库呢,redis?对于落库可能产生重复该如何解决呢(比如某台机器挂掉,数据写入一半就被停止,恢复后从上一个点重新计算落库)
2019/05/29 17:20
回复
举报
moyiguke博主

引用来自“idoz”的评论

这样写的话是不是会反序列化两次,一次是visitEvent.getVisitTime(); 另一次是: .map(str->objectMapper.readValue(str,VisitEvent.class))
是的,将DataStream先转化成DataStream会更好一些。
2018/12/03 16:03
回复
举报
这样写的话是不是会反序列化两次,一次是visitEvent.getVisitTime(); 另一次是: .map(str->objectMapper.readValue(str,VisitEvent.class))
2018/11/30 17:59
回复
举报
更多评论
打赏
3 评论
11 收藏
1
分享
返回顶部
顶部