文档章节

Spark Streaming之窗口函数和状态转换函数

绝世武神
 绝世武神
发布于 2016/09/10 21:46
字数 1815
阅读 1371
收藏 0

流处理主要有3种应用场景:无状态操作、window操作、状态操作。

reduceByKeyAndWindow

import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._
import org.apache.spark.{SparkContext, SparkConf}

object ClickStream {
  def main (args: Array[String]){
    // 屏蔽不必要的日志显示在终端上
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

     //创建SparkConf对象,设置应用程序的名称,在程序运行的监控界面可以看到名称
    val conf = new SparkConf().setAppName("ClickStream").setMaster("local[*]")   
    val sc = new SparkContext(conf)
    
    //此处设置Batch Interval是在Spark Streaming中生成基本Job的时间单位,窗口和滑动时间间隔一定是该Batch Interval的整数倍
    val ssc = new StreamingContext(sc, Seconds(args(0).toLong))

    //由于用到了窗口函数,需要复用前面的RDD,必须checkpoint,注意复用的RDD之间是没有任何关系的
    ssc.checkpoint(args(1))

    val topics = Set("clickstream")    //所要获取数据在kafka上的主题
    val brokers = "yz4211.hadoop.data.sina.com.cn:19092,10.39.4.210:19092,yz4209.hadoop.data.sina.com.cn:19092,yz4208.hadoop.data.sina.com.cn:19092,yz4207.hadoop.data.sina.com.cn:19092,yz4206.hadoop.data.sina.com.cn:19092,10.39.4.214:19092,10.39.4.213:19092,10.39.4.220:19092,10.39.4.219:19092,10.39.4.218:19092,10.39.4.217:19092,10.39.4.216:19092,10.39.4.215:19092,yz4205.hadoop.data.sina.com.cn:19092,yz4204.hadoop.data.sina.com.cn:19092,yz4203.hadoop.data.sina.com.cn:19092,yz4202.hadoop.data.sina.com.cn:19092,10.39.4.212:19092,10.39.4.201:19092"
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    //val offset = "largest"    //values: smallest, largest ,控制读取最新的数据,还是旧的数据, 默认值为largest

    //从Spark1.3开始,我们能够使用如下方式高效地从kafka上获取数据
    val kvsTemp = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val kvs = kvsTemp.map(line => line._2)                 //第一部分是null为key,第二部分才是所需数据,为string类型

    //根据需求对流进来的数据进行清洗、转换等处理
    val data = kvs.map(_.split("\\t")).filter(_(53) == "finance").map(_(27)).map(_.split("\\?")(0)).filter(! _.contains("iframe")).map((_, 1))
    
    //滑动窗口长度为1小时,滑动间隔为10分钟,这会得到过去1小时内,url和pv的对应关系
    //val pvWindow = data.reduceByKeyAndWindow((v1: Int, v2: Int) => v1+v2, Minutes(60), Minutes(10))

     //滑动窗口长度为1小时,滑动间隔为10分钟,这同样会得到过去1小时内,url和pv的对应关系,只不过这是加新减旧,第一个参数加上新的,第2个参数,减去上一个batch的。和上一个版本的reduceByKeyAndWindow每次都会重新算相比(叠加方式),这种方式(增量方式)更加高效优雅
    val pvWindow = data.reduceByKeyAndWindow(_ + _, _ - _, Minutes(60), Minutes(10))   
    pvWindow.print()

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminat
    ssc.stop(true, true)    //优雅地结束
  }
}

友情链接1

countByValueAndWindow

countByValueAndWindow的源码如下所示:

  /**
   * Return a new DStream in which each RDD contains the count of distinct elements in
   * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
   * `numPartitions` not specified).
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   * @param numPartitions  number of partitions of each RDD in the new DStream.
   */
  def countByValueAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int = ssc.sc.defaultParallelism)
      (implicit ord: Ordering[T] = null)
      : DStream[(T, Long)] = ssc.withScope {
    this.map((_, 1L)).reduceByKeyAndWindow(
      (x: Long, y: Long) => x + y,
      (x: Long, y: Long) => x - y,
      windowDuration,
      slideDuration,
      numPartitions,
      (x: (T, Long)) => x._2 != 0L
    )
  }

reduceByWindow

reduceByWindow的源码如下所示:

/**
   * Return a new DStream in which each RDD has a single element generated by reducing all
   * elements in a sliding window over this DStream. However, the reduction is done incrementally
   * using the old window's reduced value :
   *  1. reduce the new values that entered the window (e.g., adding new counts)
   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
   *  This is more efficient than reduceByWindow without "inverse reduce" function.
   *  However, it is applicable to only "invertible reduce functions".
   * @param reduceFunc associative and commutative reduce function
   * @param invReduceFunc inverse reduce function; such that for all y, invertible x:
   *                      `invReduceFunc(reduceFunc(x, y), x) = y`
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      invReduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[T] = ssc.withScope {
      this.map((1, _))
          .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
          .map(_._2)
  }

countByWindow

countByWindow的源码如下所示:

  /**
   * Return a new DStream in which each RDD has a single element generated by counting the number
   * of elements in a sliding window over this DStream. Hash partitioning is used to generate
   * the RDDs with Spark's default number of partitions.
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   */
  def countByWindow(
      windowDuration: Duration,
      slideDuration: Duration): DStream[Long] = ssc.withScope {
    this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
  }

由此可见,countByValueAndWindow、reduceByWindow、countByWindow的底层实现都是“加新减旧”版本的reduceByKeyAndWindow。

上面,求出了每一小时窗口内的Url和Pv的对应关系,如果想求出相同的Url在上一个窗口的Pv和本次窗口的Pv的比值,那么这时侯updateStateByKey,mapWithState就粉墨登场了。由于updateStateByKey和mapWithState二者之间有10倍左右的性能差异。这里,只涉及mapWithState。

mapWithState

import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._
import org.apache.spark.{SparkContext, SparkConf}

object ClickStream {
  def main (args: Array[String]){
    // 屏蔽不必要的日志显示在终端上
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

     //创建SparkConf对象,设置应用程序的名称,在程序运行的监控界面可以看到名称
    val conf = new SparkConf().setAppName("ClickStream").setMaster("local[*]")   
    val sc = new SparkContext(conf)
    
    //此处设置Batch Interval是在Spark Streaming中生成基本Job的时间单位,窗口和滑动时间间隔一定是该Batch Interval的整数倍
    val ssc = new StreamingContext(sc, Seconds(args(0).toLong))

    //由于用到了窗口函数,需要复用前面的RDD,必须checkpoint,注意复用的RDD之间是没有任何关系的
    ssc.checkpoint(args(1))

    val topics = Set("clickstream")    //所要获取数据在kafka上的主题
    val brokers = "yz4211.hadoop.data.sina.com.cn:19092,10.39.4.210:19092,yz4209.hadoop.data.sina.com.cn:19092,yz4208.hadoop.data.sina.com.cn:19092,yz4207.hadoop.data.sina.com.cn:19092,yz4206.hadoop.data.sina.com.cn:19092,10.39.4.214:19092,10.39.4.213:19092,10.39.4.220:19092,10.39.4.219:19092,10.39.4.218:19092,10.39.4.217:19092,10.39.4.216:19092,10.39.4.215:19092,yz4205.hadoop.data.sina.com.cn:19092,yz4204.hadoop.data.sina.com.cn:19092,yz4203.hadoop.data.sina.com.cn:19092,yz4202.hadoop.data.sina.com.cn:19092,10.39.4.212:19092,10.39.4.201:19092"
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    //val offset = "largest"    //values: smallest, largest ,控制读取最新的数据,还是旧的数据, 默认值为largest

    //从Spark1.3开始,我们能够使用如下方式高效地从kafka上获取数据
    val kvsTemp = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val kvs = kvsTemp.map(line => line._2)                 //第一部分是null为key,第二部分才是所需数据,为string类型

    //根据需求对流进来的数据进行清洗、转换等处理
    val data = kvs.map(_.split("\\t")).filter(_(53) == "finance").map(_(27)).map(_.split("\\?")(0)).filter(! _.contains("iframe")).map((_, 1))
    
    //滑动窗口长度为1小时,滑动间隔为10分钟,这会得到过去1小时内,url和pv的对应关系
    //val pvWindow = data.reduceByKeyAndWindow((v1: Int, v2: Int) => v1+v2, Minutes(60), Minutes(10))

     //滑动窗口长度为1小时,滑动间隔为10分钟,这同样会得到过去1小时内,url和pv的对应关系,只不过这是加新减旧,第一个参数加上新的,第2个参数,减去上一个batch的。和上一个版本的reduceByKeyAndWindow每次都会重新算相比(叠加方式),这种方式(增量方式)更加高效优雅
    val pvWindow = data.reduceByKeyAndWindow(_ + _, _ - _, Minutes(60), Minutes(10))

    //key是K, value是新值,state是原始值(本batch之前的状态值)。这里你需要把state更新为新值
    val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
        val currentPV = value.getOrElse(0)
        val output = (key, currentPV, state.getOption().getOrElse(0))
        state.update(currentPV)
        output
      }

    //StateSpec只是一个包裹,实际操作仍然是定义的mappingFunc函数
    val urlPvs = pvWindow.mapWithState(StateSpec.function(mappingFunc))    //url,当前batch的PV,上一个batch的PV
    urlPvs.print()

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminat
    ssc.stop(true, true)    //优雅地结束
  }
}

友情链接1

友情链接2

© 著作权归作者所有

绝世武神
粉丝 20
博文 33
码字总数 48343
作品 0
海淀
程序员
私信 提问
Spark Streaming流式处理

Spark Streaming介绍 Spark Streaming概述 Spark Streaming makes it easy to build scalable fault-tolerant streaming applications. 它可以非常容易的构建一个可扩展、具有容错机制的流式......

jiFeng丶
2018/07/26
0
0
地铁译:Spark for python developers ---Spark流式数据处理

先研究一下不断改变的动态环境带来的挑战,在列出流处理应用的先决条件(如,Twitter的TCP Sockets连接)之后, 结合Spark, Kafka 和 Flume 把数据放入一个低延迟,高吞吐量,可缩放的处理流...

abel_cao
01/17
0
0
Spark2.40 Streaming编程指南

Spark Streaming编程指南 概观 一个快速的例子 基本概念 链接 初始化StreamingContext 离散流(DStreams) 输入DStreams和Receivers DStreams的转换 DStreams的输出操作 DataFrame和SQL操作 ...

天天爬网
2018/11/21
22
0
Spark Structured Streaming特性介绍

为帮助开发者更深入的了解这三个大数据开源技术及其实际应用场景,9月8日,InfoQ联合华为云举办了一场实时大数据Meetup,集结了来自Databricks、华为及美团点评的大咖级嘉宾前来分享。 作为S...

Spark
2018/09/21
0
0
Apache Spark 2.4 正式发布,重要功能详细介绍

美国时间 2018年11月08日 正式发布了。一如既往,为了继续实现 Spark 更快,更轻松,更智能的目标,Spark 2.4 带来了许多新功能,如下: 添加一种支持屏障模式(barrier mode)的调度器,以便...

Spark
2018/11/10
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
687
10
MongoDB系列-- SpringBoot 中对 MongoDB 的 基本操作

SpringBoot 中对 MongoDB 的 基本操作 Database 库的创建 首先 在MongoDB 操作客户端 Robo 3T 中 创建数据库: 增加用户User: 创建 Collections 集合(类似mysql 中的 表): 后面我们大部分都...

TcWong
今天
39
0
spring cloud

一、从面试题入手 1.1、什么事微服务 1.2、微服务之间如何独立通讯的 1.3、springCloud和Dubbo有哪些区别 1.通信机制:DUbbo基于RPC远程过程调用;微服务cloud基于http restFUL API 1.4、spr...

榴莲黑芝麻糊
今天
25
0
Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
74
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
72
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部