Spark-Streaming 官方文档

原创
2017/03/02 09:31
阅读数 2.4K

概述

  我们先来看看Spark官方文档对于Spark Streaming的定义:Spark Streaming是对Spark核心API的扩展,并且是一个具有可伸缩,高吞吐,容错特性的实时数据流处理框架。它支持多种数据源作为数据,比如Kafka,Flume,Kinesis或者TCP sockets,并且可以使用RDD高等函数,比如map, reduce, join和window,来实现复杂的数据处理算法。最终,处理后的数据可以输出到文件系统。数据库或者实时图表中。实际上,你还可以使用Spark的机器学习包和图处理包来对数据进行处理。   事实上在Spark Streaming内部是这样工作的。Spark Streaming接收实时流数据,然后把数据切分成一个一个的数据分片。最后每个数据分片都会通过Spark引擎的处理生成最终的数据文件。   Spark Streaming提供了一个高等级的抽象,名为discretized stream或DStream,来表示一个连续的数据流。DStrem可以从一个输入流数据源创建,比如Kafka,Flume,Kinesis,或者通过对其他DStream应用一些高等操作来获得。实际上在Spark内部DStream就是一系列的RDD分片。

快速开始

  在真正开始接触Spark Streaming程序细节之前,我们先看一看一个Spark Streaming的简单例子长成什么样子,我们需要统计一下文本中单词的词频,数据来源为TCP Socket。接下来,我们看一下吧。

  首先我们引入了Spark Stream执行过程中需要用到的类和一些来自于StreamingContext的隐式装换。其中StreamingContext是Spark中所有流处理功能的入口点,我们使用两个本地线程,同时定义了数据分片时间间隔为1秒。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

  通过这个StreamingContext,我们可以从一个TCP数据源接收流式数据,在这里我们需要指定主机和端口。

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

  这个名为lines的DStream对象从数据服务器接收数据,DStream中德每一条数据都是一行文本,接下来我们使用空格来对数据进行分割,形成一个一个的单词。

// Split each line into words
val words = lines.flatMap(_.split(" "))

  我们应用的flatMap这个DStream操作会把每一行数据切分成一个一个的单词,然后把所有DStream中所有行切分成的单词形成一个新的words DStream。接下来我们要对words进行词频统计。

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

  这个words DStream通过一个map(一对一)操作生成一个新的(word, 1) DStream,接下来通过reduce方法,我们可以得到每个数据分片的词频数据,然后通过wordCounts.print()方法打印出来。

  注意,这个时候Spark Stream并没有启动,我们只是定义了DStream数据源以及要对DStream做什么操作。想要启动Spark Stream,我们需要执行StreamingContext的start方法。

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

  这个例子的完整代码你可以从SparkStreaming例子NetworkWordCount获取。

  下面我们来运行一下这个例子,保证你已经成功部署Spark环境。首先我们启动netcat向端口发送数据。

$ nc -lk 9999

  接下来启动NetworkWordCount实例,在Spark的根目录下运行下面命令。

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

  流处理程序接受9999端口发送的数据,每秒形成一个数据分片,然后进行处理,并打印。

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

基本概念

  接下来我们通过一个简单的例子来描述Spark Streaming的一些基本概念。

引入

  我们可以通过maven或者sbt来管理Spark Streaming的依赖包。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"

  如果你使用Kafka,Flume和Kinesis作为你的数据源,你必须引入相对应的依赖包park-streaming-xyz_2.11,因为Spark Streaming的核心API并不支持这些数据源。

数据源 依赖包
Kafka spark-streaming-kafka-0-8_2.11
Flume spark-streaming-flume_2.11
Kinesis spark-streaming-kinesis-asl_2.11 [Amazon Software License]

初始化StreamingContext

我们可以通过SparkConf对象来初始化一个StreamingContext的实例:

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

其中appName是你的应用名称,这个名字会显示在Spark的Web UI界面上。而master则是Spark,Mesos或者Yarn集群的URL地址,当然你也可以使用"local[*]"来启动本地模式运行。不过使用集群方式运行的话,我们一般不推荐使用setMaster方法来把设置写死在代码中,而是在spark-submit的时候使用--master参数来动态指定。但是在本地调试的时候,可以直接使用这种方式。(注意:我们在创建StreamingContext实例的时候,会自动创建一个SparkContext,我们可以使用ssc.sparkContext来访问)。

批次间隔时间要根据应用对延迟的需求和集群的资源来配置(在性能调优部分我们会详细介绍)。

我们当然也可以使用SparkContext来创建一个StreamingContext。

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

在context被初始化后,你还需要做如下几点:

  1. 通过input DStream来定义输入源
  2. 通过DStream的转换操作和输出操作来定义流数据处理规则
  3. 开始接受数据并处理:streamingContext.start()
  4. 等待流处理终止(或者出现异常退出):streamingContext.awaitTermination()
  5. 手动终止流处理:streamingContext.stop()

需要注意的点:

  • 一旦context被启动,任何新的流计算逻辑都无法被加入和设置
  • 一旦context被终止,它就不能重新启动了
  • 在同一时间,同一JVM中只能创建一个StreamingContext实例
  • 调用StreamingContext的stop方法会同时终止SparkContext,想要不终止SparkContext,你需要给stop方法设置一个参数来禁止SparkContext的终止。
  • 一个SparkContext可以创建多个StreamingContext,当然你需要在前一个StreamingContext终止之后再创建新的StreamingContext(当然你需要防止在这个过程中SparkContext被终止)。

Discretized Streams (DStreams)

Discretized Stream或者DStream是StreamingContext提供的最基本的抽象,它代表了一系列连续的数据片,包括从数据源哪里接收到的数据和通过各种转换操作得到的输出数据。在Spark内部,DStream就是一系列连续的RDD(弹性分布式数据集)。每一个DStream中的RDD包含了明确的时间间隔内的数据,如下图所示。 每一个我们定义在DStream上面的操作都是作用在底层的RDD上面。比如我们前面例子中的词频统计操作,flatMap操作作用在了lines DStream中德每一个RDD上面并生成了一个words DStream。这个过程正如下图所示。 这些底层RDD上面的操作,都会被Spark引擎所处理。而DStream操作则隐藏了大多数的细节,并提供给我们一个非常好用的高层次的API,对于DStream支持的操作,我们会在下一节进行讲解。

