spark2.2官方教程笔记-spark编程向导
博客专区 > skanda 的博客 > 博客详情
spark2.2官方教程笔记-spark编程向导
skanda 发表于4个月前
spark2.2官方教程笔记-spark编程向导
  • 发表于 4个月前
  • 阅读 56
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 新注册用户 域名抢购1元起>>>   

摘要: 本文章系列是对阅读spark官方教程过程留下的笔记。

总括

  首先,spark应用程序由一个驱动程序构成,由它运行用户的main函数,并且在集群上执行各种并行化操作。这个抽象的spark应用函数提供弹性分部式数据集【Spark provides is a resilient distributed dataset (RDD)】,一个rdd可以从hadoop文件系统,或者现存的scala集合,或者从其它集合中转换生成。我们可以让rdd保存在内存中,可以让rdd可以高效的做序列化操作。并且RDD还有一个牛逼的功能,就是自动恢复无效的节点。

  其次,spark支持并行化操作中共享变量。当一个spark在不同的节点上运行一系列任务,spark能复制函数中每一个变量到每一个任务节点中。然而,有时一个变量要在不同集群中共享,spark支持两种方式解决这个问题:广播变量(缓存一个数值到所有节点中),累加器(只能用来累加和求和的变量)

链接相关库

  建立spark2.2.0需要依赖相关库,如果需要使用hadoop 集群还需要使用hdfs的库,maven示例

spark core

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0

hadoop

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

初始化spark

 spark编程的第一件事就是创建sparkContext对象,该对象告诉spark如何访问集群。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

每个jvm只能实例化一个spark上下文,在创建一个新的sparkContext之前你必须调用stop函数(估计是单例模式?)

appName是显示在集群控制界面Ui上的名称,master是运行spark的模式,有 Spark, Mesos or YARN cluster URL,或者是本地模式“local”,如果是运行集群模式,就不需要以硬编码方式设置master,而是以spark-submit的方式启动程序,并且在集群中接受运行。对于本地测试和单机测试,建议使用设置“local”的方式去在进程中运行spark。

在shell中运行

 在spark shell中,一个sparkContext已经内建好了,变量名是sc,创建的SparkContext是不生效的。

在shell中,可以通过--master参数设置运行模式,--jars添加jar包,--packages添加额外包

$ ./bin/spark-shell --master local[4]
$ ./bin/spark-shell --master local[4] --jars code.jar
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

弹性分布式数据集(RDD)

spark的核心概念是rdd,一个容错的可并行处理的分布式数据集合框架。rdd通过两种方式创建:1,外部数据源(各种集合对象并行化,文件系统,hdfs,hbase等);2,rdd转换。

并行化集合

在程序中现有的集合中调用sparkContext的parallelize(集合对象)方法。举个栗子:创建1到5的并行化集合对象

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

代码很简单,spark官方文档这里絮絮叨叨的讲了一些废话,只有一句是重点,

We describe operations on distributed datasets later on.spark是懒执行,既是遇到action操作的时候才运行程序。

对于并行化集合的一个重要的参数是数据集拆分的数量。spark将在集群上的每个节点运行一个任务。通常集群的cpu数量和分区数一致比较好。

外部数据集

spark能通过以下几种方式创建rdd:本地文件系统,hdfs, Cassandra(一款开源分布式NOSql图形数据库), HBase, Amazon S3等,spark支持文本文件,序列化文件,其它hadoop输入格式(官方文档重复了n次了,估计是spark引以自豪的东西)。

对于文本文件,rdd能够使用 SparkContext 的 textFile 方法创建rdd对象。这个方法参数uri可以是本地路径,hdfs://,s3n://,然后读取其中的每一行。举个栗子,我们可以通过这种方式累加文本文件的行数的长度。

distFile.map(s => s.length).reduce((a, b) => a + b).

一些注意点

  • 如果uri是本地参数,那么集群中的每个节点相同路径下均要能够访问到这个文件,可以使用文件拷贝或者网络mount共享这文件。
  • spark文件输入中,支持输入文件夹,压缩文件和甚至支持通配符。举个栗子
textFile("/my/directory"), 
textFile("/my/directory/*.txt"),
textFile("/my/directory/*.gz")
  • 这里讲textFile(filepath,minPartitions)第二个参数minPartitions,指定数据最小分区。The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.举个栗子
val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/"  //local file  
val rdd1 = sc.textFile(path,2)  

从本地系统中读取licenses这个文件夹下的所有文件

