文档章节

Spark Structured Steaming 聚合、watermark 以及 window操作,结合输出模式的研究总结

ivan-Zhao
 ivan-Zhao
发布于 05/08 00:28
字数 2291
阅读 11
收藏 0

![TOC]

一、 背景

目前实时数仓需要对多张表进行关联聚合等复杂操作, 为了深度理解 Spark Structured Streaming 中聚合、输出模式(complete、append、update)、窗口操作(window)以及水印(watermark)相关操作;并能让团队熟练在实时数仓中使用聚合和窗口相关的操作,现做出以下总结。文中以基本的 wordcount 为例。以聚合为基准,与输出模式、窗口以及水印结合交叉。streaming 数据源为 socket。

关于Structured Streaming 详细介绍,可以查看Spark Structured Streaming官网,这里只针对聚合、窗口以及水印相关操作举例介绍。

文中纯属个人理解,不免有不当之处,万望给予指正,不胜感激。

二、 聚合

聚合只是纯粹的聚合操作,无窗口和水印操作。

1. complete 模式

核心代码如下:

 import spark.implicits._
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    // 数据行转 word
    val words = lines.as[String].flatMap(_.split(" ")).selectExpr("value as word")
    val wordCounts = words.groupBy(
      $"word"
    ).agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")
    // 开启 query,将数据输出到控制台
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", false)
      .start()
    query.awaitTermination()

运行结果

可以看出,complete 模式下,会把所有的数据都输出,也就意味着,streaming 会保存所有 batch 的状态数据,并根据后续 batch 旧的状态数据做更新。

2. update 模式

核心代码

val query = wordCounts.writeStream
      .outputMode("update")
      .format("console")
      .option("truncate", false)
      .start()

运行结果

可以看出,update 模式下, 只会将新增以及状态改变的数据输出,同样streaming 会保存所有 batch 的状态数据,并根据后续 batch 旧的状态数据做更新。

3. append 模式

聚合状态下,不使用水印操作,无法使用该模式。

三、聚合+窗口

窗口大小统一为5分钟,每三分钟滑动一次。

1. complete 模式

核心代码

import spark.implicits._
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    // 数据行转 word
    val words = lines.as[String].flatMap(_.split(" ")).selectExpr("value as word")
      .withColumn("timestamp", current_timestamp())//时间戳
			.withColumn("cn", lit(1))
    val wordCounts = words.groupBy(
      window($"timestamp", "5 minutes", "3 minutes"),
      $"word"
    ).agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")
    // 开启 query,将数据输出到控制台
    val query = wordCounts.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()
    query.awaitTermination()

运行结果

complete 模式下,会对所有窗口数据输出,只会对自己窗口内的事件做聚合,如果某个窗口没有到达事件,输出忽略该窗口。

2. update 模式

update 模式下,只会输出当前事件落到的窗口,并且是新增或者更新的 key 才会被输出,其余窗口不被输出。

运行结果

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|1  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|world|1  |
|[2019-04-17 23:39:00, 2019-04-17 23:44:00]|hello|1  |
|[2019-04-17 23:39:00, 2019-04-17 23:44:00]|world|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|2  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|spark|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|3  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|cat  |1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|4  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|sprk |1  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|hello|1  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|sprk |1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|5  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|hello|2  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|spark|2  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|spark|1  |
+------------------------------------------+-----+---+

3. append 模式

没有添加watermark,不支持。

三、聚合+窗口+水印

首先说下 watermark 水印是什么,watermark 是为了针对处理迟到数据的机制,所谓迟到,举个例子,12:00这个时刻该到的数据,在12:30到达,该如何处理?watermark 提供了一个时间阈值,简单的理解可以说是等待一个 watermark 时间,在"此时间"前的将被抛弃不处理,在"此时间"后的将被正常处理。在 window 操作中,是以当前最大事件时间为基础,减去 watermark 阈值得到"此时间",即每次窗口滑动都会重新计算。这里不做详细介绍,详细介绍请查看 Spark 官网,有详细的图表介绍:Handle late data and watermark

窗口大小统一为5分钟,每三分钟滑动一次,水印长度三分钟。

具体代码,不同模式修改 outputMode 即可。

 		import spark.implicits._
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    val words = lines.as[String]
      .selectExpr("value")
      .map(line => {
        val arr = line.mkString.split(",")
        WordBean(arr(0), DateUtils.parseSqlTimestamp(arr(1)))
      })
      .withColumn("cn", lit(1))
      .withWatermark("timestamp", "3 minutes")
      .groupBy(
        window($"timestamp", "5 minutes", "3 minutes"),
        $"word"
      )
      .agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")
      val query = words.writeStream
            .outputMode("update")
            .format("console")
            .option("truncate", false)
            .start()
      query.awaitTermination()

		case class WordBean(word: String, timestamp: Timestamp)

1. complete 模式

complete 模式下输出所有窗口,watermark 无效。省略演示。

2. update 模式

在update 模式下,过期数据将被清除。

运行结果

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|1  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 6
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:09:00, 2019-04-19 09:14:00]|spark|1  |
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|spark|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 9
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:12:00, 2019-04-19 09:17:00]|spark|1  |
|[2019-04-19 09:09:00, 2019-04-19 09:14:00]|spark|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 10
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+
-------------------------------------------
Batch: 11
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|3  |
+------------------------------------------+-----+---+

update 模式下,基本和 window 下输出结果一样,值得注意的是水印产生的效果,观察结果,batch 6时我们试图更新[2019-04-19 09:00:00, 2019-04-19 09:05:00]窗口的值,但是得到确实空的输出,这是因为我们在这之前的 batch 将事件最大值更新到了2019-04-19 09:08:00,水印长度三分钟,此时[2019-04-19 09:00:00, 2019-04-19 09:05:00]窗口的状态已经被清除,将得不到更新;同样在 batch10时,我们想更新[2019-04-19 09:03:00, 2019-04-19 09:08:00]的值,也无法实现;尤其观察batch 11 最后一条数据"world,2019-04-19 09:07:00",数据可以落在[2019-04-19 09:06:00, 2019-04-19 09:11:00]和[2019-04-19 09:03:00, 2019-04-19 09:08:00],但是结果只输出了[2019-04-19 09:06:00, 2019-04-19 09:11:00]窗口的数据。

在update 模式下,过期数据将被清除。

3. append 模式

append 模式应该最值得考究的模式了,初用时可能会觉得神奇,为什么没有数据?明明 source 端发了很多批次,就是看不到数据,尤其是程序中窗口和水印长度都设置很长的话,可能不得不怀疑是不是自己代码写错了?官网案例写错了?

实则不然,update 模式下的机制是:必须在确认某个窗口不在更新时才会输出该窗口,即过了水印长度时间。所以设置了数小时窗口的就耐心等待结果吧,如果后续没有新的事件,那么你可以关机睡觉觉了,因为你永远也看不到你想看到的结果。下面做演示。

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 08:57:00, 2019-04-19 09:02:00]|hello|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 4
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 5
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 7
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|2  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|1  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|spark|1  |
+------------------------------------------+-----+---+

注意,在滑动窗口跟新水印时,即便是确定前面窗口不再更新也不会立即输出,需要等到下一个 batch 触发,才会安全的输出,上述输出中[2019-04-19 08:57:00, 2019-04-19 09:02:00]|,输入的前三个 batch,窗口滑动了两次,在第二个窗口输入两个 batch,[2019-04-19 08:57:00, 2019-04-19 09:02:00]|的数据才被输出;

另外,窗口一旦被输出,将会被清理,后续不会被更新。

四、总结

以上,即是聚合、窗口、水印以及各个输出模式结合过程。

© 著作权归作者所有

ivan-Zhao
粉丝 10
博文 33
码字总数 29110
作品 0
深圳
程序员
私信 提问
Structured Streaming:Apache Spark中处理实时数据的声明式API

引言 随着实时数据的日渐普及,企业需要流式计算系统满足可扩展、易用以及易整合进业务系统。Structured Streaming是一个高度抽象的API基于Spark Streaming的经验。Structured Streaming在两...

阿猫阿狗Hakuna
2018/08/30
0
0
Spark VS Flink---流处理中的Time&Window

在流处理系统中,通常使用基于ProcessTime ,EventTime,Ingestion Time的消息处理模式。 相关含义可参考Flink 对于流消息的时间介绍 现实中的流消息延迟是必然的: 理想状态下,消息从产生要...

WestC
04/06
0
0
现代流式计算的基石:Google DataFlow

0. 引言 今天这篇继续讲流式计算。毫无疑问,Apache Flink 和 Apache Spark (Structured Streaming)现在是实时流计算领域的两个最火热的话题了。那么为什么要介绍 Google Dataflow 呢?Str...

尼不要逗了
01/24
0
0
Structured Streaming VS Flink

Flink是标准的实时处理引擎,而且Spark的两个模块Spark Streaming和Structured Streaming都是基于微批处理的,不过现在Spark Streaming已经非常稳定基本都没有更新了,然后重点移到spark sql...

雷飙
01/20
0
0
干货 | Spark Streaming 和 Flink 详细对比

本文从编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面对比 Spark Stream 与 Flink,希望对有实时处理需求业务的企业端用户在框架选型有所启发。本文篇...

xiaomin0322
05/29
53
0

没有更多内容

加载失败,请刷新页面

加载更多

centos 查看删除旧内核

1、查看系统中安装的内核 $ yum list installed | grep kernel 2、删除系统中旧内核 $ yum install yum-utils$ package-cleanup --oldkernels --count=2...

编程老陆
8分钟前
6
0
ES6

ES6:不改变原理的基础上,让API变得更简单 一、let:代替var用于声明变量 1、var的缺点: (1)声明提前 (2)没有块级作用域 2、let的优点: (1)组织了申明提前 (2)让let所在的块({}),...

wytao1995
今天
3
0
kubernetes 环境搭建 —— minikube

创建集群 minikube start 搭建好 k8s 集群后,可以查看集群的状态以及部署应用。主要用到的是 k8s 的 api,这通常需借助于 kutectl 命令行工具 基本操作 kubectl versionkubectl cluster-i...

lemos
今天
9
0
关于js混淆与反混淆还原操作

使用js的混淆加密,其目的是为了保护我们的前端代码逻辑,对应一些搞技术吃饭的公司来说,为了防止被竞争对手抓取或使用自己的代码,就会考虑如何加密,或者混淆js来达到代码保护。 1、为什么...

开源oschina
今天
11
0
用盛金公式解三次方程(ansi c版)

/* cc cubic.c -lm gcc cubic.c -lm Shengjin's Formulas Univariate cubic equation aX ^ 3 + bX ^ 2 + cX + d = 0, (a, b, c, d < R, and a!= 0). Multiple root disc......

wangxuwei
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部