Input DStreams and Receivers

Input DStream是一个从流数据源接受流数据的DStream。在快速开始的例子中,lines就是一个Input DStream,它从netcat server接受流数据。任何一个Input DStream(除了file stream,后面我们会讲到) 都会关联一个Receiver对象,这个对象负责从流数据源接收流数据然后放到内存中等待处理。

Spark Streaming提供了两种类型的流数据源:

  • 基本数据源:由StreamingContext API直接提供的数据源,比如file stream和socket connections
  • 高级数据源:比如Kafka,Flame,Kinesis等数据源,这些需要额外的工具类库支持。在引入这一节中,我们提到过这些工具类。

下面的章节中,我们会依次对这些数据源进行说明。

注意,如果你想要在你的流处理程序中启用多个不同的数据源,那么你只需要创建多个Input DStream。这样就会有多个Receiver来同时接收不同的流数据。需要注意的是,Spark的work/executor是一个长时间运行的应用。因此,一定要记住一个Spark Streaming应用程序需要分配足够的核心来处理接收的数据,以及运行接收器。

要记住的点:

  • 我们在本地运行一个Spark Streaming应用程序千万不要使用"local"或者"local[1]"作为master URL。这一位置只有一个线程用来运行本地程序,如果你使用了一个带有Receiver的Input DStream(比如sockets,Kafka,Flame等),这个唯一的线程就会用来接收流数据,那么也就没有多余的线程来出来这些接收的数据了。因此,在本地运行的时候要使用"local[n]"(n > numbert of receivers)。
  • 在集群上运行Spark Streaming应用程序一样,我们至少要启动n个线程(n > numbert of receivers),否则不会有多余的线程来处理数据。

基本数据源

我们已经在前面的快速开始例子中展示了ssc.socketTextStream(...),它创建了一个从TCP端口接收文本数据的DStream。除此之外,Spark Streaming还为我们提供了一个创建从文件接收数据的DStream。

  • File Stream:从任何文件系统的文件中读取数据,并兼容HHDFS API。创建方式:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
/**
Spark Streaming会监控目录dataDirectory,然后处理任何在这个目录中创建的文件(嵌套目录不支持),注意:
1. 数据文件必须都有同样的格式
2. 数据文件必须通过moving或者renaming方式来创建在监控目录中。
3. 一旦数据文件移动到监控目录中就不能再改变了,持续追加的新数据并不能被识别。

对于普通的文本文件,有一个更简单的方法streamingContext.textFileStream(dataDirectory)。并且File Stream没有Receiver,也就不用占用处理核心了。
*/
  • 基于用户自定义Receivers的DStream:你可以根据自己的需求来定义Receiver,并依次来创建DStream,关于这一点我们之后会详细说到。
  • RDDs队列作为Stream:你可以通过一组RDDs来创建一个DStream,通过方法streamingContext.queueStream(queueOfRDDs)。这组RDDs中德每一个RDD都作为DStream的一个数据片,然后通过流处理程序进行相应的处理。

高级数据源

这一类的数据源需要依赖non-spark的包才能运行,其中一些更需要大量复杂的依赖包(比如Kafka和Flame)。因此为了最小化依赖冲出问题,创建这些数据源的方法都被移到了一些独立的包里,你在需要的时候可以引入到你的应用中。

这些高级数据源是不能直接在spark-shell中使用的,因此带有这些高级数据源的应用不能在spark-shell中进行试验。如果你真的需要再spark-shell中使用这些高级数据源,你需要下载这些依赖包然后把他们加入到类路径中。

高级数据源推荐使用版本:

  • Kafka: Spark Streaming 2.1.0 is compatible with Kafka broker versions 0.8.2.1 or higher. See the Kafka Integration Guide for more details.
  • Flume: Spark Streaming 2.1.0 is compatible with Flume 1.6.0. See the Flume Integration Guide for more details.
  • Kinesis: Spark Streaming 2.1.0 is compatible with Kinesis Client Library 1.2.1. See the Kinesis Integration Guide for more details.

数据接受器的可靠性

Spark Streaming中基于可靠新来说有两种数据源。一种数据源(Kafka和Flame)允许对传输的数据进行确认。系统从这些具有可靠性的数据源接受的数据都是确保正确的,它可以保证在任何错误的情况下数据都不丢失。

  • Reliable Receiver - 当接收数据的并存储在Spark中德时候,一个Reliable Receiver会发送确认信号给一个Reliable Source。
  • Unreliable Receiver - 一个Unreliable Receiver并不会在接收数据的时候发送确认信号给数据源。这用于不支持确认的数据源,或者是一个reliable sources,但是并不需要使用复杂的确认过程。

DStreams的转换操作

像RDD一样,转换操作允许Input DStream中的数据转换。DStream支持很多正常Spark RDD上的转换操作,我们来看看一看都有什么。

转换操作 说明
map(func) 对原DStream中的每一个数据元素执行func函数,结果组成一个新的DStream并返回。
flatMap(func) 类似于map,但是每一个输入项都可以映射到一个或者多个输出项。
filter(func) 对原DStream种的每一个数据元素执行func函数,所有返回值为true的元素组成一个新的DStream并返回。
repartition(numPartitions) 通过改变分区数来控制DStream的并行程度。
union(otherStream) 把两个DStream组合在一起新城一个新的DStream并返回。
count() 计算DStream中每一个RDD中的元素数量,并组成一个新的DStream并返回。
reduce(func) 对原DStream中的每元素使用func函数进行迭代计算(func函数接收两个参数返回一个值,参数值类型和返回值类型需要和DStream中元素类型一致),返回值作为下一次调用func函数的一个参数,另一个参数是DStream中的下一个元素。最终每个RDD的执行结果形成一系列的单元素RDD。
countByValue() 这个函数用于统计原DStream中每个元素出现的频次,比如作用在一个元素类型为K的DStream上,那么结果为一个元素类型为[K, Long]类型的新DStream。
reduceByKey(func, [numTasks]) 当作用在一个(k,v)对的DStream上时,形成一个新的(k,v)对DStream。新DStream种的元素是对原DStream种的元素按照k值进行聚合操作,其中func为聚合函数。
join(otherStream, [numTasks]) 对两个DStream按照key执行内连接,比如一个(k,v1)和一个(k,v2)两个DStream执行join,结果为(k,(v1,v2))的DStream。
transfor(func) 通过对原DStream中每一个RDD应用RDD-to-RDD函数,来创建一个新的DStream。这个可以在DStream中的任何RDD操作用使用。
updateStateByKey(func) 利用func函数来更新源DStream中每一个key的状态,来形成一个新的状态DStream。它可以用来管理DStream的任何状态数据。