這里特别注意的是,比如這个文件夹下有35个文件,上面分区数设置是2,那么整个RDD的分区数是35*2?這是错误的,這个RDD的分区数不管你的partition数设置为多少时,只要license這个文件夹下的這个文件a.txt(比如有a.txt)没有超过128m,那么a.txt就只有一个partition。那么就是说只要这35个文件其中没有一个超过128m,那么分区数就是 35个。

  • SparkContext.wholeTextFiles() 读取路径下的所有文本文件,返回 (filename, content) 对。
  • RDD.saveAsObjectFile and SparkContext.objectFile 将rdd对象序列化后保存。
  • SequenceFiles,文件序列化,使用SparkContext的sequenceFiles【k,v】方法。实际上这是Hadoop的Writable接口的子类,类似的子类有IntWritable和Text。另外,spark允许你对部分Writables进行自定义类型,比如sequenceFile[Int,String]将自动读取IntWritables和Texts。
  • 对于其他Hadoop 输入格式,可以使用SparkContext.hadoopRDD方法。

RDD操作

RDD实现类支持两种类型的操作:Transformations,从另一个RDD转换成一个新的数据集;Action,经过从数据中计算后返回一个新的值。比如 map 函数,传入一个rdd,返回一个新的处理过的rdd。另一方面,reduce是action操作,其对rdd中所有元素经过某种函数处理,返回一个最终结果给驱动程序。

  所有spark的Transformation是懒处理的,其不会立刻进行计算,而是记录需要用于处理的数据集。只有当遇到action操作后才进行计算处理。这种方式让spark运行更高效。

  因此,当你运行一个action的时候每个rdd会被再次计算。然而,通过persist/cache你也能持久化一个rdd在内存中,在你下次查询这个rdd的时候,程序能从集群中快速读取。

   rdd也支持在硬盘或者在多个节点持久化。

基础

  先上一段代码

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

  第一行定义一个来自外部文件的简单rdd对象。这个数据集不会马上加载到内存中,lines几乎只是一个指向文件的指针。第二行定义一个lineLengths作为map转换的结果。所以lineLengths因为懒原则不会立刻处理。最终,我们运行一个reduce,reduce作为action操作,会马上出发程序,将任务下发到不同机器,每个机器运行map的一部分和reduction的一部分,最终返回驱动一个结果。

如果我们会再次使用到lineLengths,建议持久化

lineLengths.persist()

在运行reduce之前,lineLengths在第一次计算后依旧会被保存在内存中数据不会被释放,下次计算时不需要再次出来之前的lines.map(s => s.length)操作。

给spark传函数

spark api在集群中运行过程高度依赖传函数功能。有两种推荐的方法:

  1. 匿名函数,通过短码即可实现。
  2. 全局单例对象的静态方法。
    object MyFunctions {
      def func1(s: String): String = { ... }
    }
    
    myRdd.map(MyFunctions.func1)

     

注意,你也可以传递一个类对象实例山谷的方法(而不是单个对象),不过这会导致传递函数的同时,需要把相应的对象也发送到集群中各个节点上。例如,我们定义一个MyClass如下:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

如果我们 new MyClass 创建一个实例,并调用其 doStuff 方法,同时doStuff中的 map引用了该MyClass实例上的 func1 方法,那么接下来,这个MyClass对象将被发送到集群中所有节点上。rdd.map(x => this.func1(x)) 也会有类似的效果。

 

类似地,如果引用外部对象的成员变量,也会导致对整个对象实例的引用:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

上面的代码对field的引用等价于 rdd.map(x => this.field + x),这将导致引用整个this对象。

为了避免类似问题,最简单的方式就是,将field固执到一个本地临时变量中,而不是从外部直接访问之,如下:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

理解闭包

spark中比较需要注意的点是:跨节点执行代码中的变量和方法的作用域和生命周期。在生命周期范围外修改rdd中的数值容易造成错误。在spark单机模式和集群模式下,结果往往不一样。

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

运行结果counter:0

本地模式和集群模式的区别

spark把rdd操作分割成多个独立子操作,每个任务发布给不同计算节点。执行操作前,rdd会计算闭包,把计算所需要的变量和方法副本序列化后发送给每个计算节点。节点上的counter变量和驱动器上的counter变量不是同一个变量。所以两者值不一样。

未解决这类问题,需要使用累加器。累加器是Spark中专门用于集群跨节点分布式执行计算中,安全地更新同一变量的机制。

通常来说,闭包(由循环或本地方法组成),不应该改写全局状态。Spark中改写闭包之外对象的行为是未定义的。这种代码,有可能在本地模式下能正常工作,但这只是偶然情况,同样的代码在分布式模式下其行为很可能不是你想要的。所以,如果需要全局聚合,请记得使用累加器。

