文档章节

Scala的foreachRDD

牧师-Panda
 牧师-Panda
发布于 2017/08/23 15:06
字数 816
阅读 19
收藏 0
点赞 0
评论 0

顾名思义是遍历RDD用的,这个函数在DStream包中的InputStream类里,它允许你把数据发送给外部系统。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换

def foreachRDD(foreachFunc: (RDD[T], Time) ⇒ Unit): Unit
或者
def foreachRDD(foreachFunc: (RDD[T]) ⇒ Unit): Unit

Apply a function to each RDD in this DStream.
This is an output operator, so 'this' DStream will be registered as an 
output stream and therefore materialized.

所以要掌握它,对它要有深入了解。

下面转载一下常见的理解错误

经常写数据到外部系统需要创建一个连接的object handle(eg:根据TCP协议连接到远程的服务器,我们连接外部数据库需要自己的句柄)和发送数据到远程的系统。程序员可能会想当然地在spark上创建一个connection对象, 然后在spark线程里用这个对象来存RDD:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

这个代码会产生执行错误, 因为rdd是分布式存储的,它是一个数据结构,它是一组指向集群数据的指针, rdd.foreach会在集群里的不同机器上创建spark工作线程, 而connection对象则不会在集群里的各个机器之间传递, 所以有些spark工作线程就会产生connection对象没有被初始化的执行错误。 解决的办法可以是在spark worker里为每一个worker创建一个connection对象, 但是如果你这么做, 程序要为每一条record创建一次connection,显然效率和性能都非常差,即

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

另一种改进方法是为每个spark分区创建一个connection对象,同时维护一个全局的静态的连接池对象, 这样就可以最好的复用connection。 另外需要注意: 虽然有多个connection对象, 但在同一时间只有一个connection.send(record)执行, 因为在同一个时间里, 只有 一个微批次的RDD产生出来。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

有人问了个问题,为什么foreachRDD里有两层嵌套的foreach? 为什么dstream.foreachRDD里还要再套一层rdd.foreach

可以这么理解, DStream.foreachRDD 是一个输出操作符,它返回的不是RDD里的一行数据, 而是输出DStream后面的RDD, 在一个时间间隔里, 只返回一个RDD的“微批次”, 为了访问这个“微批次”RDD里的数据, 我们还需要在RDD数据对象上做进一步操作。这也印证了文首的那段英文说明。 参考下面的代码实例, 更容易理解。

给定一个 RDD [Security, Prices]数据结构

     dstream.foreachRDD { pricesRDD =>  // Loop over RDD

       val x= pricesRDD.count

       if (x > 0)  // RDD has data

       {

         for(line <- pricesRDD.collect.toArray) // Look for each record in the RDD

         {

           var index = line._2.split(',').view(0).toInt   // That is the index

           var timestamp = line._2.split(',').view(1).toString // This is the timestamp from source

           var security =  line._2.split(',').view(2).toString // This is the name of the security

           var price = line._2.split(',').view(3).toFloat  // This is the price of the security

           if (price.toFloat > 90.0)

           {

            // Do something here

            // Sent notification, write to HDFS etc

           }

         }

       }

     }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

© 著作权归作者所有

共有 人打赏支持
牧师-Panda
粉丝 26
博文 146
码字总数 180044
作品 0
浦东
Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

" java.lang.ExceptionInInitializerError at com.test.App$$anonfun$main$3.apply(App.scala:117) at com.test.App$$anonfun$main$3.apply(App.scala:115) at org.apache.spark.streaming.d......

Baclk5 ⋅ 2017/07/21 ⋅ 0

基于案例贯通Spark Streaming流计算框架运行源码3

先贴下案例源码 上文已经从源码分析了InputDStream实例化过程,下一步是 源码钻进去 进入FlatMappedDStream.scala 这里顺势看下DStream的继承结构吧。 笔者扫描了一下代码,发现InputDStrea...

柯里昂 ⋅ 2016/05/08 ⋅ 0

第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容: 1,JobScheduler内幕实现 2,JobScheduler深度思考 DStream的foreachRDD方法,实例化ForEachDStream对象,并将用户定义的函数foreachFunc传入到该对象中。foreachRDD方法是输出操...

葛晨鑫 ⋅ 2016/05/14 ⋅ 0

Spark Streaming和Kafka整合之路(最新版本)

最近完成了Spark Streaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少的坑,记录下来,大家方便绕行。 先说一下环境: Spark 2.0.0 kafka_2.11-0.10.0.0 之前的项目当中,已经...

大胖和二胖 ⋅ 2016/09/19 ⋅ 1

第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容: 1,Spark Streaming数据清理原因和现象 2,Spark Streaming数据清理代码解析 因为RDD是由DStream产生的,对RDD的操作都是基于对DStream的操作,DStream负责RDD的生命周期。我们一...

葛晨鑫 ⋅ 2016/05/30 ⋅ 0

Spark Streaming 批量写入HBase

