SparkStreaming Java
博客专区 > Yulong_ 的博客 > 博客详情
SparkStreaming Java
Yulong_ 发表于4个月前
SparkStreaming Java
  • 发表于 4个月前
  • 阅读 13
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 技术升级10大核心产品年终让利>>>   

Spark streaming是Spark核心API的一个扩展,它对实时流式数据的处理具有可扩展性、高吞吐量、可容错性等特点。我们可以从kafka、flume、Twitter、 ZeroMQ、Kinesis等源获取数据,也可以通过由 高阶函数map、reduce、join、window等组成的复杂算法计算出数据。最后,处理后的数据可以推送到文件系统、数据库、实时仪表盘中。

基本概念

  • 离散流(Discretized Streams \ DStreams):Spark Streaming对内部持续的实时数据流点的抽象描述,及我们处理的一个实时数据流,在SparkStreaming中对应于一个DStream实例。
  • 批数据(batch data):将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着时间片的推移,处理结果就形成了对应的结果数据流。
  • 时间片或者批处理间隔(batch interval):人为地将流数据进行定量的标准,以时间片作为拆分数据流的依据。一个时间片的数据对应一个RDD实例。
  • 窗口长度(window length):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数。
  • 滑动时间间隔:前一个窗口到后一个窗口所经历的时间长度。必须是批处理的时间间隔的倍数。
  • Input DStream :一个input DStream是一个特殊的DStream,将Spark Stream连接到一个外部数据源来读取数据。
  • Receiver:长时间运行在Excetor。每个Receiver负责一个Input DStream(例如一个读取Kafka消息的输入流)。每个Receiver,加上input DSteam会占用一个core/slot。

1、项目创建

关于Java:选用1.7或者1.8.为了通用性,本章内容使用1.7进行编写。

关于Scala:工程不需要增加scala nature,即不需Add Scala Nature。若增加在java代码中调用scala library会有异常。

关于Spark版本:使用1.6.3进行编写。

maven 依赖

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.3</version>
</dependency>
</dependencies>

例子:

public class NetworkWordCount {
	public static void main(String[] args) {
		networkWC();
	}
	public static void networkWC() {
		// Create a local StreamingContext with two working thread and batch
		// interval of 5 second
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
		
		// Create a DStream that will connect to hostname:port, like
		// localhost:9999
		JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
		// Split each line into words
		JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			@Override
			public Iterable<String> call(String x) {
				return Arrays.asList(x.split(" "));
			}
		});
		// Count each word in each batch
		JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
			@Override
			public Tuple2<String, Integer> call(String s) {
				return new Tuple2<String, Integer>(s, 1);
			}
		});
		JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
			@Override
			public Integer call(Integer i1, Integer i2) {
				return i1 + i2;
			}
		});
		// Print the first ten elements of each RDD generated in this DStream to
		// the console
		wordCounts.print();
		jssc.start(); // Start the computation
		jssc.awaitTermination(); // Wait for the computation to terminate
	}
}

2、SparkStreaming

2.1、初始化

为了初始化Spark Streaming程序,一个JavaStreamingContext对象必需被创建,它是SparkStreaming所有流操作的主要入口。一个JavaStreamingContext 对象可以用SparkConf对象创建。需要注意的是,它在内部创建了一个JavaSparkContext对象,你可以通过 jssc.sparkContext 访问这个SparkContext对象。

// Create a local StreamingContext with two working thread and batch
// interval of 5 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

当一个上下文(context)定义之后,你必须按照以下几步进行操作

  • 定义输入源;
  • 准备好流计算指令;
  • 利用 streamingContext.start() 方法接收和处理数据;
  • 处理过程将一直持续,直到 streamingContext.stop() 方法被调用。

几点需要注意的地方:

  • 一旦一个context已经启动,就不能有新的流算子建立或者是添加到context中。
  • 一旦一个context已经停止,它就不能再重新启动
  • 在JVM中,同一时间只能有一个StreamingContext处于活跃状态
  • 在StreamingContext上调用 stop() 方法,也会关闭SparkContext对象。如果只想仅关闭StreamingContext对象,设置 stop() 的可选参数为false
  • 一个SparkContext对象可以重复利用去创建多个StreamingContext对象,前提条件是前面的StreamingContext在后面StreamingContext创建之前关闭(不关闭SparkContext)。

2.2、Discretized Streams (DStreams)