打印rdd

在实际编程中,我们经常需要把RDD中的元素打印输出到屏幕上(标准输出stdout),一般会采用语句rdd.foreach(println)或者rdd.map(println)。当采用本地模式(local)在单机上执行时,这些语句会打印出一个RDD中的所有元素。但是,当采用集群模式执行时,在worker节点上执行打印语句是输出到worker节点的stdout中,而不是输出到任务控制节点Driver Program中,因此,任务控制节点Driver Program中的stdout是不会显示打印语句的这些输出内容的。为了能够把所有worker节点上的打印输出信息也显示到Driver Program中,可以使用collect()方法,比如,rdd.collect().foreach(println),但是,由于collect()方法会把各个worker节点上的所有RDD元素都抓取到Driver Program中,因此,这可能会导致内存溢出。因此,当你只需要打印RDD的部分元素时,可以采用语句rdd.take(100).foreach(println)。

使用键值对

当大部分spark都可以在任意类型对象上进行rdd操作,然然而也有部分操作只能在键值对上进行。其中最常见的是“shuffle”操作(对应中文是拖动的意思),比如通过key进行分组或聚合元素操作。

Transformations操作

一些通用的rddtransformation函数;

函数名  说明
map

返回一个新的数据集,其中每个元素都是经过func处理后得来

filter 返回一个新的数据集,其中每个元素都是经过func过滤后得来
flatmap 和map相似,但是其中的每个输入元素可能有0或多个输出。
mapPartitions 和map相似,只是map输入对应于每个分区
mapPartitionsWithIndex  
sample(withReplacement, fraction, seed)     采样部分(比例取决于 fraction )数据,同时可以指定是否使用回置采样(withReplacement),以及随机数种子(seed)
Union(other Dataset) 返回数据集的并集
intersection(otherDataset) 返回原数据集和参数数据集的交集
distinct([numTasks]) 去重
groupByKey([numTasks]) 只对包含键值对的RDD有效,如源RDD包含 (K, V) 对,则该算子返回一个新的数据集包含 (K, Iterable<V>) 对。
注意:如果你需要按key分组聚合的话(如sum或average),推荐使用 reduceByKey或者 aggregateByKey 以获得更好的性能。
注意:默认情况下,输出计算的并行度取决于源RDD的分区个数。当然,你也可以通过设置可选参数 numTasks 来指定并行任务的个数。
reduceByKey(func, [numTasks]) 如果源RDD包含元素类型 (K, V) 对,则该算子也返回包含(K, V) 对的RDD,只不过每个key对应的value是经过func聚合后的结果,而func本身是一个 (V, V) => V 的映射函数。
另外,和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。
aggregateByKey(zeroValue)(seqOpcombOp, [numTasks]) 如果源RDD包含 (K, V) 对,则返回新RDD包含 (K, U) 对,其中每个key对应的value都是由 combOp 函数 和 一个“0”值zeroValue 聚合得到。允许聚合后value类型和输入value类型不同,避免了不必要的开销。和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。
sortByKey([ascending], [numTasks]) 如果源RDD包含元素类型 (K, V) 对,其中K可排序,则返回新的RDD包含 (K, V) 对,并按照 K 排序(升序还是降序取决于 ascending 参数)
join(otherDataset, [numTasks]) 如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中将包含内关联后key对应的 (K, (V, W)) 对。外关联(Outer joins)操作请参考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 算子。
cogroup(otherDataset, [numTasks]) 如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中包含 (K, (Iterable<V>, Iterable<W>))。该算子还有个别名:groupWith
cartesian(otherDataset) 如果源RDD包含元素类型 T 且参数RDD(otherDataset)包含元素类型 U,则返回的新RDD包含前二者的笛卡尔积,其元素类型为 (T, U) 对。
pipe(command[envVars]) 以shell命令行管道处理RDD的每个分区,如:Perl 或者 bash 脚本。
RDD中每个元素都将依次写入进程的标准输入(stdin),然后按行输出到标准输出(stdout),每一行输出字符串即成为一个新的RDD元素。
coalesce(numPartitions) 将RDD的分区数减少到numPartitions。当以后大数据集被过滤成小数据集后,减少分区数,可以提升效率。
repartition(numPartitions) 将RDD数据重新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于numPartitions。该算子总是需要通过网络混洗所有数据。
repartitionAndSortWithinPartitions(partitioner) 根据partitioner(spark自带有HashPartitioner和RangePartitioner等)重新分区RDD,并且在每个结果分区中按key做排序。这是一个组合算子,功能上等价于先 repartition 再在每个分区内排序,但这个算子内部做了优化(将排序过程下推到混洗同时进行),因此性能更好。
 