UpdateStateByKey Operation

这个updateStateByKey操作允许你管理状态,并可以不断地使用新的状态信息来更新这个状态。要使用这个操作,你需要两步操作:

  1. 定义状态 - 这个状态可以使任意类型的
  2. 定义状态更新函数 - 声明一个函数来定义如何通过之前的状态和RDD数据集来更新新的状态。

Spark会把状态更新函数应用于每一个RDD中每一个Key对应数据集Seq,不论这个新的数据分片中是否有数据。如果状态更新函数返回None,那么这个key-value对就会被废弃了。

下面我们用一个例子来对这个状态更新函数进行说明。在这个例子中,我们会对文本输入流的数据进行词频统计。定义这个状态类型为Int,状态更新函数如下:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

这个函数作用于一个文本输入流上,会按word进行词频统计生成键值对(word,n)。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

在执行过程中,newValues是当前RDD中的(word,1)键值对中的value值组成的Seq,runningCount是之前统计的key对应的value状态值。我们将其累加,得到新的状态值。

Transform Operation

Treansform操作可以对允许任何RDD-to-RDD的装换函数,作用在DStream上。通过这个操作,我们可以利用一些DStream不支持但是RDD支持的API,可以让我们的程序更加灵活。举个例子,把DStream中的每一个数据集和另外的一个数据集做Join操作,这个DStream的join部没有对这个进行支持,所以我们需要使用transform操作,先把DStream转化为RDD然后在进行join。

下面的例子中,我们将进行一个数据清洗操作。首先我们要把输入数据和一份已经处理好的数据(来自于HDFS中)做join,然后再根据相应的规则进行过滤。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}

注意,这个操作会在每一个时间间隔内被执行一次,所以它允许你做实时的RDD操作。都可以在每一个批次间,改变比如改变分区数,广播变量等等。

Window Operation

Spark Streaming允许一个windowed computations,他可以让你在一个sliding window上应用一些transformactions操作。下面这张图,对这个进行了描述。 上图所示,我们定义了一个窗口,这个窗口会在源DStream上进行滑动,落在这个窗口内的源DStream数据会被合并为一个windowed DStream,而你可以对这个windowed DStream定义若干transformactions。图示的这个窗口大小为3个time units,并且每次滑动2个time units。所以我们在定义窗口的时候需要两个参数:

  • window length - 这个定义了窗口的大小
  • silding interval - 滑动时间间隔,也就是个窗口每次向后滑动的时间间隔

必须注意的是:这两个参数都必须是源DStream的batch interval的整数倍。

接下来,我们用一个例子来演示一下window operation。我们每隔10秒钟,统计一个过去30秒内从Input DStream中接受的单词词频。为了实现这个需求,我们需要在过去30秒的数据形成的(word,1)上执行reduceByKey操作,对应的window operation为reduceByKeyAndWindow。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

通用的window operations有如下这些,所有的operations函数都需要两个参数:window length和silding interval。

Transformation Meaning
window(windowLength,slideInterval) 通过定义窗口大小和滑动时间间隔,返回一个新的windowed DStream。
countByWindow(windowLength,slideInterval) 计算每一个时间窗口内的RDD中元素数量
reduceByWindow(func,windowLength,slideInterval) 对时间窗口内的DStream使用func函数进行迭代计算,func(x,y)函数的输入分别是上一次迭代的结果和DStream中的下一个数据元素。
reduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks]) 对时间窗口内的DStream数据(K,V)键值对,并使用func函数对按key分组后的数据进行聚合操作并返回一个新的(K,V)DStream。
reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval, [numTasks]) 这是对reduceByKeyAndWindow函数的改进,具有更好的性能。每当一个新的窗口到来的时候,与之前重合部分的DStream数据片并不重新计算,而是直接使用之前的计算结果,然后只对新的数据分片进行计算。然后把新的计算结果和之前已经计算的结果进行合并,得到新的窗口的聚合结果。
countByValueAndWindow(windowLength,slideInterval,[numTasks]) 对时间窗口内的每个元素统计出现的频次,形成一个新的(K,V)对DStream。

Join Operation

最后,我们来看看在Spark Streaming中做各种join operation是多么的简单。

Stream-stream joins

Stream可以非常简单地与其他Stream进行join。

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

在这里,每个batch interval内stream1产生的RDD都会和stream2产生的RDD做join,当然你还可以做leftOuterJoin,rightOuterJoin,,fullOuterJoin操作。同样这些操作还可以应用于windowed DStream。

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

Stream-dataset joins

这个已经在DStream.transform操作中提到过了。在这里我们重新演示一下如何把一个DStream和一个RDD数据集做join操作。

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

实际上,你可以动态改变需要关联的RDD,因为transform会在每一个window上执行,所以你可以在每次执行transform时重新加载RDD来达到动态关联的效果。

Output Operations on DStreams

Output operations允许把DStream中的数据推送到外部系统中,比如数据库或者文件系统。因为output operations是真正的让外部系统来消费DStream数据,所以他会触发DStream上定义的transformactions。我们来看一下DStream支持的output operations。