DStreams是Spark Streaming提供的基本的抽象,它代表一个连续的数据流。它要么是从源中获取的输入流,要么是输入流通过转换算子生成的处理后的数据流。在内部,DStreams由一系列连续的 RDD组成。DStreams中的每个RDD都包含确定时间间隔内的数
据,如下图所示:

任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作。在前面的例子中, flatMap 操作应用于 lines 这个DStreams的每个RDD,生成 words 这个DStreams的RDD。过程如下图所示:

通过Spark引擎计算这些隐含RDD的转换算子。DStreams操作隐藏了大部分的细节,并且为了更便捷,为开发者提供了更高层的API。下面几节将具体讨论这些操作的细节。

2.3、Input DStreams and Receivers

输入DStreams表示从数据源获取输入数据流的DStreams。在快速例子中, lines 表示输入DStream,它代表从netcat服务器获取的数据流。每一个输入流DStream 和一个 Receiver 对象相关联,这个 Receiver 从源中获取数据,并将数据存入内存中用于处理。

输入DStreams表示从数据源获取的原始数据流。Spark Streaming拥有两类数据源

  • 基本源(Basic sources):这些源在StreamingContext API中直接可用。例如文件系统、套接字连接、Akka的actor等。
  • 高级源(Advanced sources):这些源包括Kafka,Flume,Kinesis,Twitter等等。它们需要通过额外的类来使用。我们在关联那一节讨论了类依赖。

需要注意的是,如果你想在一个流应用中并行地创建多个输入DStream来接收多个数据流,你能够创建多个输入流(这将在性能调优那一节介绍) 。它将创建多个Receiver同时接收多个数据流。但是, receiver 作为一个长期运行的任务运行在Spark worker或executor中。因此,它占有一个核,这个核是分配给Spark Streaming应用程序的所有 核中的一个(itoccupies one of the cores allocated to the Spark Streaming application)。所以,为SparkStreaming应用程序分配足够的核(如果是本地运行,那么是线程) 用以处理接收的数据并且运行 receiver 是非常重要的。

几点需要注意的地方:

  • 如果分配给应用程序的核的数量少于或者等于输入DStreams或者receivers的数量,系统只能够接收数据而不能处理它们。
  • 当运行在本地,如果你的master URL被设置成了“local”,这样就只有一个核运行任务。这对程序来说是不足的,因为作为 receiver 的输入DStream将会占用这个核,这样就没有剩余的核来处理数据了。

2.3.1、Basic sources

我们已经在快速例子中看到, ssc.socketTextStream(...) 方法用来把从TCP套接字获取的文本数据创建成DStream。除了套接字,StreamingContext API也支持把文件 以及Akka actors作为输入源创建DStream。

套接字:从一个ip:port监控套接字。

文件流(File Streams):从任何与HDFS API兼容的文件系统中读取数据,一个DStream可以通过如下方式创建

streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);

Spark Streaming将会监控 dataDirectory 目录,并且处理目录下生成的任何文件(嵌套目录不被支持)。需要注意一下三点:

  1. 所有文件必须具有相同的数据格式
  2. 所有文件必须在`dataDirectory`目录下创建,文件是自动的移动和重命名到数据目录下
  3. 一旦移动,文件必须被修改。所以如果文件被持续的附加数据,新的数据不会被读取。

对于简单的文本文件,有一个更简单的方法 streamingContext.textFileStream(dataDirectory) 可以被调用。文件流不需要运行一个receiver,所以不需要分配核。

2.3.2、Advanced sources

(待补充)

2.3.3、Custom Sources

(待补充)

2.4、Transformations on DStreams

和RDD类似,transformation允许从输入DStream来的数据被修改。DStreams支持很多在RDD中可用的transformation算子。一些常用的算子如下所示:

Transformation Meaning
map(func) Return a new DStream by passing each element of the source DStream through a function func.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items.
filter(func) Return a new DStream by selecting only the records of the source DStream on which func returns true.
repartition(numPartitions) Changes the level of parallelism in this DStream by creating more or fewer partitions.
union(otherStream) Return a new DStream that contains the union of the elements in the source DStream and otherDStream.
count() Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
reduce(func) Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel.
countByValue() When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
reduceByKey(func, [numTasks]) When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config propertyspark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
join(otherStream, [numTasks]) When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
cogroup(otherStream, [numTasks]) When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.
transform(func) Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.
updateStateByKey(func) Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

 

2.4.1、UpdateStateByKey Operation

updateStateByKey操作允许不断用新信息更新它的同时保持任意状态。你需要通过两步来使用它

  • 定义状态-状态可以是任何的数据类型
  • 定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态

让我们举个例子说明。在例子中,你想保持一个文本数据流中每个单词的运行次数,运行次数用一个state表示,它的类型是整数

import com.google.common.base.Optional;
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
      Integer newSum = ...  // add the new values with the previous running count to get the new count
      return Optional.of(newSum);
    }
  };

这被应用在一个包含word( pairs DStream 含有(word,1)对)的DStream上。

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

这个update函数被每个单词word调用,newValues 拥有一系列的1(从 (词, 1)对而来),runningCount拥有之前的次数。要看完整的代码,见例子JavaStatefulNetworkWordCount.java. 备注使用updateStateByKey时需要定义checkpoint目录,更多细节在checkpointing部分。

2.4.2、Transform Operation

(略)

2.4.3、Window Operations

Spark Streaming也支持窗口计算,它允许你在一个滑动窗口数据上应用transformation算子。下图阐明了这个滑动窗口。

如上图显示,窗口在源DStream上滑动,合并和操作落入窗内的源RDDs,产生窗口化的DStream的RDDs。在这个具体的例子中,程序在三个时间单元的数据上进行窗口操作,并且每两个时间单元滑动一次。 这说明,任何一个窗口操作都需要指定两个参数:

  • 窗口长度:窗口的持续时间
  • 滑动的时间间隔:窗口操作执行的时间间隔

这两个参数必须是源DStream的批时间间隔的倍数。
下面举例说明窗口操作。例如,你想扩展前面的例子用来计算过去30秒的词频,间隔时间是10秒。为了达到这个目的,我们必须在过去30秒的 pairs DStream上应用 reduceByKey 操作。用方法 reduceByKeyAndWindow 实现。

// Reduce function adding two integers, defined separately for clarity
Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
  @Override public Integer call(Integer i1, Integer i2) {
    return i1 + i2;
  }
};

// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));

一些常用的窗口操作如下所示,这些操作都需要用到上文提到的两个参数:窗口长度和滑动的时间间隔

Transformation Meaning
window(windowLength, slideInterval) Return a new DStream which is computed based on windowed batches of the source DStream.
countByWindow(windowLength, slideInterval) Return a sliding window count of elements in the stream.
reduceByWindow(func, windowLength, slideInterval) Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel.
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and “inverse reducing” the old data that leaves the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable only to “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.

countByValueAndWindow(windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.

2.4.4、Join Operations

最后,值得强调的是,您可以轻松地在Spark Streaming中执行不同类型的join。

Stream-stream joins

Streams 能够方便地和其他streams进行join操作.

JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);

在这里,每批间隔,stream1产生的 RDD将与stream2产生的 RDD进行join操作。你也可以做leftouterjoin,rightouterjoin,fullouterjoin。此外,在stream的窗口上进行join通常是非常有用的。这也很容易。

JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);

Stream-dataset joins

下面是另一个将窗口流与数据集join的例子。

JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(
    new Function<JavaRDD<Tuple2<String, String>>, JavaRDD<Tuple2<String, String>>>() {
        @Override 
        public JavaRDD<Tuple2<String, String>> call(JavaRDD<Tuple2<String, String>> rdd) {
            return rdd.join(dataset);
        }
    }
);

事实上,您还可以动态地更改你想要join的数据集。提供转换的函数在每个批处理间隔中进行计算求值,因此将使用数据集引用指向的当前数据集。

The complete list of DStream transformations is available in the API documentation. For the Scala API, see DStream and PairDStreamFunctions. For the Java API, see JavaDStream and JavaPairDStream. For the Python API, see DStream.

2.5、Output Operations on DStreams

输出操作允许DStream的操作推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。目前,定义了下面几种输出操作:

Output Operation Meaning
print() Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging.
Python API This is called pprint() in the Python API.
saveAsTextFiles(prefix, [suffix]) Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
Python API This is not available in the Python API.
saveAsHadoopFiles(prefix, [suffix]) Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
Python API This is not available in the Python API.
foreachRDD(func) The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

Design Patterns for using foreachRDD

利用foreachRDD的设计模式

dstream.foreachRDD是一个强大的原语,发送数据到外部系统中。然而,明白怎样正确地、有效地用这个原语是非常重要的。下面几点介绍了如何避免一般错误。经常写数据到外部系统需要建一个连接对象(例如到远程服务器的TCP连接),用它发送数据到远程系统。为了达到这个目的,开发人员可能不经意的在Spark驱动中创建一个连接对象,但是在Spark worker中 尝试调用这个连接对象保存记录到RDD中,如下:

dstream.foreachRDD(rdd	=>	{
						val	connection	=	createNewConnection()		//	executed	at	the	driver
						rdd.foreach(record	=>	{
										connection.send(record)	//	executed	at	the	worker
						})
		})

这是不正确的,因为这需要先序列化连接对象,然后将它从driver发送到worker中。这样的连接对象在机器之间不能传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该 在worker中初始化)等等。正确的解决办法是在worker中创建连接对象。

然而,这会造成另外一个常见的错误-为每一个记录创建了一个连接对象。例如:

dstream.foreachRDD(rdd	=>	{
						rdd.foreach(record	=>	{
										val	connection	=	createNewConnection()
										connection.send(record)
										connection.close()
						})
		})

通常,创建一个连接对象有资源和时间的开支。因此,为每个记录创建和销毁连接对象会导致非常高的开支,明显的减少系统的整体吞吐量。一个更好的解决办法是利用 rdd.foreachPartition 方法。 为RDD的partition创建一个连接对象,用这个两件对象发送partition中的所有记录。

 

dstream.foreachRDD(rdd	=>	{
						rdd.foreachPartition(partitionOfRecords	=>	{
										val	connection	=	createNewConnection()
										partitionOfRecords.foreach(record	=>	connection.send(record))
										connection.close()
						})
		})

这就将连接对象的创建开销分摊到了partition的所有记录上了。

最后,可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支。

dstream.foreachRDD(rdd	=>	{
						rdd.foreachPartition(partitionOfRecords	=>	{
										//	ConnectionPool	is	a	static,	lazily	initialized	pool	of	connections
										val	connection	=	ConnectionPool.getConnection()
										partitionOfRecords.foreach(record	=>	connection.send(record))
										ConnectionPool.returnConnection(connection)		//	return	to	the	pool	for	future	reuse
						})
		})

需要注意的是,池中的连接对象应该根据需要延迟创建,并且在空闲一段时间后自动超时。这样就获取了最有效的方式发生数据到外部系统。

其它需要注意的地方:

  • 输出操作通过懒执行的方式操作DStreams,正如RDD action通过懒执行的方式操作RDD。具体地看,RDD actions和DStreams输出操作接收数据的处理。因此,如果你的应用程序没有任何输出操作或者 用于输出操作 dstream.foreachRDD() ,但是没有任何RDD action操作在 dstream.foreachRDD() 里面,那么什么也不会执行。系统仅仅会接收输入,然后丢弃它们。
  • 默认情况下,DStreams输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。

2.6、DataFrame and SQL Operations

你可以很容易地使用DataFrames and SQL操作数据流。你必须使用StreamingContext已用的SparkContext,以创建一个SQLContext。此外,这必须这样做,它可以在驱动程序故障的时候重新启动。这是通过sqlcontext惰性实例化一个单例实例来完成的。下面的例子说明了这一点。它改变了早期的word count example 。每个RDD转换为一个DataFrame,作为一个临时表注册并通过SQL进行查询。

/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
  private String word;

  public String getWord() {
    return word;
  }

  public void setWord(String word) {
    this.word = word;
  }
}

...

/** DataFrame operations inside your streaming program */

JavaDStream<String> words = ... 

words.foreachRDD(
  new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time time) {

      // Get the singleton instance of SQLContext
      SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

      // Convert RDD[String] to RDD[case class] to DataFrame
      JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
        public JavaRow call(String word) {
          JavaRow record = new JavaRow();
          record.setWord(word);
          return record;
        }
      });
      DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);

      // Register as table
      wordsDataFrame.registerTempTable("words");

      // Do word count on table using SQL and print it
      DataFrame wordCountsDataFrame =
          sqlContext.sql("select word, count(*) as total from words group by word");
      wordCountsDataFrame.show();
      return null;
    }
  }
);

查看完整代码 source code.

You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember a sufficient amount of streaming data such that the query can run. Otherwise the StreamingContext, which is unaware of the any asynchronous SQL queries, will delete off old streaming data before the query can complete. For example, if you want to query the last batch, but your query can take 5 minutes to run, then call streamingContext.remember(Minutes(5)) (in Scala, or equivalent in other languages).

2.7、Input DStreams and Receivers

