文档章节

Scala的foreachRDD

牧师-Panda
 牧师-Panda
发布于 2017/08/23 15:06
字数 816
阅读 305
收藏 0

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

顾名思义是遍历RDD用的,这个函数在DStream包中的InputStream类里,它允许你把数据发送给外部系统。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换

def foreachRDD(foreachFunc: (RDD[T], Time) ⇒ Unit): Unit
或者
def foreachRDD(foreachFunc: (RDD[T]) ⇒ Unit): Unit

Apply a function to each RDD in this DStream.
This is an output operator, so 'this' DStream will be registered as an 
output stream and therefore materialized.

所以要掌握它,对它要有深入了解。

下面转载一下常见的理解错误

经常写数据到外部系统需要创建一个连接的object handle(eg:根据TCP协议连接到远程的服务器,我们连接外部数据库需要自己的句柄)和发送数据到远程的系统。程序员可能会想当然地在spark上创建一个connection对象, 然后在spark线程里用这个对象来存RDD:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

这个代码会产生执行错误, 因为rdd是分布式存储的,它是一个数据结构,它是一组指向集群数据的指针, rdd.foreach会在集群里的不同机器上创建spark工作线程, 而connection对象则不会在集群里的各个机器之间传递, 所以有些spark工作线程就会产生connection对象没有被初始化的执行错误。 解决的办法可以是在spark worker里为每一个worker创建一个connection对象, 但是如果你这么做, 程序要为每一条record创建一次connection,显然效率和性能都非常差,即

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

另一种改进方法是为每个spark分区创建一个connection对象,同时维护一个全局的静态的连接池对象, 这样就可以最好的复用connection。 另外需要注意: 虽然有多个connection对象, 但在同一时间只有一个connection.send(record)执行, 因为在同一个时间里, 只有 一个微批次的RDD产生出来。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

有人问了个问题,为什么foreachRDD里有两层嵌套的foreach? 为什么dstream.foreachRDD里还要再套一层rdd.foreach

可以这么理解, DStream.foreachRDD 是一个输出操作符,它返回的不是RDD里的一行数据, 而是输出DStream后面的RDD, 在一个时间间隔里, 只返回一个RDD的“微批次”, 为了访问这个“微批次”RDD里的数据, 我们还需要在RDD数据对象上做进一步操作。这也印证了文首的那段英文说明。 参考下面的代码实例, 更容易理解。

给定一个 RDD [Security, Prices]数据结构

     dstream.foreachRDD { pricesRDD =>  // Loop over RDD

       val x= pricesRDD.count

       if (x > 0)  // RDD has data

       {

         for(line <- pricesRDD.collect.toArray) // Look for each record in the RDD

         {

           var index = line._2.split(',').view(0).toInt   // That is the index

           var timestamp = line._2.split(',').view(1).toString // This is the timestamp from source

           var security =  line._2.split(',').view(2).toString // This is the name of the security

           var price = line._2.split(',').view(3).toFloat  // This is the price of the security

           if (price.toFloat > 90.0)

           {

            // Do something here

            // Sent notification, write to HDFS etc

           }

         }

       }

     }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

© 著作权归作者所有

牧师-Panda
粉丝 33
博文 146
码字总数 180044
作品 0
浦东
私信 提问
加载中

评论(0)

基于案例贯通Spark Streaming流计算框架运行源码3

先贴下案例源码 上文已经从源码分析了InputDStream实例化过程,下一步是 源码钻进去 进入FlatMappedDStream.scala 这里顺势看下DStream的继承结构吧。 笔者扫描了一下代码,发现InputDStrea...

柯里昂
2016/05/08
162
0
Spark DStream 输出 编程进阶

5.DStream 输出   输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据 库或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStr...

osc_n41kxg36
2019/07/10
2
0
[转载]Spark-Task not serializable错误解析

Spark-Task not serializable错误解析 2018年05月17日 15:33:03 沙拉控 阅读数:1509 在学习SparkStreaming的时候偶然出现的一个问题,先看下面一段代码: 表示任务没有被序列化,那么这个序...

osc_cldb1bbf
2019/04/03
1
0
【Spark篇】---SparkStreaming中算子中OutPutOperator类算子

一、前述 SparkStreaming中的算子分为两类,一类是Transformation类算子,一类是OutPutOperator类算子。 Transformation类算子updateStateByKey,reduceByKeyAndWindow,transform OutPutOpe...

osc_6lj6izs9
2018/03/07
2
0
第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容: 1,JobScheduler内幕实现 2,JobScheduler深度思考 DStream的foreachRDD方法,实例化ForEachDStream对象,并将用户定义的函数foreachFunc传入到该对象中。foreachRDD方法是输出操...

葛晨鑫
2016/05/14
93
0

没有更多内容

加载失败,请刷新页面

加载更多

如何制作出色的R可重现示例 - How to make a great R reproducible example

问题: This post is a Community Wiki . 这篇文章是社区维基 。 Edit existing answers to improve this post. 编辑现有答案以改善此职位。 It is not currently accepting new answers. 它......

技术盛宴
7分钟前
24
0
windows下修改默认mysql编码

查看编码格式: 进入mysql执行下面语句 show variables like '%character%'; 修改编码格式: set character_set_client=utf8;set character_set_connection=utf8;set character_set_da......

珞木橘子
12分钟前
19
0
ArrayList与LinkList性能对比----新增元素

在聊到 ArrayList 和 LinkList 的时候都会这么说 ArrayList 底层是基于数组实现的内存地址物理上是连续的,新增,删除效率低,查询效率高 LinkList 是基于链表实现的,逻辑地址是连续的内存地...

Lbj虞
32分钟前
34
0
Nginx

想了解nginx的代理可以先看这篇: https://baijiahao.baiducom/s?id=1652608869911988442&wfr=spider&for=pc nginx常用命令 nginx -t ##检查配置文件,一般修改完配置文件都建议一定先执行这...

UItraman
昨天
24
0
新基建的福音:智慧楼宇可视化监控系统引领智能化新时代

前言 智慧楼宇和人们的生活息息相关,楼宇智能化程度的提高,会极大程度的改善人们的生活品质,在当前工业互联网大背景下受到很大关注。目前智慧楼宇可视化监控的主要优点包括: 智慧化 -- 智...

xhload3d
昨天
19
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部