Output Operation Meaning
print() 在driver node上,打印DStream中每个批次数据的的前10条记录,这对与开发和调试非常有用。
saveAsTextFiles(prefix, [suffix]) 保存DStream中的数据为文本文件。每个批次数据生成的文件名由前后缀格式确定:"prefix-TIME_IN_MS[.suffix]"。
saveAsObjectFiles(prefix, [suffix]) 保存DStream中的数据为序列化后的Java对象。每个批次产生的数据文件命名规范为前后缀格式:"prefix-TIME_IN_MS[.suffix]"。
saveAsHadoopFiles(prefix, [suffix]) 保存DStream中的数据在Hadoop文件系统中。每个批次产生的数据文件命名规范为前后缀格式:"prefix-TIME_IN_MS[.suffix]"。
foreachRDD(func) 应用中最通常被使用的output operator,它可以使func函数作用在DStream中的每一个RDD上面。这个函数可以把RDD中的数据推送到外部系统,比如存储成文件或者写入到数据库中。注意,这个函数会运行在driver的线程上,然后把其内部定义的RDD actions作用在DStream RDD上。

使用foreachRDD的正确姿势

DStream.foreachRDD操作是非常强大的,他可以以最简单粗暴的方式把数据推送到外部系统上。为了能够正确的并且高效的使用这个output operation。我们需要避免以下常出现的错误使用方式。

通常情况下导出DStream数据到外部系统需要创建一个连接,使用这个连接来推送数据到外部系统。所以某些开发人员就会尝试在driver中创建一个连接,然后在worker中使用它。就像下面这样:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

这种方式是不对的,你在driver上定义了connection,然后把他们序列化后给到worder去使用。因为这些connection对象几乎不可能跨机器使用的。它会引起一个serializable exception。正确的做法是在worker上面创建connection。

可是下面的方法会为每一个DStream中的元素创建一个connection,效率是十分低下的。

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

通常情况下,创建connection对象是需要时间和资源开销的。因此为每一个数据元素创建和销毁connection必然带来了不必要的开销,降低了整个系统的吞吐量。因此,最好的解决方案是使用RDD.foreachPartition来为每一个数据分片创建一个connection对象,然后使用这个对象发送分片数据到外部系统,完成之后销毁这个对象。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

我们可以进一步优化代码,让多个RDD数据分片复用这些connection。我们用一个静态的connection pool来管理这些connection,让RDD数据分片复用这些connection来推送数据到外部系统。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

当然这个connection pool是一个懒加载的,connection会根据需求来创建,并会在一定时间空闲后被销毁,这样才是一个高效的系统。

最后还有几点要注意的:

  • DStream的output operation是懒加载的,就像RDD上面定义的actions一样。具体来说,DStream的output operation中定义的RDD actions会作用在接收的数据上。因此,如果你的应用程序没有任何output operation,或者output operation中没有定义任何RDD actions比如DStream.foreachRDD( )。那么你的应用就不会真正被执行,只是在不断地接收数据。
  • 另外,output operation在一个时间点只有一个可以被执行。执行顺序是按照被定义的顺序。

DataFrame and SQL Operations

你可以非常容易的在Spark Streaming应用中使用DataFrame and SQL。这需要你在使用StreamingContext的同时使用SparkContext来创建一个SaprkSession。更好的一点是,它可以从driver崩溃重启中恢复。通过定义一个延迟加载的单例SparkSessionSingleton来实现。通过下面的例子我们来演示一下,在这里我们修改了之前的例子,使用DataFrame and SQL来实现词频统计。我们把每一个RDD都转化为DataFrame,然后注册成临时表并进行SQL统计查询。

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// scalastyle:off println
package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
/**
 * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
 * network every second.
 *
 * Usage: SqlNetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount localhost 9999`
 */
object SqlNetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
    }
StreamingExamples.setStreamingLogLevels()
// Create the context with a 2 second batch size
val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
// Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
// Get the singleton instance of SparkSession
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
// Convert RDD[String] to RDD[case class] to DataFrame
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
// Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")
// Do word count on table using SQL and print it
val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
/** Case class for converting RDD to DataFrame */
case class Record(word: String)
/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
@transient  private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}
// scalastyle:on println

你可以在另一个线程上执行基于streaming data注册的表的SQL查询工作(也就是与StreamingContext异步执行)。你需要确认你已经设置了StreamingContext能够保留满足SQL查询的数据。由于StreamingContext并不知道异步的SQL查询什么时候完成,它有可能会在SQL查询完成之前删除流数据。举个例子,如果你要在最近一个批次上做查询,并且这个查询需要执行5分钟的时间,你需要调用streamingContext.remember(Minutes(5))来保存5分钟的流数据。

Caching / Persistence

与RDDs一样,Spark同样允许开发人员把Streaming Data持久化到内存中。使用DStream的persist()方法可以自动地把DStream中的每一个RDD都存储到内存中。这对于需要多次参与计算的数据是非常有意义的。对于基于window的计算(比如reduceByWindow和reduceByWindowAndKey),和基于state的计算(比如updateStateByKey)都是隐式调用的。因此由window操作产生的DStream会自动存储在内存中的,并不需要开发者调用persist()方法。

对于从网络接受的流数据,Spark默认的方式是保存两份副本来保证容错性。

与RDD不同的是,DStream的默认持久化方式是serialized in memory。

Checkpointing

因为SparkStreaming程序都是7*24小时在运行的,所以SparkStreaming程序必须要能够从非应用程序逻辑错误中恢复(比如系统原因,JVM崩溃等)。为了实现这一点,Spark Streaming需要checkpoint足够的信息来保证其可以在在问题出现后恢复。有两种类型的数据需要设置检查点。

  • Metadata checkpointing - 保存streaming computation信息在高可用型的存储系统上,比如HDFS。这可以用在从灾难中恢复Streaming应用的Driver。
    • Configuration - The configuration that was used to create the streaming application.
    • DStream operations - The set of DStream operations that define the streaming application.
    • Incomplete batches - Batches whose jobs are queued but have not completed yet.
  • Data checkpointing - 保存生成的RDD在可靠地存储系统中。这对于一些跨多个数据分片的带有状态的transformactions是非常有必要的。在这些transformactions中,生成的RDD依赖于之前数据分片的RDD,这会导致依赖关系链的长度随着时间不断增加。为了为恢复时候避免这种无边界的增长,我们必须把这些有状态的transformactions产生的RDD中间结果存储在可靠系统中,来切断这种依赖链。

何时启用checkpoint