mapPartitions说明

map()的输入函数是应用于RDD中每个元素,而mapPartitions()的输入函数是应用于每个分区


package test

import scala.Iterator

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TestRdd {
  def sumOfEveryPartition(input: Iterator[Int]): Int = {
    var total = 0
    input.foreach { elem =>
      total += elem
    }
    total
  }
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Rdd Test")
    val spark = new SparkContext(conf)
    val input = spark.parallelize(List(1, 2, 3, 4, 5, 6), 2)//RDD有6个元素,分成2个partition
    val result = input.mapPartitions(
      partition => Iterator(sumOfEveryPartition(partition)))//partition是传入的参数,是个list,要求返回也是list,即Iterator(sumOfEveryPartition(partition))
    result.collect().foreach {
      println(_)//6 15
    }
    spark.stop()
  }
}

 

Action

Action函数 作用
reduce(func) 将RDD中元素按func进行聚合(func是一个 (T,T) => T 的映射函数,其中T为源RDD元素类型,并且func需要满足 交换律 和 结合律 以便支持并行计算)
collect() 将数据集中所有元素以数组形式返回驱动器(driver)程序。通常用于,在RDD进行了filter或其他过滤操作后,将一个足够小的数据子集返回到驱动器内存中。
count() 返回数据集中元素个数
first() 返回数据集中首个元素(类似于 take(1) )
take(n) 返回数据集中前 个元素
takeSample(withReplacement,num, [seed]) 返回数据集的随机采样子集,最多包含 num 个元素,withReplacement 表示是否使用回置采样,最后一个参数为可选参数seed,随机数生成器的种子。
takeOrdered(n[ordering]) 按元素排序(可以通过 ordering 自定义排序规则)后,返回前 n 个元素
saveAsTextFile(path) 将数据集中元素保存到指定目录下的文本文件中(或者多个文本文件),支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。
保存过程中,Spark会调用每个元素的toString方法,并将结果保存成文件中的一行。
saveAsSequenceFile(path)
(Java and Scala)
将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。适用于实现了Writable接口的键值对RDD。在Scala中,同样也适用于能够被隐式转换为Writable的类型(Spark实现了所有基本类型的隐式转换,如:Int,Double,String 等)
saveAsObjectFile(path)
(Java and Scala)
将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用 SparkContext.objectFile 来读取。
countByKey() 只适用于包含键值对(K, V)的RDD,并返回一个哈希表,包含 (K, Int) 对,表示每个key的个数。
foreach(func) 在RDD的每个元素上运行 func 函数。通常被用于累加操作,如:更新一个累加器(Accumulator ) 或者 和外部存储系统互操作。
注意:用 foreach 操作出累加器之外的变量可能导致未定义的行为。更详细请参考前面的“理解闭包”(Understanding closures )这一小节。

混洗操作

有一些Spark算子会触发众所周知的混洗(Shuffle)事件。Spark中的混洗机制是用于将数据重新分布,其结果是所有数据将在各个分区间重新分组。一般情况下,混洗需要跨执行器(executor)或跨机器复制数据,这也是混洗操作一般都比较复杂而且开销大的原因。

背景

为了理解混洗阶段都发生了哪些事,我首先以reduceByKey 为例来看一下。reduceByKey会生成一个新的RDD,将源RDD中一个key对应的多个value组合进一个tuple - 然后将这些values输入给reduce函数,得到的result再和key关联放入新的RDD中。这个的难点在于对于某一个key来说,并非其对应的所有values都在同一个分区(partition)中,甚至有可能都不在同一台机器上,但是这些values又必须放到一起计算reduce结果。

在Spark中,通常是由于为了进行某种计算操作,而将数据分布到所需要的各个分区当中。而在计算阶段,单个任务(task)只会操作单个分区中的数据 – 因此,为了组织好每个reduceByKey中reduce任务执行时所需的数据,Spark需要执行一个多对多操作。即,Spark需要读取RDD的所有分区,并找到所有key对应的所有values,然后跨分区传输这些values,并将每个key对应的所有values放到同一分区,以便后续计算各个key对应values的reduce结果 – 这个过程就叫做混洗(Shuffle)。

shuffle开销大。

RDD持久化

rdd持久化分七个级别

Storage Level Meaning
MEMORY_ONLY

Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

未序列化java对象存储在jvm内,如果内存不足,部分数据不会存储,且再次使用的过程会重新计算。这是默认级别,且cpu处理器最有效率的方式。

MEMORY_AND_DISK

Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

未序列化java对象存储在jvm内,如果内存不足,部分数据存储在硬盘,且再次使用的过程会读取硬盘。

MEMORY_ONLY_SER 
(Java and Scala)

Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

以序列化形式存储RDD(每个分区一个字节数组)。通常这种方式比未序列化存储方式要更省空间,尤其是如果你选用了一个比较好的序列化协议(fast serializer),但是这种方式也相应的会消耗更多的CPU来读取数据。

MEMORY_AND_DISK_SER 
(Java and Scala)

Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

和MEMORY_ONLY_SER类似,只是当内存装不下的时候,会将分区的数据吐到磁盘上,而不是每次用到都重新计算。

DISK_ONLY

Store the RDD partitions only on disk.

只存储在磁盘上。这种缓存估计用在处理超大文件的过程。

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

Same as the levels above, but replicate each partition on two cluster nodes.

和上面没有”_2″的级别相对应,只不过每个分区数据会在两个节点上保存两份副本。

OFF_HEAP (experimental) Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.

如何选择存储级别?

Spark的存储级别主要可于在内存使用和CPU占用之间做一些权衡。建议根据以下步骤来选择一个合适的存储级别:

  • 如果RDD能使用默认存储级别(MEMORY_ONLY),那就尽量使用默认级别。这是CPU效率最高的方式,所有RDD算子都能以最快的速度运行。
  • 如果步骤1的答案是否(不适用默认级别),那么可以尝试MEMORY_ONLY_SER级别,并选择一个高效的序列化协议(selecting a fast serialization library),这回大大节省数据对象的存储空间,同时速度也还不错。
  • 尽量不要把数据吐到磁盘上,除非:1.你的数据集重新计算的代价很大;2.你的数据集是从一个很大的数据源中过滤得到的结果。否则的话,重算一个分区的速度很可能和从磁盘上读取差不多。
  • 如果需要支持容错,可以考虑使用带副本的存储级别(例如:用Spark来服务web请求)。所有的存储级别都能够以重算丢失数据的方式来提供容错性,但是带副本的存储级别可以让你的应用持续的运行,而不必等待重算丢失的分区。

删除数据

Spark能够自动监控各个节点上缓存使用率,并且以LRU(last recent used)的方式将老数据逐出内存。如果你更喜欢手动控制的话,可以用RDD.unpersist() 方法来删除无用的缓存。

共享变量

一般来说,当spark传递一个函数操作到远程集群节点,驱动程序会把相关的数据以副本形式发送到各个节点。因为跨节点的读写效率太低了,所以一般不会对远程的副本数据进行更新。然而有时候也需要对一些数据进行读写,这就是广播变量和累加器。

广播变量

广播变量(groadcast varible)为只读变量,它有运行SparkContext的驱动程序创建后发送给参与计算的节点。对那些需要让工作节点高效地访问相同数据的应用场景,比如机器学习。我们可以在SparkContext上调用broadcast方法创建广播变量:

  val broadcastList = sc.broadcast(List("Spark","Impala","Hadoop"))

广播变量也可以被非驱动程序所在节点(即工作节点)访问,访问方法就是调用该变量的value方法

  sc.parallelize(List("1","2","3")).map(x => broadcastList.value ++ x).collect

使用广播变量可以优化资源提高性能

广播变量的优势:是因为不是每个task一份变量副本,而是变成每个节点的executor才一份副本。这样的话,就可以让变量产生的副本大大减少。

广播变量,初始的时候,就在Drvier上有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的

BlockManager中,尝试获取变量副本;如果本地没有,BlockManager,也许会从远程的Driver上面去获取变量副本;也有可能从距离比较近的其他

节点的Executor的BlockManager上去获取,并保存在本地的BlockManager中;BlockManager负责管理某个Executor对应的内存和磁盘上的数据,

此后这个executor上的task,都会直接使用本地的BlockManager中的副本。

例如,50个executor,1000个task。一个map,10M:

默认情况下,1000个task,1000份副本。10G的数据,网络传输,在集群中,耗费10G的内存资源。

如果使用了广播变量。50个execurtor,50个副本。500M的数据,网络传输,而且不一定都是从Driver传输到每个节点,还可能是就近从最近的

节点的executor的bockmanager上拉取变量副本,网络传输速度大大增加;500M,大大降低了内存消耗。

标签: Spark Framework
共有 人打赏支持
粉丝 9
博文 75
码字总数 49913
×
skanda
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: