文档章节

Apache Flink 零基础入门(十九)Flink windows和Time操作

Vincent-Duan
 Vincent-Duan
发布于 09/12 16:45
字数 2928
阅读 16
收藏 1

Time类型

在Flink中常用的Time类型:

  • 处理时间
  • 摄取时间
  • 事件时间

处理时间

是上图中,最后一步的处理时间,表示服务器中执行相关操作的处理时间。例如一些算子操作时间,在服务器上面的时间。

如果你以处理时间作为流处理的时间处理方式,那么所有的基于时间的操作都会使用服务器的时间,来运行相关的操作。例如:一个小时的处理时间窗口,将会包含一个小时内的到达服务器内的所有数据。例如应用程序9:15am开始执行,第一个小时的时间处理窗口会包含所有的9:15到10:15内的事件数据,下一个时间窗口是10:15到11:15内的所有数据。

处理时间是最简单的事件处理方式,并不需要流和机器的时间协调。因此提供了高性能和低延迟。然而在分布式环境中或者异步环境中处理时间并不能够提供准确性(也就是说在处理数据时,由于网络的抖动在一个处理时间窗口中例如9:15到10:15,很大可能包括9:00的事件数据)。

事件时间

事件时间是每一个设备上每一个单独事件发生的时间例如手机登录APP的日志时间。这个时间就是这条数据记录的时间。每一条数据都有一个时间戳表示这条数据的事件发生时间。这个时间取决于每条数据,而并不会依赖于机器的时间。事件时间处理时必须指定如何获得Event Time watermarks(用来描述Event Time如何处理)。

按照事件时间处理数据,处理结果应该是完全一致,也就是说无论处理多少次结果都是一样的,这就是所谓的大数据处理的幂等性。 不管事件到达时间和事件是不是有序到达(在生产环境中,数据往往进入到服务器中的时间和顺序是不一定的,有可能先产生的数据后到达服务器,这取决于很多网络因素)

摄取时间

摄取时间表示某个事件数据进入到Flink的时间。在source操作中,每条记录都会得到source的当前时间戳,也就是接收到的数据自动会有一个摄取时间,也就是例如时间窗都是基于这个时间来处理的。

摄取时间是处于事件时间和处理时间之间。如上图所示。摄取时间是有成本的,但是却是结果可预测的。因为摄取时间使用了稳定的时间戳(在source端只会分配一次),每一条数据的时间戳都是固定的。并且同一摄取时间的数据有可能被分配到不同的处理时间窗口中。

Windows

Windows使我们处理无限数据流(源源不断的进来)的核心部件。Windows把我们的数据流拆成一个个的buckets。我们需要把算子作用到buckets上面去。

第一件事情就是需要指定我们的流数据是不是有key,有key和没有key对应的算子是完全不一样的。

Keyed windows

带keyby,会结合windows一起使用。输入的数据内容中的任意属性都可以作为一个key。在这个流上可以允许窗口多任务并行计算,每一个逻辑key都可以被独立计算,相同的key的数据会被发送到相同的并行任务中去处理。

Non-Keyed windows

通过使用windowAll来指定。原始的数据流不会被拆分成多个逻辑任务,所有窗口逻辑都是一个窗口任务来执行,所以并行度是1。

windows 生命周期

简而言之,当第一个元素到达对应的窗口时,一个windows就会被开始创建。当时间(不管是event时间还是processing时间)达到时间戳范围,就会移除窗口。另外,每一个窗口都有一个Trigger和window Functions,当数据到达窗口后,执行的函数就是window Functions,这个函数包含了对这个窗口内容的所有计算,当Trigger达到一定条件之后,就会触发。

Windows Assigners

在指定流数据是否带key之后,下一步就是定义窗口的分配器(windows assigner),windows assigner的职责是定义每一个传入的元素如何分配到窗口内。对于keyby使用window()方法,对于non-keyby使用windowAll()方法。

WindowAssigner is responsible for assigning each incoming element to one or more windows. 

 每个传入的数据分配给一个或多个窗口。

Flink内置的window assigner对于大多数场景来讲基本上是够用的(tumbling windows滚动窗口, sliding windows滑动窗口, session windows会话窗口 and global windows全局窗口)。也可以通过继承WindowAssigner来自定义一个window assigner。所有的内置window assigner(除了全局窗口)都是基于时间(处理时间或事件时间)来分配数据的。

基于时间的窗口有一个开始的timestamp(inclusive)和结束timestamp(exclusive)表示窗口的大小。

Flink中对于窗口的划分有两大类,第一大类是基于time(用的最多),第二大类是基于count。

Tumbling Windows 滚动窗口

滚动窗口分配器将分配每一个元素到一个指定大小的窗口,这种类型的窗口有一个固定的大小而且不会有重叠的。上面这张图就是随着时间流按照指定的时间间隔拆开。

简单实例代码:

Scala

object WindosApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.227.128", 9999)
    text.flatMap(_.split(",")).map((_,1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1)

    env.execute("WindosApp")
  }

}

上面的代码表示监听socket数据流,每隔5秒获取一次数据。timeWindow表示根据时间来划分窗口,(此外还有countWindow根据数量来划分窗口)。默认时间是processTime处理时间。

Java

public class JavaWindowApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length()>0){
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

        env.execute("JavaWindowApp");

    }
}

Sliding Windows滑动窗口

滑动窗口分配器分配每一个元素到一个固定大小的窗口,类似于滚动窗口,窗口大小可以通过配置进行修改,但是滑动窗口还有另外一个附加滑动参数控制滑动窗口什么时候启动,所以这个窗口是有可能重叠的。

上面图的意思是window1的窗口大小是10分钟,滑动大小是5分钟,也就是每隔5分钟产生一个窗口,这个窗口的大小是10分钟,这个窗口就是window2,然后window2又过5分钟产生一个窗口,窗口的大小是10分钟 window3,以此类推。所以滑动窗口处理的数据可能会有重叠。一个数据元素可能会在多个窗口中进行处理。

使用场景:每个半个小时统计前一个小时的TopN。

object WindosApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.227.128", 9999)
    text.flatMap(_.split(",")).map((_,1)).keyBy(0)
      //.timeWindow(Time.seconds(5)) # 滚动窗口
      .timeWindow(Time.seconds(10),Time.seconds(5))
      .sum(1).print().setParallelism(1)

    env.execute("WindosApp")
  }

}

每隔5秒统计近10秒的数据。所以当服务器端输入:

a,a,a,b,b,b
a,a,a,b,b,b
a,b,a,b,a,a

时,控制台会打印两遍结果:

(a,10)
(b,8)
(b,8)
(a,10)

Window Functions

在定义窗口分配器之后,就需要指定基于每一个窗口的计算方法了(在上面的例子中我们做了一个keyby sum操作)。window function会处理窗口中的每一个元素。window function包括如下几个:

  • ReduceFunction
  • AggregationFunction
  • FoldFunction
  • ProcessWindowFunction

ReduceFunction和AggregationFunction的执行效率更高,因为Flink会在数据到达每一个窗口时首先做一个增量聚合操作。ProcessWindowFunction拿到的是包含在窗口中的所有的元素以及附加信息一个Iterable,是一个全量聚合。因此ProcessWindowFunction的执行效率不高,因为Flink会缓存窗口中的所有数据。

ReduceFunction

input中的两个元素进行结合产生一个同样类型的输出。这里我们举例,通过传入的数据类型是数值类型来演示增量效果。

Scala

object WindowReduceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.227.128", 9999)
    text.flatMap(_.split(","))
      .map(x=>(1,x.toInt)) // 1,2,3,4,5 => (1,1) (1,2) (1,3) (1,4) (1,5)
      .keyBy(0) //因为key都是1, 所以所有的元素都到一个task去执行
      .timeWindow(Time.seconds(5)) // 滚动窗口
      .reduce((v1, v2) => { //// reduce函数作用在窗口之上,就可以完成窗口中的增量操作,不用等所有的数据到达之后进行一次性处理,而是数据两两处理
      println(v1 + "....." + v2)
      (v1._1, v1._2 + v2._2)
    })
      .print().setParallelism(1)
    env.execute("WindowReduceApp")
  }
}

服务器端输入:

1,2,3,4,5

控制台中输出如下:

(1,1).....(1,2)
(1,3).....(1,3)
(1,6).....(1,4)
(1,10).....(1,5)
(1,15)

reduce函数作用在窗口之上,就可以完成窗口中的增量操作,不用等所有的数据到达之后进行一次性处理,而是数据两两处理。

Java

public class JavaWindowReduceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length()>0){
                        out.collect(new Tuple2<Integer, Integer>(1, Integer.parseInt(token)));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
                        System.out.println("value1 = [" + value1 + "], value2 = [" + value2 + "]");
                        return new Tuple2<>(value1.f0,value1.f1 + value2.f1);
                    }
                })
                .print().setParallelism(1);

        env.execute("JavaWindowApp");

    }
}

输出结果如下:

value1 = [(1,1)], value2 = [(1,2)]
value1 = [(1,3)], value2 = [(1,3)]
value1 = [(1,6)], value2 = [(1,4)]
value1 = [(1,10)], value2 = [(1,5)]
(1,15)

ProcessWindowFunction

ProcessWindowFunction可以拿到一个Iterable,可以拿到窗口中的所有元素,并且有一个上下文对象可以访问时间和状态信息,比reducefunction可以提供更多的功能。但这样却可以带来资源和性能的开销,因为元素并不能通过增量的方式去聚合,相反,它需要把所有的数据都放在一个buffer中。

