文档章节

Scala的foreachRDD

牧师-Panda
 牧师-Panda
发布于 2017/08/23 15:06
字数 816
阅读 22
收藏 0

顾名思义是遍历RDD用的,这个函数在DStream包中的InputStream类里,它允许你把数据发送给外部系统。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换

def foreachRDD(foreachFunc: (RDD[T], Time) ⇒ Unit): Unit
或者
def foreachRDD(foreachFunc: (RDD[T]) ⇒ Unit): Unit

Apply a function to each RDD in this DStream.
This is an output operator, so 'this' DStream will be registered as an 
output stream and therefore materialized.

所以要掌握它,对它要有深入了解。

下面转载一下常见的理解错误

经常写数据到外部系统需要创建一个连接的object handle(eg:根据TCP协议连接到远程的服务器,我们连接外部数据库需要自己的句柄)和发送数据到远程的系统。程序员可能会想当然地在spark上创建一个connection对象, 然后在spark线程里用这个对象来存RDD:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

这个代码会产生执行错误, 因为rdd是分布式存储的,它是一个数据结构,它是一组指向集群数据的指针, rdd.foreach会在集群里的不同机器上创建spark工作线程, 而connection对象则不会在集群里的各个机器之间传递, 所以有些spark工作线程就会产生connection对象没有被初始化的执行错误。 解决的办法可以是在spark worker里为每一个worker创建一个connection对象, 但是如果你这么做, 程序要为每一条record创建一次connection,显然效率和性能都非常差,即

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

另一种改进方法是为每个spark分区创建一个connection对象,同时维护一个全局的静态的连接池对象, 这样就可以最好的复用connection。 另外需要注意: 虽然有多个connection对象, 但在同一时间只有一个connection.send(record)执行, 因为在同一个时间里, 只有 一个微批次的RDD产生出来。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

有人问了个问题,为什么foreachRDD里有两层嵌套的foreach? 为什么dstream.foreachRDD里还要再套一层rdd.foreach

可以这么理解, DStream.foreachRDD 是一个输出操作符,它返回的不是RDD里的一行数据, 而是输出DStream后面的RDD, 在一个时间间隔里, 只返回一个RDD的“微批次”, 为了访问这个“微批次”RDD里的数据, 我们还需要在RDD数据对象上做进一步操作。这也印证了文首的那段英文说明。 参考下面的代码实例, 更容易理解。

给定一个 RDD [Security, Prices]数据结构

     dstream.foreachRDD { pricesRDD =>  // Loop over RDD

       val x= pricesRDD.count

       if (x > 0)  // RDD has data

       {

         for(line <- pricesRDD.collect.toArray) // Look for each record in the RDD

         {

           var index = line._2.split(',').view(0).toInt   // That is the index

           var timestamp = line._2.split(',').view(1).toString // This is the timestamp from source

           var security =  line._2.split(',').view(2).toString // This is the name of the security

           var price = line._2.split(',').view(3).toFloat  // This is the price of the security

           if (price.toFloat > 90.0)

           {

            // Do something here

            // Sent notification, write to HDFS etc

           }

         }

       }

     }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

© 著作权归作者所有

共有 人打赏支持
牧师-Panda
粉丝 27
博文 146
码字总数 180044
作品 0
浦东
Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

" java.lang.ExceptionInInitializerError at com.test.App$$anonfun$main$3.apply(App.scala:117) at com.test.App$$anonfun$main$3.apply(App.scala:115) at org.apache.spark.streaming.d......

Baclk5
2017/07/21
85
0
基于案例贯通Spark Streaming流计算框架运行源码3

先贴下案例源码 上文已经从源码分析了InputDStream实例化过程,下一步是 源码钻进去 进入FlatMappedDStream.scala 这里顺势看下DStream的继承结构吧。 笔者扫描了一下代码,发现InputDStrea...

柯里昂
2016/05/08
127
0
第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容: 1,JobScheduler内幕实现 2,JobScheduler深度思考 DStream的foreachRDD方法,实例化ForEachDStream对象,并将用户定义的函数foreachFunc传入到该对象中。foreachRDD方法是输出操...

葛晨鑫
2016/05/14
80
0
第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容: 1,Spark Streaming数据清理原因和现象 2,Spark Streaming数据清理代码解析 因为RDD是由DStream产生的,对RDD的操作都是基于对DStream的操作,DStream负责RDD的生命周期。我们一...

葛晨鑫
2016/05/30
69
0
Spark Streaming和Kafka整合之路(最新版本)

最近完成了Spark Streaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少的坑,记录下来,大家方便绕行。 先说一下环境: Spark 2.0.0 kafka_2.11-0.10.0.0 之前的项目当中,已经...

大胖和二胖
2016/09/19
1K
1

没有更多内容

加载失败,请刷新页面

加载更多

下一页

qduoj~前端~二次开发~打包docker镜像并上传到阿里云容器镜像仓库

上一篇文章https://my.oschina.net/finchxu/blog/1930017记录了怎么在本地修改前端,现在我要把我的修改添加到部署到本地的前端的docker容器中,然后打包这个容器成为一个本地镜像,然后把这...

虚拟世界的懒猫
今天
1
0
UML中 的各种符号含义

Class Notation A class notation consists of three parts: Class Name The name of the class appears in the first partition. Class Attributes Attributes are shown in the second par......

hutaishi
今天
1
0
20180818 上课截图

小丑鱼00
今天
1
0
Springsecurity之SecurityContextHolderStrategy

注:下面分析的版本是spring-security-4.2.x,源码的github地址是: https://github.com/spring-projects/spring-security/tree/4.2.x 先上一张图: 图1 SecurityContextHolderStrategy的三个......

汉斯-冯-拉特
今天
1
0
LNMP架构(Nginx负载均衡、ssl原理、生成ssl密钥对、Nginx配置ssl)

Nginx负载均衡 网站的访问量越来越大,服务器的服务模式也得进行相应的升级,比如分离出数据库服务器、分离出图片作为单独服务,这些是简单的数据的负载均衡,将压力分散到不同的机器上。有时...

蛋黄_Yolks
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部