文档章节

Spark函数讲解: combineByKey

大数据之路
 大数据之路
发布于 2012/10/11 16:29
字数 1260
阅读 277
收藏 4

1、背景

在数据分析中,处理Key,Value的Pair数据是极为常见的场景,例如我们可以针对这样的数据进行分组、聚合或者将两个包含Pair数据的RDD根据key进行join。从函数的抽象层面看,这些操作具有共同的特征,都是将类型为RDD[(K,V)]的数据处理为RDD[(K,C)]。这里的V和C可以是相同类型,也可以是不同类型。这种数据处理操作并非单纯的对Pair的value进行map,而是针对不同的key值对原有的value进行联合(Combine)。因而,不仅类型可能不同,元素个数也可能不同。

combineByKey()是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它实现的。和aggregate()一样,combineByKey()可以让用户返回与输入数据的类型不同的返回值。

Spark为此提供了一个高度抽象的操作combineByKey。该方法的定义如下所示:

def combineByKey[C](
      //在找到给定分区中第一次碰到的key(在RDD元素中)时被调用。此方法为这个key初始化一个累加器。
      createCombiner: V => C,
      //当累加器已经存在的时候(也就是上面那个key的累加器)调用。
      mergeValue: (C, V) => C,
      // 如果哪个key跨多个分区,该参数就会被调用。
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null
): RDD[(K, C)] = { //实现略 }

函数式风格与命令式风格不同之处在于它说明了代码做了什么(what to do),而不是怎么做(how to do)。combineByKey函数主要接受了三个函数作为参数,分别为createCombiner、mergeValue、mergeCombiners。这三个函数足以说明它究竟做了什么。理解了这三个函数,就可以很好地理解combineByKey。

2、原理

由于combineByKey()会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。

  • 如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。需要注意的是,这一过程会在每个分区中第一次出现各个键时发生,而不是在整个RDD中第一次出现一个键时发生。
  • 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。
  • 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。

3、示例:

让我们来计算每一项科目的平均值

// 关闭 spark-shell INFO/DEBUG 调试信息
scala> sc.setLogLevel("WARN")

scala> val inputrdd = sc.parallelize(Seq(
                        ("maths", 50), ("maths", 60),
                        ("english", 65),
                        ("physics", 66), ("physics", 61), ("physics", 87)), 
                        1)
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:27

scala> inputrdd.getNumPartitions                      
res55: Int = 1

scala> val reduced = inputrdd.combineByKey(
         (mark) => {
           println(s"Create combiner -> ${mark}")
           (mark, 1)
         },
         (acc: (Int, Int), v) => {
           println(s"""Merge value : (${acc._1} + ${v}, ${acc._2} + 1)""")
           (acc._1 + v, acc._2 + 1)
         },
         (acc1: (Int, Int), acc2: (Int, Int)) => {
           println(s"""Merge Combiner : (${acc1._1} + ${acc2._1}, ${acc1._2} + ${acc2._2})""")
           (acc1._1 + acc2._1, acc1._2 + acc2._2)
         }
     )
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[42] at combineByKey at <console>:29

scala> reduced.collect()
Create combiner -> 50
Merge value : (50 + 60, 1 + 1)
Create combiner -> 65
Create combiner -> 66
Merge value : (66 + 61, 1 + 1)
Merge value : (127 + 87, 2 + 1)
res56: Array[(String, (Int, Int))] = Array((maths,(110,2)), (physics,(214,3)), (english,(65,1)))

scala> val result = reduced.mapValues(x => x._1 / x._2.toFloat)
result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[43] at mapValues at <console>:31

scala> result.collect()
res57: Array[(String, Float)] = Array((maths,55.0), (physics,71.333336), (english,65.0))

注意:本例中因为只有一个分区所以 mergeCombiners 并没有用到,你也可以通过下面的代码从另外角度来验证:

scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21

scala> rdd1.getNumPartitions    
res18: Int = 64 

scala> rdd1.combineByKey(
            (v : Int) => v + "_",   
            (c : String, v : Int) => c + "@" + v,  
            (c1 : String, c2 : String) => c1 + "$" + c2
            ).collect
res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))

在此例中,因为分区多而记录少,可以看做每条记录都跨分区了,所以没有机会用到 mergeValue,最后直接 mergeCombiners  得到结果 

除了可以进行group、average之外,根据传入的函数实现不同,我们还可以利用combineByKey完成诸如aggregate、fold等操作。这是一个高度的抽象,但从声明的角度来看,却又不需要了解过多的实现细节。这正是函数式编程的魅力。

Refer:

[1] Spark函数讲解:combineByKey

http://bihell.com/2017/03/14/Combiner-in-Pair-RDDs-combineByKey/

[2] combineByKey操作

https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/combinebykey.html

[3] Spark算子执行流程详解之五

http://blog.csdn.net/wl044090432/article/details/59483319

[4] Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey

http://lxw1234.com/archives/2015/07/358.htm

© 著作权归作者所有

大数据之路
粉丝 1601
博文 515
码字总数 335917
作品 0
武汉
架构师
私信 提问
用Spark解决一些经典MapReduce问题

摘要 Spark是一个Apache项目,它被标榜为“快如闪电的集群计算”。它拥有一个繁荣的开源社区,并且是目前最活跃的Apache项目。Spark提供了一个更快、更通用的数据处理平台。和Hadoop相比,S...

力谱宿云
2016/12/01
507
0
Spark中的键值对操作 JavaPairRDD

1.PairRDD介绍 List list=new ArrayList(); list.add("this is a test"); list.add("how are you?"); list.add("do you love me?"); list.add("can you tell me?"); JavaRDD lines=sc.paral......

u014236541
2018/05/08
0
0
大数据(Spark-基本概念)

Spark简介 Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎。目前已经形成一个高速发展应用广泛 的生态系统。 Apache Spark特性: 1,快速 大多数操作均在内存中迭代,只有少部...

这很耳东先生
07/09
30
0
[Kafka与Spark集成系列一] Spark入门

版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 https://blog.csdn.net/u013256816/article/details/82081946 Spark是一个用来是实现快速而通用的集群计算的平台。Spark是UC ...

朱小厮
2018/08/26
0
0
影响Spark输出RDD分区的操作函数

会影响到Spark输出RDD分区(partitioner)的操作 cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey, partitionBy, sort, mapValues(如果父......

imbetter
2018/04/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
712
10
MongoDB系列-- SpringBoot 中对 MongoDB 的 基本操作

SpringBoot 中对 MongoDB 的 基本操作 Database 库的创建 首先 在MongoDB 操作客户端 Robo 3T 中 创建数据库: 增加用户User: 创建 Collections 集合(类似mysql 中的 表): 后面我们大部分都...

TcWong
今天
40
0
spring cloud

一、从面试题入手 1.1、什么事微服务 1.2、微服务之间如何独立通讯的 1.3、springCloud和Dubbo有哪些区别 1.通信机制:DUbbo基于RPC远程过程调用;微服务cloud基于http restFUL API 1.4、spr...

榴莲黑芝麻糊
今天
26
0
Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
75
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
73
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部