文档章节

spark streaming 处理空batch

 张欢19933
发布于 2017/05/18 20:48
字数 501
阅读 40
收藏 0

Spark Streaming是近实时(near real time)的小批处理系统。对给定的时间间隔(interval),Spark Streaming生成新的batch并对它进行一些处理。每个batch中的数据都代表一个RDD,但是如果一些batch中没有数据会发生什么事情呢?Spark Streaming将会产生EmptyRDD的RDD,它的定义如下:

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Partition, SparkContext, TaskContext}

/**
 * An RDD that has no partitions and no elements.
 */
private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {

  override def getPartitions: Array[Partition] = Array.empty

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    throw new UnsupportedOperationException("empty RDD")
  }
}

可以看到这个RDD并不对任何父RDD有依赖关系,我们不能调用compute方法计算每个分区的数据。EmptyRDD的存在是为了保证Spark Streaming中多个batch的处理是一致的。但是存在EmptyRDD有时候会产生一些问题,比如:如果你想将接收到的Streaming数据写入HDFS中:

val ssc = new StreamingContext(args(0),"iteblog",Seconds(10))
val socketStream = ssc.socketTextStream("www.iteblog.com",8888)
val outputDir = args(1)

socketStream.foreachRDD(rdd => {
  rdd.saveAsTextFile(outputDir)
})

当你调用foreachRDD的时候如果当前rdd是EmptyRDD,这样会导致在HDFS上生成大量的空文件!这肯定不是我们想要的,我们只想在存在数据的时候才写HDFS,我们可以通过以下的两种方法来避免这种情况:

socketStream.foreachRDD(rdd => {
  if(rdd.count() != 0){
  	rdd.saveAsTextFile(outputDir)
  }
})

EmptyRDD的count肯定是0,所以这样可以避免写空文件,或者我们也可以用下面方法解决:

socketStream.foreachRDD(rdd => {
  if(!rdd.partitions.isEmpty){
  	rdd.saveAsTextFile(outputDir)
  }
})

EmptyRDD是没有分区的,所以调用partitions.isEmpty是true。这样也可以解决上述问题。

虽然上面两种方法都可以解决这个问题,但是推荐使用第二种方法。因为第一种方法调用了RDD的count函数,这是一个Action,会触发一次Job的计算,当你的数据量比较大的时候,这可能会带来性能方面的一些影响;而partitions.isEmpty是不需要触发Job的。

本文转载自:https://www.iteblog.com/archives/1304.html

粉丝 47
博文 532
码字总数 244932
作品 0
海淀
私信 提问
Spark(五) -- Spark Streaming介绍与基本执行过程

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45770881 Spark Streaming作为Spark上的四大子框架之一,肩负着实时流计算的重...

jchubby
2015/05/16
0
0
Spark Streaming 反压(Back Pressure)机制介绍

文章目录 1 背景 2 反压机制 3 Spark Streaming 反压机制的使用 背景 在默认情况下,Spark Streaming 通过 receivers (或者是 Direct 方式) 以生产者生产数据的速率接收数据。当 batch proc...

Spark
2018/05/28
0
0
7.Spark Streaming

Spark Streaming是Spark核心api的一个拓展,可以实现高吞吐量/具备容错机制的实时流数据的处理 Spark Streaming 与 Spark Core 的关系可以用下面的经典部件图来表述: 基于Spark做Spark Str...

山间浓雾有路灯
06/04
0
0
详解Spark Streaming的Graceful Shutdown

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zwgdft/article/details/85849153   对于Spark Streaming程序而言,一旦运行起来后,就会不断的从数据流中消...

Mr-Bruce
01/06
0
0
Spark Streaming 框架 - StreamingPro

概述 Spark 是一个可扩展的可编程框架,用于数据集的大规模分布式处理, 称为弹性分布式数据集(Resilient Distributed Datasets,RDD)。 Spark Streaming 是 Spark API 核心的扩展,它支持...

匿名
2018/04/29
1K
1

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周一乱弹 —— 年迈渔夫遭黑帮袭抢

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @tom_tdhzz :#今日歌曲推荐# 分享Elvis Presley的单曲《White Christmas》: 《White Christmas》- Elvis Presley 手机党少年们想听歌,请使劲...

小小编辑
今天
1K
20
CentOS7.6中安装使用fcitx框架

内容目录 一、为什么要使用fcitx?二、安装fcitx框架三、安装搜狗输入法 一、为什么要使用fcitx? Gnome3桌面自带的输入法框架为ibus,而在使用ibus时会时不时出现卡顿无法输入的现象。 搜狗和...

技术训练营
昨天
5
0
《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
昨天
8
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
昨天
16
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部