有如下需求的时候,你需要设置checkpoint。

  • Usage of stateful transformations - 如果你在你的应用程序中使用了updateStateByKey和reduceByKeyAndWindow(带有可以函数),那么你就需要设置checkpoint路径保证定期的为RDD设置检查点。
  • Recovering from failures of the driver running the application-Metadata checkpoints用来恢复过程信息。

如何启用checkpoint

可以通过在一个容错的可靠型存储系统中设置checkpoint dictionary来启用checkpointing,这可以通过streamingContext.checkpoint(checkpointDirectory)来完成。这会允许你使用stateful transformations。另外的,如果你想要你的应用能够在driver崩溃后恢复,那么你需要重写你的Streaming Application使其具有如下两个行为。

  • 如果你是第一次启动系统,它会创建一个新的StreamingContext,配置好所有的streaming operation然后使用start开启流处理。
  • 当应用程序从灾难中恢复,系统会使用checkpoint来恢复StreamingContext。

要想启用checkpoint机制,我们需要通过StreamingContext.getOrCreate来实现。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

如果checkpointDirectory存在的话,系统会使用checkpoint来恢复StreamingContext。如果目录不存在(比如第一次运行的时候),系统会使用functionToCreateContext方法来创建StreamingContext,并且设置DStream。可以参考如下的完整例子。

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.streaming

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.util.{IntParam, LongAccumulator}

/**
 * Use this singleton to get or register a Broadcast variable.
 */
object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

/**
 * Use this singleton to get or register an Accumulator.
 */
object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

/**
 * Counts words in text encoded with UTF8 received from the network every second. This example also
 * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
 * they can be registered on driver failures.
 *
 * Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
 *   data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
 *   <output-file> file to which the word counts will be appended
 *
 * <checkpoint-directory> and <output-file> must be absolute paths
 *
 * To run this on your local machine, you need to first run a Netcat server
 *
 *      `$ nc -lk 9999`
 *
 * and run the example as
 *
 *      `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
 *              localhost 9999 ~/checkpoint/ ~/out`
 *
 * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
 * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
 * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
 * the checkpoint data.
 *
 * Refer to the online documentation for more details.
 */
object RecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
    : StreamingContext = {

    // If you do not see this printed, that means the StreamingContext has been loaded
    // from the new checkpoint
    println("Creating new context")
    val outputFile = new File(outputPath)
    if (outputFile.exists()) outputFile.delete()
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDirectory)

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      // Get or register the blacklist Broadcast
      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
      // Get or register the droppedWordsCounter Accumulator
      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
      // Use blacklist to drop words and use droppedWordsCounter to count them
      val counts = rdd.filter { case (word, count) =>
        if (blacklist.value.contains(word)) {
          droppedWordsCounter.add(count)
          false
        } else {
          true
        }
      }.collect().mkString("[", ", ", "]")
      val output = "Counts at time " + time + " " + counts
      println(output)
      println("Dropped " + droppedWordsCounter.value + " word(s) totally")
      println("Appending to " + outputFile.getAbsolutePath)
      Files.append(output + "\n", outputFile, Charset.defaultCharset())
    }
    ssc
  }

  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
      System.err.println(
        """
          |Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
          |     <output-file>. <hostname> and <port> describe the TCP server that Spark
          |     Streaming would connect to receive data. <checkpoint-directory> directory to
          |     HDFS-compatible file system which checkpoint data <output-file> file to which the
          |     word counts will be appended
          |
          |In local mode, <master> should be 'local[n]' with n > 1
          |Both <checkpoint-directory> and <output-file> must be absolute paths
        """.stripMargin
      )
      System.exit(1)
    }
    val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(ip, port, outputPath, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

另外,使用getOrCreate需要部署底层的应用来保证driver程序可以在灾难后自动重启。

请注意,对RDD启用checkpoint会导致额外的存储开销。这可能会影响到这些参与checkpoint的RDD所在批次的处理时间。因此,checkpoint的时间间隔需要小心设置。尤其是对于小批次(1s间隔),对RDD设置checkpoint会导致明显的降低应用的吞吐量。相反,checkpoint设置的间隔太大又会导致依赖链和任务量的增长,这也是有问题的。对于stateful transformations需要设置checkpoint,那么多个批次之间的间隔我们通常情况下设置最少为10秒。可以通过dstream.checkpoint(checkpointInterval)来设置。一般情况下,checkpoint间隔为5-10个DStream时间片为最佳。

累加器和广播变量

在Spark Streaming程序中累加器和广播变量并不能从checkpoint中恢复。如果你想要使checkpoint同样能对累加器和广播变量适用,你就得创建一个延时加载的单例实例,以便它们能够在driver崩溃重启后恢复。看一下下面这个例子。

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
})

应用部署

为了运行一个Spark Streaming应用,你需要完成以下几点。

  • Cluster with a cluster manager - 这是任何Spark应用运行最基本的要求。
  • Package the Application JAR - 如果要使用spark-submit来运行的话,你需要把你的Spark Streaming应用程序打成一个JAR包。你不需要提供Spark和Spark Streaming在这个JAR中。不过,当你的应用用到了高级数据源(如Kafka,Flame等),你必须把它们引用的额外JAR包引入的你的应用运行环境里面来。举个例子你的应用使用了KafkaUtils,就不得不把spark-streaming-kafka-0-8_2.11,以及它所依赖的所有JAR,加入到你的应用程序中。
  • Configuring sufficient memory for the executors - 因为接收到的数据会存在内存中,所以你需要给executors配置足够量的内存来容纳接收到的数据。请注意如果你定义了10分钟的window操作,那么你就要保证至少内存中能够容纳下10分钟的数据,所以内存的需求取决于你的应用的需要。
  • Configuring checkpointing - 如果你需要使用checkpoint,那么你就需要配置checkpoint dictionary在一个可靠的存储中(HDFS,S3等)。这样系统就会在这个目录中写入checkpoint信息用于恢复。
  • Configuring automatic restart of the application driver - 为了能够让driver程序能够在失败后重启,你需要部署一个底层应用来对driver进程进行监控,一旦发现driver崩溃,能够重新启动driver。对于不同的集群有不同的解决方式。
    • Spark Standalone - Spark应用可以提交到Spark Standalone集群上来运行,driver程序则可以运行在集群的任何一个节点上。而且,Standalone cluster manager可以对driver进行监控,一旦driver程序发生任何携带非零退出码的错误,或者因为运行driver的节点出现故障的情况下重新运行dirver程序。
    • YARN - Yarn supports a similar mechanism for automatically restarting an application. Please refer to YARN documentation for more details.
    • Mesos - Marathon has been used to achieve this with Mesos.
  • Configuring write ahead logs - 从Spark1.2开始,我们提供了write ahead logs机制来保证强大的容错能力。一旦启用,系统会把所有接受的数据存储在你配置的checkpoint dictionary中,当然这个目录必须是在可靠存储系统中的。它会防止在driver重启过程中数据丢失,以此来保证0数据丢失,使用spark.streaming.receiver.writeAheadLog.enable=true来启用这个功能。然而这种方式会带来额外的损耗,可能会影响到数据的吞吐量。当然,你可以启动更多地receiver来对吞吐量进行修正。另外,Spark建议你在启用了write ahead logs机制的同时,关闭对接受数据保留备份的机制。你可以通过改变input stream的storage level为StorageLevel.MEMORY_AND_DISK_SER来实现。当使用S2(或者其他不支持flushing的文件系统)的时候,记得启用spark.streaming.driver.writeAheadLog.closeFileAfterWrite和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite.
  • Setting the max receiving rate - 如果集群资源不足以支撑你的streaming application能够足够快的处理数据,可以限制receiver每秒接受的数据量。可以通过设置receiver的spark.streaming.receiver.maxRate属性或者对于采用Kafka方案时的spark.streaming.kafka.maxRatePerPartition属性。从Spark1.5开始,我们可以通过启用backpressure,来让Spark可以根据负载来动态的调整接受速率,要启用这个方案,需要设置spark.streaming.backpressure.enabled=true。

容错机制

在这一节,我们要讨论一下Spark Streaming的容错机制。

背景知识

为了能够更好地理解Spark Streaming的容错机制,我们先来看下Spark RDD的基本容错机制。

  1. 任何一个RDD都是一个不可变的,可重计算的,分布式的数据集。每一个RDD都记录了确定的血缘关系(一些列转换操作的依赖关系)来进行容错。
  2. 如果任何RDD分区因为worder节点错误导致丢失,那么这个分区就可以通过原始输入数据集来恢复,恢复的过程就是靠RDD记录的血缘关系(一些列转换操作的依赖关系)。
  3. 假设RDD的transformaction是不变的,那么无论集群发生任何错误,最终生成的RDD中的数据都是一样的。

Spark的处理数据来自于容错的文件系统(HDFS,S3等),因此从容错数据生成的RDD都是容错的。然而Spark Streaming从网络接受数据的情况并不是这样。为了同样能够让生成的RDD具有容错特性,接受的数据会生成一个副本并分布在不同的集群节点上。这就会有两种类型的数据需要从错误中恢复。

  1. Data received and replicated - 这种数据会在失败后不会丢失,应为它会有一个副本保存在另外一个节点上,就像分身术一样,并且两个都是本体。
  2. Failure of the Driver Node - 如果正在运行Spark Streaming的driver节点失败了,那么很显然SparkContext也就没哟了,所有执行器以及暂存在执行器内存中的数据也就丢失了。

有了这些基础知识之外,我们接下来可以详细了解一下Spark Streaming的容错机制了。

定义

流处理系统通常会关心每一条数据会被处理几次。一个系统在所有的操作情况下(不管是错误,还是其他),可以提供三种类型的保障机制。

  1. At most once - 每一条数据都会被执行一次或者不执行。
  2. At least once - 每一条记录都会被执行一次或者多次。这种比第一种有更强的容错能力,能够保证数据不丢失。但是可能需要多次重复计算。
  3. Exactly once - 每一条记录都会恰好执行一次,并且没有数据丢失没有数据被执行多次。这明显是这三种中最好的一个保障机制。

基本概念

在任何流系统中,处理数据都会分为三步。

  1. Receiving the data - 使用接收器或者其他方式从源接收数据。
  2. Transforming the data - 接收到的数据进行transform操作,通过DStream或者RDD上定义的各种transformaction。
  3. Pushing out the data - 最终,经过处理的数据,要输出到外部系统。比如文件系统或者数据库。

如果一个流处理应用可以提供一个end-to-end的exactly-once guarantees,那么每一步都要提供一个exactly-once guarantees。也就是说,每一条记录都只能接受一次,转换一个,输出一次。下面,让我们来看一下Spark Streaming对这三步提供的一个机制。

  1. Receiving the data - 不同的源能够提供不同的担保机制,下面我们会说
  2. Transforming the data - 所有接收到的数据都会被处理一次,这要感谢RDD提供的guarantees。即使处理过程中发生了错误,只要输入数据还能被访问,那么最终结果永远都是一样。
  3. Pushing out the data - 输出操作只保证了at-least once,因为这取决于你使用了什么类型的操作和输出到的外部系统是否提供了什么机制(比如食事务控制)。但是用户可以实现自己的事务控制方法,来保证exactly-once。这会在这一节的后面讨论到。

数据接收机制

不同的输入数据源能够提供从at-least once到exactly once的不同保证机制。

文件源

如果待输入数据已经持久化到高可用文件系统中。那么Spark Streaming就可以从任何错误值恢复,并重新处理所有数据。这种方式提供了exactly once这种最高级别的保证机制,保证了任务错误情况下数据都能够被处理并且只处理一次。

基于接收器的源

对于基于接收器的数据源来说,容错机制同时取决于错误发生的情况和使用了何种接收器。为了方便讨论,我们把接收器分为两类。

  1. 可靠型接收器 - 这一类接收器会在确保接收到的数据已经备份之后和数据源通信告知数据已经接收。如果接收器出现错误,数据源就不会收到来自于接收器对于已经缓存(但并未备份)数据的确认消息。因此,在接收器重启后,数据源会重新发送数据到接收器来保证数据不会丢失。
  2. 非可靠型接收器 - 这种接收器没有与数据源的确认机制,所以有可能因为driver活着worker的问题出现错误后发生数据丢失的情况。

下面看看使用不同种类的接收器在具体错误发生的情况下产生的结果。如果worker节点发生故障,使用可靠型接收器不会发生数据丢失,而使用非可靠型接收器会导致还未备份的数据丢失。如果driver节点发生故障,那么所有接收到和已经备份到内存中的数据都会丢失,这将影响带有状态信息转换操作的结果。

为了避免丢失接收到的数据,Spark 1.2开始引入了write ahead logs机制来吧接收到的数据备份到可靠地存储系统中。启用write ahead logs机制并使用可靠型接收器可以保证零数据丢失。使用这种方案,提供了at-least once guarantee机制。

Deployment Scenario Worker Failure Driver Failure
Spark 1.1 or earlier, OR Spark 1.2 or later without write ahead logs Buffered data lost with unreliable receivers Zero data loss with reliable receivers At-least once semantics Buffered data lost with unreliable receivers Past data lost with all receivers Undefined semantics
Spark 1.2 or later with write ahead logs Zero data loss with reliable receivers At-least once semantics Zero data loss with reliable receivers and files At-least once semantics

输出机制

输出操作(例如foreachRDD)都提供了at-least once机制,这意味着数据可能会不止一次地输出到外部系统。当你使用saveAs***Files方法的时候就可能会发生重复写入相同数据。你需要一些额外的工作来保证exactly-once。在这里提供两种解决方案。

  • 等价更新 - 即为多次写入同样的数据到相同位置,这样并不会影响最终结果。
  • 事务控制 - 每一次的更新数据操作都会启动事务,用是无奈保证exactly once。
    • 使用batch time和partition index来创建一个id,使用这个id来确保数据的唯一性
    • 启动事务并使用这个id来更新外部系统数据,如果这个id不存在则提交更新,如果这个id已经存在那么则放弃更新。
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}

性能优化

要想使你的Spark流处理应用能够获得更好地性能,你需要大量的优化工作。在这一节中,我们提供了许多配置和参数来对你的程序进行改进。首先你需要从两个方面出发来考虑优化工作。

  1. 通过有效的的使用集群资源来降低每个批次的数据处理时间。
  2. 设置一个合适的批次大小以便程序能够尽量快的处理这些数据。

降低每个批次的处理时间

在Spark中有许多方式可以降低每个批次的数据处理时间,你可以参考Tuning Guide,这部分提到了许多优化要点。

优化数据接收并行度

通过网络方式接收数据(Flame,Kafka等),要求把数据反序列化后存储在Spark中。如果数据接收是一个瓶颈,那么我们就考虑采用多个receiver并行的方式。注意,每一个input DStream都可以创建一个单独的receiver(创建在worker节点上)来接收独立地接收流数据。因此,我们可以创建多个input DStream来同时接收多个数据源上的流数据。比如使用一个Kafka input DStream接收两个topic的数据,你完全可以创建两个input DStream来分别接受两个topic的数据,每一个receiver只负责从一个topic接收数据。这样就是一个并行的方式,来增加数据吞吐量。并且这些DStream可以组合成意义DStream,你定义在这个DStream上面的transformactions可以作用在每一个单独的input DStream上面。

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

另一个需要考虑的参数是receiver的blocking interval,通过spark.streaming.blockInterval来定义。对于大多数的receiver来说,会把接受到的数据合并成一个数据块然后存储到Spark的内存中。每一个批次中数据块的数量决定了要使用多少个任务来处理这些数据。每一个receiver的每一个batch上的task数量约等于(batch interval / block interval)。比如,block interval为200ms,那么每2s的批次数据需要使用20个task来处理。如果task的数量太低(比如小于机器的内核数量),那么就不能够使用全部的内核来参与计算,导致效率降低。为了增加每一个batch interval的task数量,你可以缩小block interval。但是,推荐使用最小的block interval是50ms,低于这个值的话接下来启动任务的开销将是一个问题。

对于从多个input streams/receiver接受数据的选择是重分配数据并行度(usinginputStream.repartition(<number of partitions>))。这可以在对数据进行处理之前,指定每一个批次接收到的数据使用task的数量。

优化数据处理并行度

如果任何阶段计算任务的并行度都不够高,那么会直接导致集群资源利用率低下(比如我们集群的计算资源为20个核心,但是每个阶段的并行度都只有不到10个,那么就会导致分配的集群资源利用率低下j)。对于一些分布式reduce操作,比如reduceByKey和reduceByKeyAndWindow,默认的并行度通过spark.default.parallelism来控制。你可以通过改变这个参数值来改变计算的并行度。

数据序列化

数据序列化的开销可以通过设置数据的序列化方式来优化。在Spark Streaming中有两种类型的数据需要序列化。

  • Input data - 默认情况下接收器接收到的数据会使用StorageLevel.MEMORY_AND_DISK_SER_2来存储在executors的内存中。数据被序列化为bytes以便节约GC的开销,和生成副本来容忍执行器错误。当然数据首先会保存在内存中,知道内存不足以装下需要参与流计算的数据。显示,序列化是需要开销的,接收器必须反序列化接收到的数据然后再次通过Spaark定义的序列化格式对数据进行序列化。
  • Persisted RDDs generated by Streaming Operations - 通过计算得到的RDD会被持久化到内存中。譬如,窗口操作会把数据持久化到内存中,因为这些数据需要计算多次。然后,与Spark Core默认的存储级别StorageLevel.MEMORY_ONLY不同,Streaming使用的默认存储级别为StorageLevel.MEMORY_ONLY_SER,以便减小GC的开销。

在所以情况下,使用Kryo序列化方式可以有效地减小CPU和内存的开销。

在特定的情况下,可能流处理需要保存的数据总量不会太大。直接保存反序列化后的对象并不对产生过大的GC开销。如果你使用秒级的batch interval并且没有任何窗口操作的话,那么你可以通过设置storage level可以禁用以序列化方式持久化数据。这可以降低CPU对于序列化的开销,同时又不会带来太大的GC开销。

任务启动开销

如果每秒运行的任务数量非常高(比如50+每秒),把任务发送到slaves时的开销会非常大,这样很难实现次秒级别的延时。你可以通过以下方式来优化。

  • Execution Mode - 以Standalone模式或者coarse-grained Mesos模式可以获得更好地任务运行次数,比起fine-grained Mesos模式。

这种方式可以降低批处理时间100s of milliseconds,从而实现次秒级别的批处理。

合理的批次间隔

为了能够保证Spark Streaming程序能够稳定的运行在集群上,系统应该尽可能快的处理接收到数据。话句话来说,每个批次的数据都应该在它生成后尽可能快的被处理。这可以在Streaming Web UI中进行监控,每个批次的处理时间必须要小于每个批次的间隔时间。

基于流处理的特性来说,运行在固定集群资源上的应用,对于批次间隔的选取会严重影响数据处理效率。举个例子,让我们重新考虑一下前面的例子WordCountNetwork。对于特定的数据速率,系统可能只能够保证每两秒生成一次词频报告,而不是每500毫秒。所以批次间隔就必须设置成这样,以便可以持续的运行。

一个好让你找到适合你程序的批次大小的方式是,使用一个保守的批次间隔时间(5-10s)和一个较低的数据速率来测试你的程序。为了测试系统是否可以保证这个数据速率,你可以检查每一个批次处理的端到端的时间延迟(既可以在Spark driver的log4j日志中找到“Total delay”,也可以使用StreamingListener接口),如果延迟可以维持在一个与批次间隔差不多的水平,那么系统就是稳定的。否则,如果延迟在不断地增加,那么也就意味着系统并不能稳定运行了。一旦你有了一个稳定的配置,那么你可以尝试加快数据速率和降低批次间隔。注意,一个瞬间的延迟增大,可能只是短暂的数据率的增长,随着数据率的下降,延迟会回到一个合理的水平。

内存优化

内存调优和GC策略会在Tuning Guide一节详细讨论,在这里我们只着重说明对于StreamingContext相关的调优参数。

Spark Streaming应用对于内存的需求量很大程度上取决于你使用了什么样的transformaction。如果你对过去10分钟的数据使用了一个窗口操作,那么你的集群就需要足够多的内存来保存着10分钟的数据。或者,比如你使用了updateStateByKey方法来出来大量的key的数据集,那么你使用的内存量肯定就会大。相反如果你只是用了一个简单的map-filter-store操作,那么需要的内存量就会小。

在通常情况下,因为从接收器接收到的数据使用的存储级别是StorageLevel.MEMORY_AND_DISK_SER_2,数据在内存中装不下就是放到硬盘中。这就会降低了流处理应用的性能,因此建议提供足够的内存来提高性能。

内存优化的另一个方面是垃圾回收机制。因为流处理程序需要低延迟,所以不希望JVM的垃圾回收影响程序的执行。

下面展示 一些基于内存和GC策略调优的参数。

  • Persistence Level of DStreams - 之前数据序列化这一章中已经提到,接收器接收到的数据会被默认序列化,这可以降低内存使用和GC开销。使用Kyro方式进行序列化可以进一步地降低序列化后数据大大小和内存消耗量。进一步的降低内存使用量可以使用压缩(spark.rdd.compress),但同时会增加CPU的开销。
  • Clearing old data - 默认情况下,所有的输入数据和通过DStream transformaction产生的持久化的RDD都会被自动清理。Spark Streaming会根据使用的transformaction来决定何时清理数据。举个例子,如果你使用一个10分钟的窗口,那么程序会保留10分钟的数据,然后自动的清理老数据。当然你可以通过设置streamingContext.remember参数来让数据保留更长的时间。

要点

  • 一个DStream会关联一个单独的接收器,为了能够并行运行多个接收器,就要创建多个DStream。一个接收器需要一个executor,占用一个内核。为了确保在接收器占用了内核后,还有足够的内核来进行处理工作,你必须要再内核分配的时候同时考虑到这两部分。接收器以轮询的方式来分配执行器。
  • 从流数据源接收到数据后,接收器会创建一个数据块。每个block interval都会生成一个数据块。N个数据块会被生成在一个batch interval内(N=batchInterval/blockInterval)。这些块被块管理器分发到其他执行器的块管理器上。在这之后,运行在driver上的网络接受追踪器会被通知这些块所在的位置,以便进行下一个的数据处理。
  • 在batch interval期间,driver会对这些块创建一个RDD。在这个batch interval内生成的快都是这个RDD的分区。每一个分区在Spark上都是一个任务。如果blockInterval== batchinterval,则意味着只有一个分区,并且可能就直接在本地处理了。
  • 这些块上的map任务都运行在执行器单元上(一个在接收数据块的位置,另一个在数据块被备份到的位置)。这可以让block并不必关心block interval,除非是非本地调度。较大的block interval会带了更大的block,参数spark.locality.wait,能够让块更可能在本地处理。你需要在这两个参数间找到一种平衡,来能够保证bigger block能够在本地执行。 -除了可以使用batch interval和block interval之外,你还可以通过inputDstream.repartition(n)来定义分区数。这会对RDD中的数据进行随机重组,生成n个数据分区。为了更合理的分区数,你必须付出一个数据重组的代价。RDD的处理,都是作为一个job通过driver的jobscheduler来进行调度的。在一个给定的时间点只有一个job是活动的。所以,如果一个job是执行中的,那么其他job就是排队中。
  • 如果你有两个DStream,那么就会形成两个RDD,也就会生成两个Job,然后被一个接一个的调度。为了避免这种情况,你可以对峙两个DStream执行union操作。这保证了两个DStream RDD会产生一个unionRDD,这个unionRDD会当做一个单独的job。然而这对RDDs中的分区并没有任何影响。
  • 如果批次处理时间远大于批次间隔,那么接收器的内存会被塞满,并且最终会抛出异常(最可能是BlockNotFoundException)。目前,我们还没有方式来停止这个接收器,只能通过Spark配置spark.streaming.receiver.maxRate来限制接收器的数据接受率。
展开阅读全文
打赏
1
3 收藏
分享
加载中
更多评论
打赏
0 评论
3 收藏
1
分享
返回顶部
顶部