文档章节

Scala的foreachRDD

牧师-Panda
 牧师-Panda
发布于 2017/08/23 15:06
字数 816
阅读 26
收藏 0

顾名思义是遍历RDD用的,这个函数在DStream包中的InputStream类里,它允许你把数据发送给外部系统。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换

def foreachRDD(foreachFunc: (RDD[T], Time) ⇒ Unit): Unit
或者
def foreachRDD(foreachFunc: (RDD[T]) ⇒ Unit): Unit

Apply a function to each RDD in this DStream.
This is an output operator, so 'this' DStream will be registered as an 
output stream and therefore materialized.

所以要掌握它,对它要有深入了解。

下面转载一下常见的理解错误

经常写数据到外部系统需要创建一个连接的object handle(eg:根据TCP协议连接到远程的服务器,我们连接外部数据库需要自己的句柄)和发送数据到远程的系统。程序员可能会想当然地在spark上创建一个connection对象, 然后在spark线程里用这个对象来存RDD:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

这个代码会产生执行错误, 因为rdd是分布式存储的,它是一个数据结构,它是一组指向集群数据的指针, rdd.foreach会在集群里的不同机器上创建spark工作线程, 而connection对象则不会在集群里的各个机器之间传递, 所以有些spark工作线程就会产生connection对象没有被初始化的执行错误。 解决的办法可以是在spark worker里为每一个worker创建一个connection对象, 但是如果你这么做, 程序要为每一条record创建一次connection,显然效率和性能都非常差,即

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

另一种改进方法是为每个spark分区创建一个connection对象,同时维护一个全局的静态的连接池对象, 这样就可以最好的复用connection。 另外需要注意: 虽然有多个connection对象, 但在同一时间只有一个connection.send(record)执行, 因为在同一个时间里, 只有 一个微批次的RDD产生出来。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

有人问了个问题,为什么foreachRDD里有两层嵌套的foreach? 为什么dstream.foreachRDD里还要再套一层rdd.foreach

可以这么理解, DStream.foreachRDD 是一个输出操作符,它返回的不是RDD里的一行数据, 而是输出DStream后面的RDD, 在一个时间间隔里, 只返回一个RDD的“微批次”, 为了访问这个“微批次”RDD里的数据, 我们还需要在RDD数据对象上做进一步操作。这也印证了文首的那段英文说明。 参考下面的代码实例, 更容易理解。

给定一个 RDD [Security, Prices]数据结构

     dstream.foreachRDD { pricesRDD =>  // Loop over RDD

       val x= pricesRDD.count

       if (x > 0)  // RDD has data

       {

         for(line <- pricesRDD.collect.toArray) // Look for each record in the RDD

         {

           var index = line._2.split(',').view(0).toInt   // That is the index

           var timestamp = line._2.split(',').view(1).toString // This is the timestamp from source

           var security =  line._2.split(',').view(2).toString // This is the name of the security

           var price = line._2.split(',').view(3).toFloat  // This is the price of the security

           if (price.toFloat > 90.0)

           {

            // Do something here

            // Sent notification, write to HDFS etc

           }

         }

       }

     }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

© 著作权归作者所有

共有 人打赏支持
牧师-Panda
粉丝 30
博文 146
码字总数 180044
作品 0
浦东
Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

" java.lang.ExceptionInInitializerError at com.test.App$$anonfun$main$3.apply(App.scala:117) at com.test.App$$anonfun$main$3.apply(App.scala:115) at org.apache.spark.streaming.d......

Baclk5
2017/07/21
85
0
基于案例贯通Spark Streaming流计算框架运行源码3

先贴下案例源码 上文已经从源码分析了InputDStream实例化过程,下一步是 源码钻进去 进入FlatMappedDStream.scala 这里顺势看下DStream的继承结构吧。 笔者扫描了一下代码,发现InputDStrea...

柯里昂
2016/05/08
127
0
第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容: 1,JobScheduler内幕实现 2,JobScheduler深度思考 DStream的foreachRDD方法,实例化ForEachDStream对象,并将用户定义的函数foreachFunc传入到该对象中。foreachRDD方法是输出操...

葛晨鑫
2016/05/14
80
0
第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容: 1,Spark Streaming数据清理原因和现象 2,Spark Streaming数据清理代码解析 因为RDD是由DStream产生的,对RDD的操作都是基于对DStream的操作,DStream负责RDD的生命周期。我们一...

葛晨鑫
2016/05/30
69
0
Spark Streaming和Kafka整合之路(最新版本)

最近完成了Spark Streaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少的坑,记录下来,大家方便绕行。 先说一下环境: Spark 2.0.0 kafka_2.11-0.10.0.0 之前的项目当中,已经...

大胖和二胖
2016/09/19
1K
1

没有更多内容

加载失败,请刷新页面

加载更多

mysql 数据类型及占用字节数

数字类型 TINYINT                           1 字节 SMALLINT                          2 个字节 MEDIUMINT                         3 个字节...

会游泳的鱼_
56分钟前
3
0
高性能mysql:创建高性能的索引

性能优化简介 MySQL性能定义为完成某件任务所需要的时间量度,换句话说,性能即响应时间,这是一个非常重要的原则。我们通过任务和时间而不是资源来测量性能。数据库服务器的目的是执行SQL语...

背后的辛酸
今天
6
0
HTTP get、post 中请求json与map传参格式

import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.nio.charset.Charset;import java.util.ArrayList;import java.util.List;im......

寒风中的独狼
今天
3
0
IDEA中tomcat启动慢 耗时10分钟

用idea中的tomcat以debug模式启动,会非常的慢,而正常启动没啥问题;原因是debug模式中View Breakpoints断点代码,断点的是jar包,而现在启动由于jar包发生变化,导致启动时一直处于等待中。...

GoodMarver
今天
5
0
Linux学习-10月18(awk)

9.6/9.7 awk 一、awk简介   1. awk是一种编程语言,用于对文本和数据进行处理的   2. 具有强大的文本格式化能力   3. 利用命令awk,可以将一些文本整理成为我们想要的样子   4. 命令awk...

wxy丶
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部