val ssc = new StreamingContext(conf, Seconds(10)) val lines = ssc.textFileStream("hdfs://master:9000/woozoom/").repartition(12).map(_.split(",")) lines.foreachRDD { rdd => { if ......

大胖和二胖 ⋅ 2016/09/05 ⋅ 0

新手福利:Apache Spark 入门攻略

【编者按】时至今日,Spark 已成为大数据领域最火的一个开源项目,具备高性能、易于使用等特性。然而作为一个年轻的开源项目,其使用上存在的挑战亦不可为不大,这里为大家分享 SciSpike 软件...

OneAPM蓝海讯通 ⋅ 2015/07/15 ⋅ 0

用实例讲解Spark Sreaming--转

本篇文章用Spark Streaming +Hbase为列,Spark Streaming专为流式数据处理,对Spark核心API进行了相应的扩展。 什么是Spark Streaming? 首先,什么是流式处理呢?数据流是一个数据持续不断...

Zero零_度 ⋅ 2016/12/22 ⋅ 0

spark streaming中建立线程池

为了避免延迟以及更快速的处理业务,在spark streaming中建立线程池,避免每条信息等待处理,代码大致如下 stream.foreachRDD(rdd=> { rdd.foreachPartition { rddPartition => { val clien...

木木木yanyanyan ⋅ 2017/11/13 ⋅ 0

Spark Streaming 编程指南[中]

基于Spark 2.0 Preview的材料翻译,原[英]文地址: http://spark.apache.org/docs/2.0.0-preview/streaming-programming-guide.html Streaming应用实战,参考:http://my.oschina.net/u/230......

openthings ⋅ 2016/07/13 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Cube、Cuboid 和 Cube Segment

1.Cube (或Data Cube),即数据立方体,是一种常用于数据分析与索引的技术;它可以对原始数据建立多维度索引。通过 Cube 对数据进行分析,可以大大加快数据的查询效率 2.Cuboid 在 Kylin 中特...

无精疯 ⋅ 27分钟前 ⋅ 0

github太慢

1:用浏览器访问 IPAddress.com or http://tool.chinaz.com 使用 IP Lookup 工具获得github.com和github.global.ssl.fastly.net域名的ip地址 2:/etc/hosts文件中添加如下格式(IP最好自己查一...

whoisliang ⋅ 28分钟前 ⋅ 0

非阻塞同步之 CAS

为解决线程安全问题,互斥同步相当于以时间换空间。多线程情况下,只有一个线程可以访问同步代码。这种同步也叫阻塞同步(Blocking Synchronization). 这种同步属于一种悲观并发策略。认为只...

长安一梦 ⋅ 39分钟前 ⋅ 0

云计算的选择悖论如何对待?

人们都希望在工作和生活中有所选择。但心理学家的调查研究表明,在多种选项中进行选择并不一定会使人们更快乐,甚至不会产生更好的决策。心理学家Barry Schwartz称之为“选择悖论”。云计算为...

linux-tao ⋅ 41分钟前 ⋅ 0

我的第一篇个人博客

虽然这是个技术博客,但是,我总是想写一些自己的东西,所有就大胆的在这里写下了第一篇非技术博客。技术博客也很久没有更新,个人原因。 以后自己打算在这里写一些非技术博客,可能个人观点...

Mrs_CoCo ⋅ 42分钟前 ⋅ 0

Redis 注册为 Windows 服务

Redis 注册为 Windows 服务 redis 注册为 windows 服务相关命令 注册服务 redis-server.exe –service-install redis.windows.conf 删除服务 redis-server –service-uninstall 启动服务 re......

Os_yxguang ⋅ 42分钟前 ⋅ 0

世界那么大,语言那么多,为什么选择Micropython,它的优势在哪?

最近国内MicroPython风靡程序界,是什么原因导致它这么火呢?是因为他功能强大,遵循Mit协议开源么? 错!因为使用它真的是太舒服了!!! Micropython的由来,这得益于Damien George这位伟大...

bodasisiter ⋅ 46分钟前 ⋅ 0

docker 清理总结

杀死所有正在运行的容器 docker kill $(docker ps -a -q) 删除所有已经停止的容器(docker rm没有加-f参数,运行中的容器不会删掉) docker rm $(docker ps -a -q) 删除所有未打 dangling 标...

vvx1024 ⋅ 56分钟前 ⋅ 0

关于学习

以前学车的时候,教练说了这样的一句话:如果一个人坐在车上一直学,一直学,反而不如大家轮流着学。因为一个人一直学,就没有给自己留空间来反思和改进。而轮流着学的时候大家下来之后思考上...

mskk ⋅ 今天 ⋅ 0

压缩工具之gzip-bzip2-xz

win下常见压缩工具:rar zip 7z linux下常见压缩工具:zip gz bz2 xz tar.gz tar.bz2 tar.xz gzip 不支持目录压缩 gzip 1.txt #压缩。执行后1.txt消失,生成1.txt.gz压缩文件 gzip -d 1.txt....

ZHENG-JY ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部