文档章节

Structured Streaming教程(1) —— 基本概念与使用

董黎明
 董黎明
发布于 12/07 11:28
字数 1199
阅读 4
收藏 0

近年来,大数据的计算引擎越来越受到关注,spark作为最受欢迎的大数据计算框架,也在不断的学习和完善中。在Spark2.x中,新开放了一个基于DataFrame的无下限的流式处理组件——Structured Streaming,它也是本系列的主角,废话不多说,进入正题吧!

很多初学者,对大数据的概念都是模糊不清的,大数据是什么,能做什么,学的时候,该按照什么线路去学习,学完往哪方面发展,想深入了解,想学习的同学欢迎加入大数据学习qq群:199427210,有大量干货(零基础以及进阶的经典实战)分享给大家,并且有清华大学毕业的资深大数据讲师给大家免费授课,给大家分享目前国内最完整的大数据高端实战实用学习流程体系

简单介绍

在有过1.6的streaming和2.x的streaming开发体验之后,再来使用Structured Streaming会有一种完全不同的体验,尤其是在代码设计上。

在过去使用streaming时,我们很容易的理解为一次处理是当前batch的所有数据,只要针对这波数据进行各种处理即可。如果要做一些类似pv uv的统计,那就得借助有状态的state的DStream,或者借助一些分布式缓存系统,如Redis、Alluxio都能实现。需要关注的就是尽量快速的处理完当前的batch数据,以及7*24小时的运行即可。

可以看到想要去做一些类似Group by的操作,Streaming是非常不便的。Structured Streaming则完美的解决了这个问题。

在Structured Streaming中,把源源不断到来的数据通过固定的模式“追加”或者“更新”到了上面无下限的DataFrame中。剩余的工作则跟普通的DataFrame一样,可以去map、filter,也可以去groupby().count()。甚至还可以把流处理的dataframe跟其他的“静态”DataFrame进行join。另外,还提供了基于window时间的流式处理。总之,Structured Streaming提供了快速、可扩展、高可用、高可靠的流式处理。

小栗子

在大数据开发中,Word Count就是基本的演示示例,所以这里也模仿官网的例子,做一下演示。

直接看一下完整的例子:

package xingoo.sstreaming

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {


    val spark = SparkSession
      .builder
      .master("local")
      .appName("StructuredNetworkWordCount")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    // 创建DataFrame
    // Create DataFrame representing the stream of input lines from connection to localhost:9999
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))

    // Generate running word count
    val wordCounts = words.groupBy("value").count()

    // Start running the query that prints the running counts to the console
    // 三种模式:
    // 1 complete 所有内容都输出
    // 2 append   新增的行才输出
    // 3 update   更新的行才输出
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

效果就是在控制台输入nc -lk 9999,然后输入一大堆的字符,控制台就输出了对应的结果:

然后来详细看一下代码:

val spark = SparkSession
      .builder
      .master("local")
      .appName("StructuredNetworkWordCount")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._

上面就不用太多解释了吧,创建一个本地的sparkSession,设置日志的级别为WARN,要不控制台太乱。然后引入spark sql必要的方法(如果没有import spark.implicits._,基本类型是无法直接转化成DataFrame的)。

val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

创建了一个Socket连接的DataStream,并通过load()方法获取当前批次的DataFrame。

val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

先把DataFrame转成单列的DataSet,然后通过空格切分每一行,再根据value做groupby,并统计个数。

val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

调用DataFrame的writeStream方法,转换成输出流,设置模式为"complete",指定输出对象为控制台"console",然后调用start()方法启动计算。并返回queryStreaming,进行控制。

这里的outputmode和format都会后续详细介绍。

query.awaitTermination()

通过QueryStreaming的对象,调用awaitTermination阻塞主线程。程序就可以不断循环调用了。

观察一下Spark UI,可以发现程序稳定的在运行~

总结

这就是一个最基本的wordcount的例子,想象一下,如果没有Structured Streaming,想要统计全局的wordcount,还是很费劲的(即便使用streaming的state,其实也不是那么好用的)。

© 著作权归作者所有

共有 人打赏支持
董黎明
粉丝 17
博文 128
码字总数 305406
作品 0
深圳
私信 提问
Structured Streaming教程(2) —— 常用输入与输出

上篇了解了一些基本的Structured Streaming的概念,知道了Structured Streaming其实是一个无下界的无限递增的DataFrame。基于这个DataFrame,我们可以做一些基本的select、map、filter操作,...

董黎明
12/08
0
0
Spark的那些事(一)

Spark是一个快速的集群化的实时计算系统。支持Java, Scala, Python 和R语言的高级API。 一 Spark生态: 1111.png 支持Spark Sql用于sql和结构化数据查询处理;支持MLlib用于机器学习;支持G...

假文艺的真码农
01/02
0
0
Structured Streaming教程(3) —— 与Kafka的集成

Structured Streaming最主要的生产环境应用场景就是配合kafka做实时处理,不过在Strucured Streaming中kafka的版本要求相对搞一些,只支持0.10及以上的版本。就在前一个月,我们才从0.9升级到...

董黎明
12/09
0
0
Structured Streaming中的Join

Structured Streaming支持流式Dataset/DataFrame和静态Dataset/DataFrame的Join,也支持和另一个流式Dataset/DataFrame的Join。Join的结果是增量生成的,类似于流的聚合。这节中我们将会探索...

阿猫阿狗Hakuna
11/30
0
0
Spark 2.0 技术预览版: Easier, Faster, and Smarter

For the past few months, we have been busy working on the next major release of the big data open source software we love: Apache Spark 2.0. Since Spark 1.0 came out two years a......

sunbow0
2016/05/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring异常之Druid – unregister mbean error

Spring异常之Druid – unregister mbean error 2017年04月19日 12:13:42 Dr.Zhu 阅读数:6688 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zt_fucker/arti...

linjin200
15分钟前
1
0
微信小程序webview问题

今天在改小程序的时候在使用webview的时候切换webview的地址行为,出现了诡异的情况。 默认querystring里会有多个?符号,使用的时候被微信给截取了,导致程序找不到改页面。 而且querystri...

钟元OSS
19分钟前
1
0
Spark2.0操作Hbase

读写Hbase的方法,这里是通过Spark的RDD来操作的方法,通过Hbase API的方式是另一种,这里不涉及。 首先配置pom,添加hbase依赖,一般Spark平台不包含hbase的jar包,所以这些依赖不添加<scop...

守望者之父
19分钟前
1
0
【转】你会用哪些JavaScript循环遍历

总结JavaScript中的循环遍历定义一个数组和对象 const arr = ['a', 'b', 'c', 'd', 'e', 'f'];const obj = {a: 1,b: 2,c: 3,d: 4} for() 经常用来遍历数组元素 遍历值为数组元素...

kaixin_code
22分钟前
1
0
mysql的锁

MySQL的锁 全局锁:对数据库实例加锁 MySQL提供了一个加全局读锁的方法:Flush tables with read lock(FTWRL) 使用场景:做全库逻辑备份。 官方自带的逻辑备份工具mysqldump,使用时带上参数...

灯下草虫鸣_
25分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部