和RDD相似,DStreams也允许开发者持久化流数据到内存中。在DStream上使用 persist() 方法可以自动地持久化DStream中的RDD到内存中。如果DStream中的数据需要计算多次,这是非常有用的。像 reduceByWindow 和 reduceByKeyAndWindow 这种窗口操作、 updateStateByKey 这种基于状态的操作,持久化是默认的,不需要开发者调用 persist() 方法。例如通过网络(如kafka,flume等)获取的输入数据流,默认的持久化策略是复制数据到两个不同的节点以容错。
注意,与RDD不同的是,DStreams默认持久化级别是存储序列化数据到内存中,这将在Performance Tuning 章节介绍。更多的信息请看rdd持久化 Spark Programming Guide.

2.8、Checkpointing

一个流应用程序必须全天候运行,所有必须能够解决应用程序逻辑无关的故障(如系统错误,JVM崩溃等)。为了使这成为可能,Spark Streaming需要checkpoint足够的信息到容错存储系统中, 以使系统从故障中恢复。

Metadata checkpointing:保存流计算的定义信息到容错存储系统如HDFS中。这用来恢复应用程序中运行worker的节点的故障。元数据包括

  • Configuration :创建Spark Streaming应用程序的配置信息
  • DStream operations :定义Streaming应用程序的操作集合
  • Incomplete batches:操作存在队列中的未完成的批

Data checkpointing :保存生成的RDD到可靠的存储系统中,这在有状态transformation(如结合跨多个批次的数据)中是必须的。在这样一个transformation中,生成的RDD依赖于之前 批的RDD,随着时间的推移,这个依赖链的长度会持续增长。在恢复的过程中,为了避免这种无限增长。有状态的transformation的中间RDD将会定时地存储到可靠存储系统中,以截断这个依赖链。元数据checkpoint主要是为了从driver故障中恢复数据。如果transformation操作被用到了,数据checkpoint即使在简单的操作中都是必须的。

何时checkpoint

应用程序在下面两种情况下必须开启checkpoint

  • 使用有状态的transformation。如果在应用程序中用到了 updateStateByKey 或者 reduceByKeyAndWindow ,checkpoint目录必需提供用以定期checkpoint RDD。
  • 从运行应用程序的driver的故障中恢复过来。使用元数据checkpoint恢复处理信息。

注意,没有前述的有状态的transformation的简单流应用程序在运行时可以不开启checkpoint。在这种情况下,从driver故障的恢复将是部分恢复(接收到了但是还没有处理的数据将会丢失)。 这通常是可以接受的,许多运行的Spark Streaming应用程序都是这种方式。

怎样配置Checkpointing

在容错、可靠的文件系统(HDFS、s3等)中设置一个目录用于保存checkpoint信息。着可以通过 streamingContext.checkpoint(checkpointDirectory) 方法来做。这运行你用之前介绍的 有状态transformation。另外,如果你想从driver故障中恢复,你应该以下面的方式重写你的Streaming应用程序。

  • 当应用程序是第一次启动,新建一个StreamingContext,启动所有Stream,然后调用 start() 方法
  • 当应用程序因为故障重新启动,它将会从checkpoint目录checkpoint数据重新创建StreamingContext
// Create a factory object that can create a and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  @Override public JavaStreamingContext create() {
    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
    ...
    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
    return jssc;
  }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start();
context.awaitTermination();

如果 checkpointDirectory 存在,上下文将会利用checkpoint数据重新创建。如果这个目录不存在,将会调用 functionToCreateContext 函数创建一个新的上下文,建立DStreams。 请看RecoverableNetworkWordCount例子。

除了使用 getOrCreate ,开发者必须保证在故障发生时,driver处理自动重启。只能通过部署运行应用程序的基础设施来达到该目的。在部署章节将有更进一步的讨论。

注意,RDD的checkpointing有存储成本。这会导致批数据(包含的RDD被checkpoint)的处理时间增加。因此,需要小心的设置批处理的时间间隔。在最小的批容量(包含1秒的数据)情况下,checkpoint每批数据会显著的减少 操作的吞吐量。相反,checkpointing太少会导致谱系以及任务大小增大,这会产生有害的影响。因为有状态的transformation需要RDD
checkpoint。默认的间隔时间是批间隔时间的倍数,最少10秒。它可以通过 dstream.checkpoint 来设置。典型的情况下,设置checkpoint间隔是DStream的滑动间隔的5-10大小是一个好的尝试。

共有 人打赏支持
粉丝 9
博文 79
码字总数 169741
×
Yulong_
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: