# Common Spark RDD Operators

2016/12/20 20:19

### map, flatMap, mapValues and flatMapValues

`````` /**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

/**
*  Return a new RDD by first applying a function to all elements of this
*  RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}

/**
* Pass each value in the key-value pair RDD through a flatMap function without changing the
* keys; this also retains the original RDD's partitioning.
*/
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.flatMap { case (k, v) =>
cleanF(v).map(x => (k, x))
},
preservesPartitioning = true)
}
``````

``````//Load the text file
scala> val data = sc.textFile("file:///usr/home/dw/yzxJar/rdd.txt")
data: org.apache.spark.rdd.RDD[String] = file:///usr/home/dw/yzxJar/rdd.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> data.foreach(println)
hello hbase
hello world
hello spark
hello java
hello hive

scala> data.map(_.toUpperCase).collect
res1: Array[String] = Array(HELLO WORLD, HELLO JAVA, HELLO HADOOP, HELLO HIVE, HELLO HBASE, HELLO SPARK, HELLO FLINK)

//flatMap treats each (uppercased) string as a collection of characters, so the result is flattened into individual Chars
scala> data.flatMap(_.toUpperCase).collect
res2: Array[Char] = Array(H, E, L, L, O,  , W, O, R, L, D, H, E, L, L, O,  , J, A, V, A, H, E, L, L, O,  , H, A, D, O, O, P, H, E, L, L, O,  , H, I, V, E, H, E, L, L, O,  , H, B, A, S, E, H, E, L, L, O,  , S, P, A, R, K, H, E, L, L, O,  , F, L, I, N, K)

scala> data.map(x => x.split("\\s+")).collect
res3: Array[Array[String]] = Array(Array(hello, world), Array(hello, java), Array(hello, hadoop), Array(hello, hive), Array(hello, hbase), Array(hello, spark), Array(hello, flink))

scala> data.flatMap(x => x.split("\\s+")).collect
res4: Array[String] = Array(hello, world, hello, java, hello, hadoop, hello, hive, hello, hbase, hello, spark, hello, flink)

scala> data.map(x => x.split("\\s+")).map(x => (x.length, x)).mapValues(x => x.map(_ + "_")).collect
res5: Array[(Int, Array[String])] = Array((2,Array(hello_, world_)), (2,Array(hello_, java_)), (2,Array(hello_, hadoop_)), (2,Array(hello_, hive_)), (2,Array(hello_, hbase_)), (2,Array(hello_, spark_)), (2,Array(hello_, flink_)))

scala> data.map(x => x.split("\\s+")).map(x => (x.length, x)).flatMapValues(x => x.map(_ + "_")).collect
res6: Array[(Int, String)] = Array((2,hello_), (2,world_), (2,hello_), (2,java_), (2,hello_), (2,hadoop_), (2,hello_), (2,hive_), (2,hello_), (2,hbase_), (2,hello_), (2,spark_), (2,hello_), (2,flink_))
``````
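
Putting these operators together, here is a minimal word-count sketch (assuming the same `data` RDD and spark-shell session as above; `reduceByKey` is covered later in this post):

``````//Split each line into words, pair each word with 1, then sum the counts per word.
val wordCounts = data.flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordCounts.collect.foreach(println)   //e.g. (hello,7), (spark,1), ...
``````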

### mapPartitions and mapPartitionsWithIndex

``````/**
* Return a new RDD by applying a function to each partition of this RDD.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}

/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}
``````

``````scala> val rdd1 = sc.makeRDD(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

//rdd2 sums up the values within each partition of rdd1
scala> val rdd2 = rdd1.mapPartitions{iterator =>
|       val result = List[Int]()
|       var i = 0
|       while(iterator.hasNext) { i += iterator.next() }
|       result.::(i).iterator
|     }
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at mapPartitions at <console>:26

scala> rdd2.foreach(println)
24
15
6

scala> rdd2.collect
res1: Array[Int] = Array(6, 15, 24)

scala> rdd2.getNumPartitions
res2: Int = 3

scala> rdd2.partitioner
res3: Option[org.apache.spark.Partitioner] = None

scala> val rdd3 = rdd1.mapPartitionsWithIndex{(index, iterator) =>
|       val result = List[String]()
|       var i = 0
|       while(iterator.hasNext) { i += iterator.next() }
|       result.::(index+"|"+i).iterator
|     }
rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:26

scala> rdd3.foreach(println)
0|6
1|15
2|24

scala> rdd3.collect
res5: Array[String] = Array(0|6, 1|15, 2|24)

scala> rdd3.getNumPartitions
res6: Int = 3

scala> rdd3.partitioner
res7: Option[org.apache.spark.Partitioner] = None
``````
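
A common reason to prefer mapPartitions over map is to pay a per-partition setup cost only once (a parser, a connection, a formatter). A minimal sketch, assuming a hypothetical `dates` RDD of date strings:

``````val dates = sc.parallelize(Seq("2016-12-20", "2016-12-21", "2016-12-22"), 2)

//The formatter is created once per partition instead of once per element.
val timestamps = dates.mapPartitions { iter =>
  val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd")
  iter.map(s => fmt.parse(s).getTime)
}
timestamps.collect
``````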

### zipWithIndex and zipWithUniqueId

``````/**
* Zips this RDD with its element indices. The ordering is first based on the partition index
* and then the ordering of items within each partition. So the first item in the first
* partition gets index 0, and the last item in the last partition receives the largest index.
*
* This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
* This method needs to trigger a spark job when this RDD contains more than one partitions.
*
* @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
* elements in a partition. The index assigned to each element is therefore not guaranteed,
* and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
*/
def zipWithIndex(): RDD[(T, Long)] = withScope {
new ZippedWithIndexRDD(this)
}

/**
* Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
* 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
* won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
*
* @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
* elements in a partition. The unique ID assigned to each element is therefore not guaranteed,
* and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
*/
def zipWithUniqueId(): RDD[(T, Long)] = withScope {
val n = this.partitions.length.toLong
this.mapPartitionsWithIndex { case (k, iter) =>
Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
(item, i * n + k)
}
}
}
``````

``````scala> val x = sc.parallelize(100 to 120, 5)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> x.zipWithIndex.collect
res0: Array[(Int, Long)] = Array((100,0), (101,1), (102,2), (103,3), (104,4), (105,5), (106,6), (107,7), (108,8), (109,9), (110,10), (111,11), (112,12), (113,13), (114,14), (115,15), (116,16), (117,17), (118,18), (119,19), (120,20))

scala> x.zipWithUniqueId.collect
res1: Array[(Int, Long)] = Array((100,0), (101,5), (102,10), (103,15), (104,1), (105,6), (106,11), (107,16), (108,2), (109,7), (110,12), (111,17), (112,3), (113,8), (114,13), (115,18), (116,4), (117,9), (118,14), (119,19), (120,24))
``````
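
The ids above follow the `i * n + k` formula from the source: with n = 5 partitions, partition 0 (elements 100..103) gets 0, 5, 10, 15, and the last element of partition 4 gets 4 + 4 * 5 = 24. A minimal sketch reproducing the assignment by hand over the same `x`:

``````//Element i of partition k receives id i * numPartitions + k, exactly as zipWithUniqueId does.
val n = x.getNumPartitions
val manualIds = x.mapPartitionsWithIndex { (k, iter) =>
  iter.zipWithIndex.map { case (item, i) => (item, i.toLong * n + k) }
}
manualIds.collect   //same pairs as x.zipWithUniqueId.collect
``````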

### zip and zipPartitions

``````/**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of
* partitions* and the *same number of elements in each partition* (e.g. one was made through
* a map on the other).
*/
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
new Iterator[(T, U)] {
def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
case (true, true) => true
case (false, false) => false
case _ => throw new SparkException("Can only zip RDDs with " +
"same number of elements in each partition")
}
def next(): (T, U) = (thisIter.next(), otherIter.next())
}
}
}

/**
* Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
* applying a function to the zipped partitions. Assumes that all the RDDs have the
* *same number of partitions*, but does *not* require them to have the same number
* of elements in each partition.
*/
def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
}

def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B])
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
zipPartitions(rdd2, preservesPartitioning = false)(f)
}

def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope {
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
}

def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C])
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope {
zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f)
}

def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope {
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
}

def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope {
zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
}
``````

``````scala> val rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(Seq("A","B","C","D","E","F","G","H","I","J"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at makeRDD at <console>:24

scala> rdd1.zip(rdd2).collect
res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E), (6,F), (7,G), (8,H), (9,I), (10,J))

scala> rdd2.zip(rdd1).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5), (F,6), (G,7), (H,8), (I,9), (J,10))

scala> val rdd3 = sc.makeRDD(Seq("A","B","C","D","E","F","G","H","I","J"),3)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at makeRDD at <console>:24

scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 3)

scala> val rdd4 = sc.makeRDD(Seq("A","B","C","D","E","F","G","H","I"),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at makeRDD at <console>:24

scala> rdd1.zip(rdd4).collect
16/12/22 20:31:22 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4)
org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition

scala> val a = sc.parallelize(0 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val b = sc.parallelize(10 to 19, 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> val c = sc.parallelize(100 to 109, 3)
c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> a.zipPartitions(b, c){(aiter, biter, citer) =>
|   var res = List[String]()
|   while (aiter.hasNext && biter.hasNext && citer.hasNext) {
|     val x = aiter.next + " " + biter.next + " " + citer.next
|     res ::= x
|   }
|   res.iterator
| }.collect
res4: Array[String] = Array(2 12 102, 1 11 101, 0 10 100, 5 15 105, 4 14 104, 3 13 103, 9 19 109, 8 18 108, 7 17 107, 6 16 106)
``````
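
Because zipPartitions only requires equal partition counts, it can also express a lazy element-wise combination without building intermediate lists. A minimal sketch summing `a` and `b` pairwise:

``````//Zip the two partition iterators lazily and add corresponding elements.
val sums = a.zipPartitions(b) { (aiter, biter) =>
  aiter.zip(biter).map { case (x, y) => x + y }
}
sums.collect   //Array(10, 12, 14, ..., 28)
``````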

### randomSplit

`````` /**
* Randomly splits this RDD with the provided weights.
*
* @param weights weights for splits, will be normalized if they don't sum to 1
* @param seed random seed
*
* @return split RDDs in an array
*/
def randomSplit(
weights: Array[Double],
seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
require(weights.forall(_ >= 0),
s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
require(weights.sum > 0,
s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")

withScope {
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
randomSampleWithRange(x(0), x(1), seed)
}.toArray
}
}
``````

``````scala> val rdd = sc.makeRDD(1 to 10, 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

//Randomly split the original rdd into 4 RDDs according to the weights 1.0, 2.0, 3.0, 4.0; an RDD with a larger weight is more likely to receive any given element. The weights do not have to sum to 1: as the docstring above says, they are normalized internally.
scala> val splitRDD = rdd.randomSplit(Array(1.0, 2.0, 3.0, 4.0))
splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(
MapPartitionsRDD[1] at randomSplit at <console>:26,
MapPartitionsRDD[2] at randomSplit at <console>:26,
MapPartitionsRDD[3] at randomSplit at <console>:26,
MapPartitionsRDD[4] at randomSplit at <console>:26)

scala> splitRDD.size
res1: Int = 4

scala> splitRDD(0).collect
res2: Array[Int] = Array(8)

scala> splitRDD(1).collect
res3: Array[Int] = Array(9, 10)

scala> splitRDD(2).collect
res4: Array[Int] = Array(1, 3, 4, 5)

scala> splitRDD(3).collect
res5: Array[Int] = Array(2, 6, 7)
``````
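
A typical use of randomSplit is carving a dataset into training and test sets; a minimal sketch over the same `rdd`, with illustrative 0.8/0.2 weights and a fixed seed for reproducibility:

``````//The weights are normalized internally; the seed makes the split repeatable.
val Array(training, test) = rdd.randomSplit(Array(0.8, 0.2), seed = 42L)
//training.count + test.count == rdd.count: every element lands in exactly one split
``````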

### glom

``````  /**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
``````

``````scala> val rdd = sc.makeRDD(1 to 10, 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24

scala> rdd.partitions.size
res6: Int = 3

//glom gathers the elements of each partition into an array, so the result here is 3 arrays
scala> rdd.glom().collect
res7: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
``````
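
glom is handy for per-partition summaries; a minimal sketch computing each partition's maximum over the same `rdd`:

``````//Each inner array holds one partition, so map(_.max) yields one maximum per partition.
rdd.glom().map(_.max).collect   //Array(3, 6, 10) for the partitioning shown above
``````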

### join, leftOuterJoin, rightOuterJoin and fullOuterJoin

``````/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}

/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
* pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
* partition the output RDD.
*/
def leftOuterJoin[W](
other: RDD[(K, W)],
partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._2.isEmpty) {
pair._1.iterator.map(v => (v, None))
} else {
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
}
}
}

/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
* resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
* pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
* partition the output RDD.
*/
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._1.isEmpty) {
pair._2.iterator.map(w => (None, w))
} else {
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
}
}
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Uses the given Partitioner to partition the output RDD.
*/
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], Option[W]))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues {
case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
}
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
* parallelism level.
*/
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = self.withScope {
fullOuterJoin(other, defaultPartitioner(self, other))
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
*/
def fullOuterJoin[W](
other: RDD[(K, W)],
numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = self.withScope {
fullOuterJoin(other, new HashPartitioner(numPartitions))
}
``````

``````scala> val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[1] at makeRDD at <console>:24

scala> rdd1.join(rdd2).collect
res0: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))

scala> rdd1.join(rdd2).foreach(println)
(C,(3,c))
(A,(1,a))

scala> rdd2.join(rdd1).collect
res3: Array[(String, (String, String))] = Array((A,(a,1)), (C,(c,3)))

scala> rdd2.join(rdd1).foreach(println)
(C,(c,3))
(A,(a,1))

scala> rdd1.leftOuterJoin(rdd2).foreach(println)
(A,(1,Some(a)))
(B,(2,None))
(C,(3,Some(c)))

scala> rdd2.leftOuterJoin(rdd1).foreach(println)
(D,(d,None))
(A,(a,Some(1)))
(C,(c,Some(3)))

scala> rdd1.rightOuterJoin(rdd2).foreach(println)
(C,(Some(3),c))
(A,(Some(1),a))
(D,(None,d))

scala> rdd2.rightOuterJoin(rdd1).foreach(println)
(B,(None,2))
(A,(Some(a),1))
(C,(Some(c),3))

scala> rdd1.fullOuterJoin(rdd2).foreach(println)
(B,(Some(2),None))
(A,(Some(1),Some(a)))
(C,(Some(3),Some(c)))
(D,(None,Some(d)))

scala> rdd1.fullOuterJoin(rdd2).collect
res6: Array[(String, (Option[String], Option[String]))] = Array(
(B,(Some(2),None)),
(D,(None,Some(d))),
(A,(Some(1),Some(a))),
(C,(Some(3),Some(c))))
``````

### cogroup and groupWith

`````` /**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]],
w3s.asInstanceOf[Iterable[W3]])
}
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
(vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Array(vs, w1s, w2s) =>
(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]])
}
}

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, defaultPartitioner(self, other))
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](
other: RDD[(K, W)],
numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, new HashPartitioner(numPartitions))
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, new HashPartitioner(numPartitions))
}

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
numPartitions: Int)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
}

/** Alias for cogroup. */
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, defaultPartitioner(self, other))
}

/** Alias for cogroup. */
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}

/** Alias for cogroup. */
def groupWith[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
}
``````

cogroup behaves like a SQL full outer join: it returns the records of both RDDs grouped by key, with an empty collection on whichever side has no match. The numPartitions parameter sets the number of partitions of the result, and the partitioner parameter sets the partitioning function. Hands-on examples:

``````scala> val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[1] at makeRDD at <console>:24

//cogroup with one other rdd
scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[3] at cogroup at <console>:28

scala> rdd3.partitions.size
res0: Int = 2

scala> rdd3.collect
res1: Array[(String, (Iterable[String], Iterable[String]))] = Array(
(B,(CompactBuffer(2),CompactBuffer())),
(D,(CompactBuffer(),CompactBuffer(d))),
(A,(CompactBuffer(1),CompactBuffer(a))),
(C,(CompactBuffer(3),CompactBuffer(c))))

scala> val rdd4 =  sc.makeRDD(Array(("A","A"),("E","E")),2)
rdd4: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[4] at makeRDD at <console>:24

//cogroup with two other rdds
scala> val rdd5 = rdd1.cogroup(rdd2, rdd4)
rdd5: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String], Iterable[String]))] = MapPartitionsRDD[8] at cogroup at <console>:30

scala> rdd5.foreach(println)
(B,(CompactBuffer(2),CompactBuffer(),CompactBuffer()))
(D,(CompactBuffer(),CompactBuffer(d),CompactBuffer()))
(A,(CompactBuffer(1),CompactBuffer(a),CompactBuffer(A)))
(C,(CompactBuffer(3),CompactBuffer(c),CompactBuffer()))
(E,(CompactBuffer(),CompactBuffer(),CompactBuffer(E)))

//cogroup with three other rdds
scala> val rdd6 = sc.makeRDD(Array(("B", "h"), ("D", "e")), 2)
rdd6: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[9] at makeRDD at <console>:24

scala> val rdd7 = rdd1.cogroup(rdd2, rdd4, rdd6)
rdd7: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String], Iterable[String], Iterable[String]))] = MapPartitionsRDD[11] at cogroup at <console>:32

scala> rdd7.foreach(println)
(B,(CompactBuffer(2),CompactBuffer(),CompactBuffer(),CompactBuffer(h)))
(D,(CompactBuffer(),CompactBuffer(d),CompactBuffer(),CompactBuffer(e)))
(A,(CompactBuffer(1),CompactBuffer(a),CompactBuffer(A),CompactBuffer()))
(C,(CompactBuffer(3),CompactBuffer(c),CompactBuffer(),CompactBuffer()))
(E,(CompactBuffer(),CompactBuffer(),CompactBuffer(E),CompactBuffer()))
``````
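
Since cogroup returns, for every key, the (possibly empty) value collections from both sides, a full outer join can be written directly on top of it, which is essentially how fullOuterJoin is implemented in the source shown earlier. A minimal sketch over `rdd1` and `rdd2`:

``````//Expand the grouped values into Option pairs, keeping keys that have no match on either side.
val full: org.apache.spark.rdd.RDD[(String, (Option[String], Option[String]))] =
  rdd1.cogroup(rdd2).flatMapValues {
    case (vs, ws) if ws.isEmpty => vs.iterator.map(v => (Some(v), None))
    case (vs, ws) if vs.isEmpty => ws.iterator.map(w => (None, Some(w)))
    case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
  }
full.foreach(println)   //same pairs as rdd1.fullOuterJoin(rdd2)
``````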

### combineByKey

``````  /**
* :: Experimental ::
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
*
* Users provide three functions:
*
*  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
*  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
*  - `mergeCombiners`, to combine two C's into a single one.
*
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*
* @note V and C can be different -- for example, one might group an RDD of type
* (Int, Int) into an RDD of type (Int, Seq[Int]).
*/
@Experimental
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}

/**
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. This method is here for backward compatibility. It does not provide combiner
* classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}

// createCombiner: turns a value of type V into the combined type C (input: the V of RDD[(K, V)], output: C)
// mergeValue: merges a V into an existing C within a partition (input: (C, V), output: C)
// mergeCombiners: merges two C values from different partitions into one C (input: (C, C), output: C)
// numPartitions: number of partitions of the result RDD; by default the existing partitioning is kept
// partitioner: partitioning function, HashPartitioner by default
// mapSideCombine: whether to combine on the map side (like a MapReduce combiner), true by default

/**
* Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
* This method is here for backward compatibility. It does not provide combiner
* classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
}

/**
* :: Experimental ::
* Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
*/
@Experimental
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
new HashPartitioner(numPartitions))
}

/**
* Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level. This method is here for backward compatibility. It
* does not provide combiner classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}

/**
* :: Experimental ::
* Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
*/
@Experimental
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
``````

``````scala> val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[2] at zip at <console>:28

scala> val d = c.combineByKey(x => List(x), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[3] at combineByKey at <console>:30

scala> d.foreach(println)
(1,List(cat, dog, turkey))
(2,List(gnu, rabbit, salmon, bee, bear, wolf))

scala> val rdd1 = sc.makeRDD(Array(("A",1),("A",2),("A",3),("A",4),("A",5), ("B",1),("B",2),("B",3),("C",1), ("C",4)), 4)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at makeRDD at <console>:24

scala> rdd1.combineByKey((v: Int) => v + "_", (c: String, v: Int) => c + "@" + v, (c1: String, c2: String) => c1 + "$" + c2).foreach(println)
(B,1_@2$3_)
(C,1_@4)
(A,1_@2$3_@4@5)
``````
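
A classic combineByKey use case is a per-key average, carrying a (sum, count) pair as the combined type C; a minimal sketch over the `rdd1` above:

``````//C = (sum, count): merge each value into the pair within a partition, then merge pairs across partitions.
val sumCount = rdd1.combineByKey(
  (v: Int) => (v, 1),
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  (p: (Int, Int), q: (Int, Int)) => (p._1 + q._1, p._2 + q._2))

sumCount.mapValues { case (sum, count) => sum.toDouble / count }.collect
//Array((A,3.0), (B,2.0), (C,2.5)) in some order
``````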

### groupByKey

``````/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
* The ordering of elements within each group is not guaranteed, and may even differ
* each time the resulting RDD is evaluated.
*
* @note This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
* or `PairRDDFunctions.reduceByKey` will provide much better performance.
*
* @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions. The ordering of elements within
* each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
*
* @note This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
* or `PairRDDFunctions.reduceByKey` will provide much better performance.
*
* @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(new HashPartitioner(numPartitions))
}

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level. The ordering of elements
* within each group is not guaranteed, and may even differ each time the resulting RDD is
* evaluated.
*
* @note This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
* or `PairRDDFunctions.reduceByKey` will provide much better performance.
*/
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}
``````

``````scala> val rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1),("C", 3), ("D", 5)), 4)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at makeRDD at <console>:24

scala> rdd1.groupByKey().collect
res4: Array[(String, Iterable[Int])] = Array(
(D,CompactBuffer(5)),
(A,CompactBuffer(0, 2)),
(B,CompactBuffer(1, 2)),
(C,CompactBuffer(1, 3)))

scala> rdd1.partitions.size
res5: Int = 4
``````
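
As the scaladoc above warns, grouping only to aggregate is wasteful; the per-key sums below can be written either way, but reduceByKey pre-aggregates on the map side before shuffling. A minimal sketch over the same `rdd1`:

``````//Same per-key sums; groupByKey shuffles every value, reduceByKey combines map-side first.
rdd1.groupByKey().mapValues(_.sum).collect   //Array((D,5), (A,2), (B,3), (C,4)) in some order
rdd1.reduceByKey(_ + _).collect              //same result
``````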

### reduceByKey

``````/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
*/
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
reduceByKey(new HashPartitioner(numPartitions), func)
}

/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
``````

``````scala> val rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1),("C", 3), ("D", 5)), 4)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at makeRDD at <console>:24

scala> val rdd2 = rdd1.reduceByKey(_+_)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:26

scala> rdd2.partitions.size
res6: Int = 4

scala> rdd2.collect
res7: Array[(String, Int)] = Array((D,5), (A,2), (B,3), (C,4))
``````

### reduceByKeyLocally

``````/**
* Merge the values for each key using an associative and commutative reduce function, but return
* the results immediately to the master as a Map. This will also perform the merging locally on
* each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
*/
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
val cleanedF = self.sparkContext.clean(func)

if (keyClass.isArray) {
throw new SparkException("reduceByKeyLocally() does not support array keys")
}

val reducePartition = (iter: Iterator[(K, V)]) => {
val map = new JHashMap[K, V]
iter.foreach { pair =>
val old = map.get(pair._1)
map.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
}
Iterator(map)
} : Iterator[JHashMap[K, V]]

val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
m2.asScala.foreach { pair =>
val old = m1.get(pair._1)
m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
}
m1
} : JHashMap[K, V]

self.mapPartitions(reducePartition).reduce(mergeMaps).asScala
}
``````

``````scala> val rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1),("C", 3), ("D", 5)), 4)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at makeRDD at <console>:24

scala> rdd1.reduceByKeyLocally((x,y) => x + y)
res8: scala.collection.Map[String,Int] = Map(D -> 5, A -> 2, B -> 3, C -> 4)
``````

### aggregate and fold

``````  /**
* Aggregate the elements of each partition, and then the results for all the partitions, using
* given combine functions and a neutral "zero value". This function can return a different result
* type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
* and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
* allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation.
*
* @param zeroValue the initial value for the accumulated result of each partition for the
*                  `seqOp` operator, and also the initial value for the combine results from
*                  different partitions for the `combOp` operator - this will typically be the
*                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
* @param seqOp an operator used to accumulate results within a partition
* @param combOp an associative operator used to combine results from different partitions
*/
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}

//aggregate aggregates the elements of the RDD: seqOp first folds the T-typed elements of each partition into a value of type U, then combOp merges those per-partition U values into a single U. Note in particular that both seqOp and combOp start from zeroValue, whose type is U.

/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
* given associative function and a neutral "zero value". The function
* op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
* allocation; however, it should not modify t2.
*
* This behaves somewhat differently from fold operations implemented for non-distributed
* collections in functional languages like Scala. This fold operation may be applied to
* partitions individually, and then fold those results into the final result, rather than
* apply the fold to each element sequentially in some defined ordering. For functions
* that are not commutative, the result may differ from that of a fold applied to a
* non-distributed collection.
*
* @param zeroValue the initial value for the accumulated result of each partition for the `op`
*                  operator, and also the initial value for the combine results from different
*                  partitions for the `op` operator - this will typically be the neutral
*                  element (e.g. `Nil` for list concatenation or `0` for summation)
* @param op an operator used to both accumulate results within a partition and combine results
*                  from different partitions
*/
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
sc.runJob(this, foldPartition, mergeResult)
jobResult
}

//fold is a simplified form of aggregate in which the same function op is used as both seqOp and combOp.
``````

``````scala> val z = sc.parallelize(List("a","b","c","d","e","f"),3)
z: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> z.mapPartitionsWithIndex{(index, iterator) =>
|       iterator.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
|   }.foreach(println)
[partID:0, val: a]
[partID:0, val: b]
[partID:2, val: e]
[partID:2, val: f]
[partID:1, val: c]
[partID:1, val: d]

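//Note: the order in which the per-partition strings ("ab", "cd", "ef") are combined on the driver
//is not deterministic, so the same call can return "cdabef" or "abcdef". With zeroValue "x",
//seqOp prepends "x" once per partition and combOp prepends it once more, hence "xxabxcdxef".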
scala> z.aggregate("")(_+_, _+_)
res1: String = cdabef

scala> z.aggregate("x")(_+_, _+_)
res2: String = xxabxcdxef

scala> z.aggregate("")(_+_, _+_)
res3: String = abcdef

scala> val a = sc.parallelize(1 to 100)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> a.fold(0)(_ + _)
res4: Int = 5050

scala> a.aggregate(0)(_+_, _+_)
res5: Int = 5050
``````
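
Because aggregate can return a different type U, it is convenient for computing several statistics in one pass; a minimal sketch computing the mean of `a` as a (sum, count) pair:

``````//U = (sum, count): seqOp folds each element into the pair, combOp merges the per-partition pairs.
val (sum, count) = a.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (p, q) => (p._1 + q._1, p._2 + q._2))
sum.toDouble / count   //50.5 for 1 to 100
``````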

### aggregateByKey and foldByKey

``````  /**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)

lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

// We will clean the combiner closure later in `combineByKey`
val cleanedSeqOp = self.context.clean(seqOp)
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
cleanedSeqOp, combOp, partitioner)
}

/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}

/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}

/**
* Merge the values for each key using an associative function and a neutral "zero value" which
* may be added to the result an arbitrary number of times, and must not change the result
* (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(
zeroValue: V,
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)

// When deserializing, use a lazy val to create just one instance of the serializer per task
lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

val cleanedFunc = self.context.clean(func)
combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
cleanedFunc, cleanedFunc, partitioner)
}

/**
* Merge the values for each key using an associative function and a neutral "zero value" which
* may be added to the result an arbitrary number of times, and must not change the result
* (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
}

/**
* Merge the values for each key using an associative function and a neutral "zero value" which
* may be added to the result an arbitrary number of times, and must not change the result
* (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
foldByKey(zeroValue, defaultPartitioner(self))(func)
}
``````

``````scala> val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> val b = a.map(x => (x.length, x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[16] at map at <console>:26

scala> b.mapPartitionsWithIndex{(index, iterator) => iterator.toList.map(x => "[partID:"+index+", val: "+x+"]").iterator }.foreach(println)
[partID:2, val: (3,gnu)]
[partID:2, val: (3,ant)]
[partID:0, val: (3,dog)]
[partID:1, val: (3,cat)]
[partID:1, val: (3,owl)]

scala> b.foldByKey("HE")(_ + _).foreach(println)
(3,HEdogHEcatowlHEgnuant)

scala> b.aggregateByKey("HE")(_+_, _+_).foreach(println)
(3,HEdogHEcatowlHEgnuant)
``````
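
aggregateByKey also works well when the zero value is only a per-partition starting point, for example tracking a per-key maximum; a minimal sketch with a hypothetical `pets` RDD:

``````//Int.MinValue is a neutral zero for max; seqOp and combOp both just keep the larger value.
val pets = sc.parallelize(Seq(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pets.aggregateByKey(Int.MinValue)((m, v) => math.max(m, v), (m1, m2) => math.max(m1, m2)).collect
//Array((dog,12), (cat,12), (mouse,4)) in some order
``````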

### countByKey and countByValue

`````` /**
* Count the number of elements for each key, collecting the results to a local Map.
*
* @note This method should only be used if the resulting map is expected to be small, as
* the whole thing is loaded into the driver's memory.
* To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
* returns an RDD[T, Long] instead of a map.
*/
def countByKey(): Map[K, Long] = self.withScope {
self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}

/**
* Return the count of each unique value in this RDD as a local map of (value, count) pairs.
*
* @note This method should only be used if the resulting map is expected to be small, as
* the whole thing is loaded into the driver's memory.
* To handle very large results, consider using
*
* {{{
* rdd.map(x => (x, 1L)).reduceByKey(_ + _)
* }}}
*
* , which returns an RDD[T, Long] instead of a map.
*/
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
map(value => (value, null)).countByKey()
}
``````

``````scala> val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> b.countByValue()
res0: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)

scala> val c = sc.parallelize(List((3, "Gnu"), (3, "Gnu"), (3, "Yak"), (3, "Yak"), (3, "Yak"), (5, "Mouse"), (5, "Mouse"), (3, "Dog")))
c: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> c.countByKey()
res1: scala.collection.Map[Int,Long] = Map(5 -> 2, 3 -> 6)

scala> c.countByValue()
res2: scala.collection.Map[(Int, String),Long] = Map((5,Mouse) -> 2, (3,Yak) -> 3, (3,Gnu) -> 2, (3,Dog) -> 1)
``````
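
Both methods pull the entire result into the driver; when the result may be large, the scaladoc above suggests staying distributed instead. A minimal sketch of that alternative over `c`:

``````//Distributed equivalent of countByKey: returns an RDD instead of a driver-side Map.
c.mapValues(_ => 1L).reduceByKey(_ + _).collect   //Array((5,2), (3,6)) in some order
``````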

### lookup

`````` /**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
*/
def lookup(key: K): Seq[V] = self.withScope {
self.partitioner match {
case Some(p) =>
val index = p.getPartition(key)
val process = (it: Iterator[(K, V)]) => {
val buf = new ArrayBuffer[V]
for (pair <- it if pair._1 == key) {
buf += pair._2
}
buf
} : Seq[V]
val res = self.context.runJob(self, process, Array(index))
res(0)
case None =>
self.filter(_._1 == key).map(_._2).collect()
}
}
``````

``````scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val b = a.map(x => (x.length, x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[1] at map at <console>:26

scala> b.foreach(println)
(3,dog)
(7,panther)
(5,eagle)
(5,tiger)
(4,lion)
(3,cat)

scala> b.lookup(5)
res1: Seq[String] = WrappedArray(tiger, eagle)

scala> b.lookup(3)
res2: Seq[String] = WrappedArray(dog, cat)

scala> b.lookup(4)
res3: Seq[String] = WrappedArray(lion)

scala> b.lookup(7)
res4: Seq[String] = WrappedArray(panther)
``````
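
As the scaladoc notes, lookup only scans a single partition when the RDD has a known partitioner; a minimal sketch that hash-partitions `b` by key first:

``````//With a HashPartitioner in place, lookup(5) runs a job over only the partition that key 5 maps to.
val byKey = b.partitionBy(new org.apache.spark.HashPartitioner(3))
byKey.lookup(5)   //Seq(tiger, eagle) (order within the partition may vary)
``````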

### coalesce and repartition

``````/**
* Return a new RDD that has exactly numPartitions partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}

//repartition is simply coalesce with shuffle = true.

/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* @note With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner. The optional partition coalescer
* passed in must be serializable.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]

// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
``````

``````scala> val data = sc.textFile("file:///usr/home/dw/yzxJar/rdd.txt")
data: org.apache.spark.rdd.RDD[String] = file:///usr/home/dw/yzxJar/rdd.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> data.foreach(println)
hello world
hello java
hello hive
hello hbase
hello spark

scala> data.collect
res0: Array[String] = Array(hello world, hello java, hello hadoop, hello hive, hello hbase, hello spark, hello flink)

//The default number of partitions here is 2
scala> data.partitions.size
res1: Int = 2

scala> val rdd1 = data.coalesce(1)
rdd1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[2] at coalesce at <console>:26

//Now the number of partitions is 1
scala> rdd1.partitions.size
res2: Int = 1

//If the new number of partitions is larger than the current one, the shuffle parameter must be set to true; otherwise the partition count stays unchanged.
scala> val rdd2 = data.coalesce(4)
rdd2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[3] at coalesce at <console>:26

scala> rdd2.partitions.size
res3: Int = 2

scala> val rdd3 = data.coalesce(4, true)
rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at coalesce at <console>:26

scala> rdd3.partitions.size
res4: Int = 4
``````
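
Since repartition(n) is just coalesce(n, shuffle = true), growing the partition count works through either call; a minimal sketch over the same `data`:

``````//repartition always shuffles, so it can increase the partition count.
data.repartition(4).partitions.size              //4
data.coalesce(4, shuffle = true).partitions.size //4
``````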

### References

- org/apache/spark/rdd/RDD.scala
- org/apache/spark/rdd/PairRDDFunctions.scala
- The RDD API By Example
