Spark Structured Streaming: A Study of Aggregation, Watermarks, and Window Operations Combined with Output Modes

Original post
2019/05/08 00:28


I. Background

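Our real-time data warehouse currently needs complex operations such as joining and aggregating multiple tables. To understand in depth how aggregation, output modes (complete, append, update), window operations (window) and watermarks work in Spark Structured Streaming, and to help the team use aggregation and windowing confidently in the real-time warehouse, I have put together the following summary. The examples are based on the basic word count: aggregation is the baseline, and it is combined in turn with the output modes, windows, and watermarks. The streaming source is a socket.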

For a detailed introduction to Structured Streaming, see the official Spark Structured Streaming documentation. This post only gives examples of aggregation, windows, and watermarks.

Everything here reflects my personal understanding and may contain mistakes. Corrections are very welcome.

II. Aggregation

This section covers pure aggregation, with no window and no watermark.

1. complete mode

The core code is as follows:

    import org.apache.spark.sql.functions._
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    // Split each line into words and add a constant column cn = 1 for counting
    val words = lines.as[String]
      .flatMap(_.split(" "))
      .selectExpr("value as word")
      .withColumn("cn", lit(1))
    // Sum cn per word and rename sum(cn) to wc
    val wordCounts = words.groupBy($"word")
      .agg("cn" -> "sum")
      .withColumnRenamed("sum(cn)", "wc")
    // Start the query and print the result to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", false)
      .start()
    query.awaitTermination()

[Figure: run output]

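As the output shows, complete mode outputs the full result table every time. This means the streaming query keeps the state of all batches, and each subsequent batch updates that existing state.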
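To make the complete-mode behaviour concrete, here is a small pure-Scala sketch that does not use Spark at all. It only models what the console sink shows: word counts are kept as state across micro-batches, and every batch emits the whole table. The object name `CompleteModeSketch` and the two sample batches are hypothetical; Spark's real state store is far more involved.

```scala
// Pure-Scala model of complete mode (not Spark code): state survives across
// micro-batches, and every batch emits the entire result table.
object CompleteModeSketch {
  type Counts = Map[String, Long]

  // Merge one micro-batch of input lines into the running word counts.
  def step(state: Counts, batch: Seq[String]): Counts =
    batch.flatMap(_.split(" ")).filter(_.nonEmpty).foldLeft(state) { (acc, w) =>
      acc.updated(w, acc.getOrElse(w, 0L) + 1L)
    }

  // Complete mode: the output of batch i is the full state after batch i.
  def run(batches: Seq[Seq[String]]): Seq[Counts] =
    batches.scanLeft(Map.empty[String, Long])(step).tail

  def main(args: Array[String]): Unit =
    run(Seq(Seq("hello world"), Seq("hello spark"))).zipWithIndex.foreach {
      case (out, i) => println(s"Batch $i: " + out.toSeq.sortBy(_._1).mkString(", "))
    }
}
```

In the second batch, `world` is printed again even though it did not appear in that batch's input, which is exactly the complete-mode behaviour described above.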

2. update mode

Core code:

    val query = wordCounts.writeStream
      .outputMode("update")
      .format("console")
      .option("truncate", false)
      .start()

[Figure: run output]

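As the output shows, update mode outputs only rows that are new or whose state changed in the current batch. The streaming query still keeps the state of all batches, and subsequent batches update that existing state.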
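The same kind of pure-Scala model can show the difference from complete mode. State still accumulates across batches, but a batch emits only the words whose counts it changed. `UpdateModeSketch` and the sample input are hypothetical, and this models the observable output, not Spark's implementation.

```scala
// Pure-Scala model of update mode (not Spark code): state survives across
// batches, but each batch emits only the keys that were added or changed.
object UpdateModeSketch {
  type Counts = Map[String, Long]

  // Returns (new state, rows emitted by this batch).
  def step(state: Counts, batch: Seq[String]): (Counts, Counts) = {
    val words = batch.flatMap(_.split(" ")).filter(_.nonEmpty)
    val next = words.foldLeft(state)((acc, w) => acc.updated(w, acc.getOrElse(w, 0L) + 1L))
    val touched = words.toSet // every word seen in this batch changed its count
    (next, next.filter { case (w, _) => touched(w) })
  }

  // Thread the state through all batches and collect what each batch emits.
  def run(batches: Seq[Seq[String]]): Seq[Counts] =
    batches.foldLeft((Map.empty[String, Long], Vector.empty[Counts])) {
      case ((state, outputs), batch) =>
        val (next, emitted) = step(state, batch)
        (next, outputs :+ emitted)
    }._2

  def main(args: Array[String]): Unit =
    run(Seq(Seq("hello world"), Seq("hello spark"))).zipWithIndex.foreach {
      case (out, i) => println(s"Batch $i: " + out.toSeq.sortBy(_._1).mkString(", "))
    }
}
```

Compared with the complete-mode model, `world` is no longer repeated in the second batch, because its count did not change there.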

3. append mode

This mode cannot be used for an aggregation without a watermark. Spark rejects such a query when it is started.

III. Aggregation + Window

All windows are 5 minutes long and slide every 3 minutes.
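Because the window (5 minutes) is longer than the slide (3 minutes), one timestamp can land in two windows. The sketch below computes the windows `[start, end)` that contain a timestamp. It assumes windows aligned to the epoch, which is what `window(col, "5 minutes", "3 minutes")` does with its default start offset of 0. It treats local times as UTC for simplicity, which gives the same alignment here because an offset such as UTC+8 is a whole multiple of 3 minutes. `SlidingWindowSketch` is a hypothetical helper, not a Spark API. For 23:43 it yields `[23:39, 23:44)` and `[23:42, 23:47)`, the same pair of windows that appears in Batch 0 of the update-mode output in this section. That batch's input time is not shown, so 23:43 is an inferred example.

```scala
import java.time.{Duration, LocalDateTime, ZoneOffset}

// Which sliding windows [start, end) contain a timestamp? Assumes epoch-aligned
// windows (startTime = 0) and treats local times as UTC for simplicity.
object SlidingWindowSketch {
  private def toSec(t: LocalDateTime): Long = t.toEpochSecond(ZoneOffset.UTC)
  private def fromSec(s: Long): LocalDateTime = LocalDateTime.ofEpochSecond(s, 0, ZoneOffset.UTC)

  def windowsFor(t: LocalDateTime, size: Duration, slide: Duration): List[(LocalDateTime, LocalDateTime)] = {
    val ts = toSec(t)
    val sizeS = size.getSeconds
    val slideS = slide.getSeconds
    // Latest window start that is <= t; earlier candidates lie one slide apart.
    val latestStart = ts - Math.floorMod(ts, slideS)
    Iterator.iterate(latestStart)(_ - slideS)
      .takeWhile(start => start + sizeS > ts) // the window must still contain t
      .map(start => (fromSec(start), fromSec(start + sizeS)))
      .toList
      .reverse
  }

  def main(args: Array[String]): Unit = {
    val t = LocalDateTime.of(2019, 4, 17, 23, 43)
    windowsFor(t, Duration.ofMinutes(5), Duration.ofMinutes(3)).foreach(println)
  }
}
```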

1. complete mode

Core code:

    import org.apache.spark.sql.functions._
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    // Split lines into words; stamp each word with the current (processing) time and cn = 1
    val words = lines.as[String]
      .flatMap(_.split(" "))
      .selectExpr("value as word")
      .withColumn("timestamp", current_timestamp())
      .withColumn("cn", lit(1))
    // Count per (5-minute window sliding every 3 minutes, word)
    val wordCounts = words.groupBy(
      window($"timestamp", "5 minutes", "3 minutes"),
      $"word"
    ).agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")
    // Start the query and print the result to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", false)
      .start()
    query.awaitTermination()

[Figure: run output]

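In complete mode, the data of all windows is output. Each window aggregates only the events that fall inside it, and a window that has received no events does not appear in the output.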

2. update mode

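In update mode, only the windows that the current events fall into are output, and within them only keys that are new or updated. All other windows are omitted.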
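Combining the sliding-window assignment with update-mode output gives a small model of this behaviour. State is keyed by (window start, word), and a batch emits only the keys it touched. `WindowedUpdateSketch` is hypothetical and works in minutes since midnight. The processing times 23:43 and 23:44 are assumptions chosen to be consistent with Batch 0 and Batch 1 of the output, whose input times are not shown. With those times the model emits the same window/word/count rows as those two batches.

```scala
// Pure-Scala model of windowed aggregation in update mode (not Spark code).
// Times are minutes since midnight; windows are 5 minutes long, sliding every 3.
object WindowedUpdateSketch {
  val Size = 5L
  val Slide = 3L

  // Start minutes of all windows [s, s + Size) that contain minute t.
  def windowStarts(t: Long): List[Long] = {
    val latest = t - Math.floorMod(t, Slide)
    Iterator.iterate(latest)(_ - Slide).takeWhile(_ + Size > t).toList.reverse
  }

  // Each batch is (processing minute, input line); returns the rows each batch emits.
  def run(batches: Seq[(Long, String)]): Seq[Map[(Long, String), Long]] = {
    var state = Map.empty[(Long, String), Long] // (window start, word) -> count
    batches.map { case (t, line) =>
      val keys = for {
        w <- line.split(" ").toList if w.nonEmpty
        s <- windowStarts(t)
      } yield (s, w)
      keys.foreach(k => state = state.updated(k, state.getOrElse(k, 0L) + 1L))
      // Update mode: emit only the (window, word) keys touched by this batch.
      state.filter { case (k, _) => keys.contains(k) }
    }
  }

  def fmt(m: Long): String = f"${m / 60}%02d:${m % 60}%02d"

  def main(args: Array[String]): Unit =
    run(Seq((23 * 60L + 43, "hello world"), (23 * 60L + 44, "hello spark"))).zipWithIndex.foreach {
      case (rows, i) =>
        println(s"Batch $i")
        rows.toSeq.sortBy(_._1).foreach { case ((s, w), c) =>
          println(s"  [${fmt(s)}, ${fmt(s + Size)}] $w $c")
        }
    }
}
```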

Output:

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|1  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|world|1  |
|[2019-04-17 23:39:00, 2019-04-17 23:44:00]|hello|1  |
|[2019-04-17 23:39:00, 2019-04-17 23:44:00]|world|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|2  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|spark|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|3  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|cat  |1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|4  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|sprk |1  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|hello|1  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|sprk |1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|5  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|hello|2  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|spark|2  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|spark|1  |
+------------------------------------------+-----+---+

3. append mode

Not supported without a watermark.

IV. Aggregation + Window + Watermark

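First, what is a watermark? A watermark is a mechanism for handling late data. "Late" means, for example, that data which should have arrived at 12:00 actually arrives at 12:30: how should it be handled? The watermark provides a time threshold. Put simply, the engine waits for the length of the watermark. Data older than the resulting cutoff is discarded without processing, and data newer than it is processed normally. In window operations the cutoff is the maximum event time seen so far minus the watermark delay. It is recomputed as each micro-batch completes, and for a windowed aggregation a window stops accepting updates once its end time is at or before the cutoff. I won't go into more detail here. The official Spark documentation covers this with detailed diagrams in the section "Handling Late Data and Watermarking".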
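The cutoff rule can be written down as a tiny model. It assumes, as in the 12:00/12:30 example, that an event stamped 12:30 has already been seen when the 12:00 record shows up, and uses the 3-minute delay from this section. `WatermarkSketch`, `Tracker` and `windowClosed` are hypothetical names for illustration, not Spark APIs. The rule they encode is that the watermark is the max event time minus the delay, and a window `[start, end)` is closed once `end <= watermark`. Times are minutes since midnight.

```scala
// Simplified model of the watermark rule (not Spark code).
// Times are minutes since midnight, e.g. 12 * 60 + 30 means 12:30.
object WatermarkSketch {
  final case class Tracker(delay: Long, maxEventTime: Option[Long] = None) {
    // Watermark = largest event time seen so far minus the allowed delay.
    def watermark: Option[Long] = maxEventTime.map(_ - delay)

    // Applied when a micro-batch finishes; the new value is used from the next batch on.
    def observe(eventTimes: Seq[Long]): Tracker =
      copy(maxEventTime = (maxEventTime.toSeq ++ eventTimes).reduceOption(_ max _))
  }

  // A window [start, end) counts as closed once end <= watermark:
  // its state may be dropped and late rows for it are ignored.
  def windowClosed(windowEnd: Long, t: Tracker): Boolean =
    t.watermark.exists(windowEnd <= _)

  def fmt(m: Long): String = f"${m / 60}%02d:${m % 60}%02d"

  def main(args: Array[String]): Unit = {
    val t = Tracker(delay = 3L).observe(Seq(12 * 60L + 30)) // an event stamped 12:30 arrives
    println(s"watermark: ${t.watermark.map(fmt).getOrElse("none")}")
    // A record stamped 12:00 belongs to [11:57, 12:02) and [12:00, 12:05); both are closed.
    for (end <- Seq(12 * 60L + 2, 12 * 60L + 5, 12 * 60L + 32))
      println(s"window ending ${fmt(end)} closed: ${windowClosed(end, t)}")
  }
}
```

Because the tracker only keeps the maximum event time, the watermark never moves backwards, even when an old record arrives.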

All windows are 5 minutes long and slide every 3 minutes, and the watermark delay is 3 minutes.

The full code is below. For the different modes, just change outputMode.

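    import java.sql.Timestamp
    import org.apache.spark.sql.functions._
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    // Each input line has the form "word,yyyy-MM-dd HH:mm:ss", e.g. "world,2019-04-19 09:07:00"
    val wordCounts = lines.as[String]
      .map { line =>
        val arr = line.split(",")
        // Timestamp.valueOf parses "yyyy-MM-dd HH:mm:ss" (used here in place of the
        // author's own helper DateUtils.parseSqlTimestamp)
        WordBean(arr(0).trim, Timestamp.valueOf(arr(1).trim))
      }
      .withColumn("cn", lit(1))
      // "timestamp" is the event-time column; tolerate data up to 3 minutes late
      .withWatermark("timestamp", "3 minutes")
      .groupBy(
        window($"timestamp", "5 minutes", "3 minutes"),
        $"word"
      )
      .agg("cn" -> "sum")
      .withColumnRenamed("sum(cn)", "wc")
    val query = wordCounts.writeStream
      .outputMode("update") // change to "complete" or "append" for the other modes
      .format("console")
      .option("truncate", false)
      .start()
    query.awaitTermination()

    // Define at top level (outside the method) so Spark can derive an Encoder for it
    case class WordBean(word: String, timestamp: Timestamp)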

1. complete mode

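In complete mode all windows are output and the watermark has no effect, because complete mode must keep all aggregate state. The demonstration is omitted.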

2. update mode

In update mode, expired state is cleared.

Output:

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|1  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 6
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:09:00, 2019-04-19 09:14:00]|spark|1  |
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|spark|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 9
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:12:00, 2019-04-19 09:17:00]|spark|1  |
|[2019-04-19 09:09:00, 2019-04-19 09:14:00]|spark|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 10
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 11
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|3  |
+------------------------------------------+-----+---+

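In update mode the output is essentially the same as with windows alone. What deserves attention is the effect of the watermark. In batch 6 we tried to update the window [2019-04-19 09:00:00, 2019-04-19 09:05:00], but the output was empty. This is because earlier batches had already pushed the maximum event time to 2019-04-19 09:08:00. With a 3-minute watermark the cutoff became 09:05:00, which is not earlier than that window's end, so the state of [2019-04-19 09:00:00, 2019-04-19 09:05:00] had already been cleared and could no longer be updated. Likewise, our attempt in batch 10 to update [2019-04-19 09:03:00, 2019-04-19 09:08:00] also failed.

Pay particular attention to the last record, "world,2019-04-19 09:07:00", in batch 11. It falls into both [2019-04-19 09:06:00, 2019-04-19 09:11:00] and [2019-04-19 09:03:00, 2019-04-19 09:08:00], yet only the [2019-04-19 09:06:00, 2019-04-19 09:11:00] window appears in the output. Batch 9 had already seen an event at 09:12 or later (its windows start at 09:09 and 09:12), so the watermark was past 09:08, and only the window ending at 09:11 could still be updated.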
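The two late updates above can be replayed with a small model of the rule "a window is closed once its end <= max event time - 3 minutes". Two inputs are assumptions because the exact batch inputs are not shown. The late record in batch 6 is taken to be stamped 09:02, a hypothetical time that falls only in [09:00, 09:05). The maximum event time before batch 11 is taken to be 09:12, which is consistent with batch 9, and any value in [09:12, 09:14) gives the same result. `UpdateWatermarkReplay` is a hypothetical name, and times are minutes since midnight.

```scala
// Replays the two late updates discussed above with a simplified model (not Spark code).
// Times are minutes since midnight; 5-minute windows, 3-minute slide, 3-minute watermark.
object UpdateWatermarkReplay {
  val Size = 5L
  val Slide = 3L
  val Delay = 3L

  def at(h: Long, m: Long): Long = h * 60 + m

  // Start minutes of all windows [s, s + Size) that contain minute t.
  def windowStarts(t: Long): List[Long] = {
    val latest = t - Math.floorMod(t, Slide)
    Iterator.iterate(latest)(_ - Slide).takeWhile(_ + Size > t).toList.reverse
  }

  // Windows of an event at time t that can still be updated, given the max event time
  // seen in earlier batches: a window is closed once its end <= maxEventTime - Delay.
  def updatableWindows(t: Long, maxEventTime: Long): List[Long] = {
    val watermark = maxEventTime - Delay
    windowStarts(t).filter(start => start + Size > watermark)
  }

  def main(args: Array[String]): Unit = {
    // Batch 6: max event time 09:08 (stated in the text); hypothetical late event at 09:02.
    println(updatableWindows(at(9, 2), at(9, 8)))
    // Batch 11: event at 09:07; max event time assumed to be 09:12.
    println(updatableWindows(at(9, 7), at(9, 12)))
  }
}
```

The first call leaves no window to update, which matches the empty batch 6. The second keeps only the window starting at 09:06, which matches batch 11.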


3. append mode

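Append mode is probably the one most worth examining. When you first use it, it can seem mysterious: why is there no output? The source has clearly sent many batches, yet nothing shows up, especially when the window and the watermark are both set to long durations. You may start to suspect that your own code is wrong, or even that the official example is wrong.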

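That is not the case. In append mode, a window is output only once it is certain that the window will not be updated any more, that is, once the watermark (maximum event time minus the delay) has reached the window's end time. So if you configure windows of several hours, be patient. If no new events arrive afterwards, you may as well shut down and go to sleep, because you will never see the result you are waiting for. A demonstration follows.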

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 08:57:00, 2019-04-19 09:02:00]|hello|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 4
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 5
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 7
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|2  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|1  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|spark|1  |
+------------------------------------------+-----+---+

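Note that when the sliding windows advance the watermark, a window that is known to be final is still not output immediately. It is only output, safely, when the next batch is triggered, because a watermark computed at the end of one batch only takes effect in the following batch. The window [2019-04-19 08:57:00, 2019-04-19 09:02:00] in the output above illustrates this: over the first three input batches the windows slid twice, and only after two batches had been fed into the second window was the data of [2019-04-19 08:57:00, 2019-04-19 09:02:00] output.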

In addition, once a window has been output its state is cleaned up, and it will not be updated afterwards.
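The append-mode rules described above can be summarized in a small model. Each batch uses the watermark computed when the previous batch ended. Rows for windows that are already closed are ignored, a closed window is emitted exactly once, and its state is then dropped. `AppendModeSketch` and the three input batches are hypothetical, and times are minutes since midnight. The window [08:57, 09:02) becomes final when the watermark reaches 09:03 at the end of batch 1, but it is only emitted in batch 2. The late 08:58 record in batch 2 does not change its count.

```scala
// Simplified model of windowed aggregation in append mode (not Spark code).
// Times are minutes since midnight; 5-minute windows, 3-minute slide, 3-minute watermark.
object AppendModeSketch {
  val Size = 5L
  val Slide = 3L
  val Delay = 3L

  // Start minutes of all windows [s, s + Size) that contain minute t.
  def windowStarts(t: Long): List[Long] = {
    val latest = t - Math.floorMod(t, Slide)
    Iterator.iterate(latest)(_ - Slide).takeWhile(_ + Size > t).toList.reverse
  }

  // Each batch is a list of (word, event minute); returns the rows emitted per batch.
  def run(batches: Seq[Seq[(String, Long)]]): Seq[Seq[((Long, String), Long)]] = {
    var state = Map.empty[(Long, String), Long] // (window start, word) -> count
    var maxEvent: Option[Long] = None           // max event time of earlier batches
    batches.map { batch =>
      // The watermark used by this batch was computed when the previous batch ended.
      val watermark = maxEvent.map(_ - Delay)
      def closed(start: Long): Boolean = watermark.exists(start + Size <= _)
      // Aggregate new rows, ignoring windows that are already closed (late data).
      for ((word, t) <- batch; s <- windowStarts(t) if !closed(s)) {
        val key = (s, word)
        state = state.updated(key, state.getOrElse(key, 0L) + 1L)
      }
      // Append mode: emit each closed window once, then drop its state.
      val (done, open) = state.partition { case ((s, _), _) => closed(s) }
      state = open
      maxEvent = (maxEvent.toSeq ++ batch.map(_._2)).reduceOption(_ max _)
      done.toSeq.sortBy(_._1)
    }
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Seq(("hello", 9 * 60L)),     // 09:00: windows starting 08:57 and 09:00
      Seq(("hello", 9 * 60L + 6)), // 09:06: watermark becomes 09:03 after this batch
      Seq(("hello", 8 * 60L + 58)) // 08:58: late, its windows are already closed
    )
    run(batches).zipWithIndex.foreach { case (rows, i) => println(s"Batch $i: $rows") }
  }
}
```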

V. Summary

The above covers how aggregation, windows, and watermarks combine with each of the output modes.