public class JavaWindowProcessApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length()>0){
                        out.collect(new Tuple2<Integer, Integer>(1, Integer.parseInt(token)));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5))
                .process(new ProcessWindowFunction<Tuple2<Integer, Integer>, Object, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<Object> out) throws Exception {
                        System.out.println("tuple = [" + tuple + "], context = [" + context + "], elements = [" + elements + "], out = [" + out + "]");
                        long count = 0;
                        for(Tuple2<Integer, Integer> in:elements) {
                            count++;
                        }
                        out.collect("window:" + context.window() + "count:" + count);
                    }
                })
                .print().setParallelism(1);

        env.execute("JavaWindowApp");

    }
}

服务器输入:

1,2,3,4,5

控制台输出:

tuple = [(1)], context = [org.apache.flink.streaming.runtime.operators.windowing.functions.InternalProcessWindowContext@40e09d6c], elements = [[(1,1), (1,2), (1,3), (1,4), (1,5)]], out = [org.apache.flink.streaming.api.operators.TimestampedCollector@4e277b00]
window:TimeWindow{start=1568542160000, end=1568542165000}count:5

只输出一次,说明是等待所有数据都拿到之后才进行处理。

使用场景:窗口内的数据进行排序。在Reduce中是无法进行排序的。

© 著作权归作者所有

Vincent-Duan

Vincent-Duan

粉丝 32
博文 430
码字总数 237992
作品 0
海淀
私信 提问
Flink 源码解析 —— TaskManager 处理 SubmitJob 的过程

TaskManager 处理 SubmitJob 的过程 https://t.zsxq.com/eu7mQZj 博客 1、Flink 从0到1学习 —— Apache Flink 介绍 2、Flink 从0到1学习 —— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程...

zhisheng_t
09/03
19
0
Flink 源码解析 —— 分析 Streaming WordCount 程序的执行过程

流处理 WordCount 程序 https://t.zsxq.com/qnMFEUJ 博客 1、Flink 从0到1学习 —— Apache Flink 介绍 2、Flink 从0到1学习 —— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门 3、F...

zhisheng_t
09/01
12
0
Flink 源码解析 —— Flink TaskManager 有什么作用?

TaskManager 有什么作用 https://t.zsxq.com/RZbu7yN 博客 1、Flink 从0到1学习 —— Apache Flink 介绍 2、Flink 从0到1学习 —— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门 3、F...

zhisheng_t
09/01
36
0
《从0到1学习Flink》目录

1、《从0到1学习Flink》—— Apache Flink 介绍 2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门 3、《从0到1学习Flink》—— Flink 配置文件详解 4、《从0...

火力全開
03/18
224
0
Apache Flink 1.3.1 发布,通用数据处理平台

Apache Flink 1.3.1 发布了,这是 Apache Flink 1.3 系列的首个 bug 修复版本。该版本包含 50 个修复程序和对 Flink 1.3.0 的小改进。下面的列表包括所有修补程序的详细列表。建议用户升级至...

局长
2017/06/24
820
3

没有更多内容

加载失败,请刷新页面

加载更多

还为PDF转Word抓狂?以下神器让你在职场倍受欢迎!

身在职场的你,是否一直在琢磨:如何能让自己在公司更受欢迎?如何才能在办公室里混个好人缘?如何在同事圈里留个好印象?其实,想要让自己成为受欢迎的人,只要让自己成为大家需要的人不就行...

foxit2world
29分钟前
7
0
AndServer+Service打造Android服务器实现so文件调用

so 文件调用 随着 Android 移动安全的高速发展,不管是为了执行效率还是程序的安全性等,关键代码下沉 native 层已成为基本操作。 native 层的开发就是通指的 JNI/NDK 开发,通过 JNI 可以实...

夜幕NightTeam
31分钟前
4
0
Docker下kafka学习三部曲之二:本地环境搭建

在上一章《 Docker下kafka学习,三部曲之一:极速体验kafka》中我们快速体验了kafka的消息分发和订阅功能,但是对环境搭建的印象仅仅是执行了几个命令和脚本,本章我们通过实战来学习如何编写...

程序员欣宸
31分钟前
4
0
萌新推荐!不再为Excel转换PDF发愁,Aspose.Cells for .NET一步到位!

Aspose.Cells for .NET(点击下载)是Excel电子表格编程API,可加快电子表格管理和处理任务,支持构建具有生成,修改,转换,呈现和打印电子表格功能的跨平台应用程序。 将Excel工作簿转换为...

mnrssj
32分钟前
6
0
对于绘画小白怎么画制服?该注意什么?

怎样制作学生服装?想必绘画初学者们常常会想的问题吧,不知道怎样才能画好人物的衣服,别着急,今日就在这儿讲一些关于如何绘画学生衣服校服的教程给我们!期望能够帮到你们! 轻便西装是不...

热爱画画的我
37分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部