MapReduce的5个阶段:
input
map
shuffle
reduce
output
shuffle过程实现的功能:
分区:Partition
做什么:决定当前的Key交给哪个reduce进行处理。
相同的key,应该有同一个reduce进行处理。
默认情况:根据key的hash值,对reduce个数取余。
以下是Apache Hadoop MapReduce Core 2.10.0 的源码:可以看到那个取余操作。(取余之前的key的hashcode与最大整型数做按位与运算是为了转为正数)
package org.apache.hadoop.mapred.lib;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.JobConf;
/**
* Partition keys by their {@link Object#hashCode()}.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
public void configure(JobConf job) {}
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
分组:
将相同的key的value进行合并。
key相等的话,将分到同一个组里面。例如,在word count中,key是单词,value合并之后就是1,1,1,1,1.....
小结:在MapReduce阶段,一行调用一次map方法;一种key调用一次reduce方法。
排序:
按照key的指导顺序进行排序。
详细的过程:
map端的shuffle:
1)spill:溢写
每个map task都有一个内存缓冲区,存储着map的输出结果,这个内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。
当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并(因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge),生成最终的正式输出文件,然后等待reduce task来拉数据。
a) 每一个map处理之后的结果将会进入环形缓冲区(内存:100M)
这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
b) 分区:
对每一条key-value进行分区(打reduce标签)
hadoop | 1 | reduce0 | |
hive | 1 | reduce1 | |
spark | 1 | reduce1 | |
hbase | 1 | reduce1 | |
hadoop | 1 | reduce0 | |
c) 排序
partition是和sort一起做的,负责Spill的线程在拿到一段内存buf后会调用QuickSort的sort方法进行内存中的快排。
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* An implementation of the core algorithm of QuickSort.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class QuickSort implements IndexedSorter {
private static final IndexedSorter alt = new HeapSort();
public QuickSort() { }
private static void fix(IndexedSortable s, int p, int r) {
if (s.compare(p, r) > 0) {
s.swap(p, r);
}
}
/**
* Deepest recursion before giving up and doing a heapsort.
* Returns 2 * ceil(log(n)).
*/
protected static int getMaxDepth(int x) {
if (x <= 0)
throw new IllegalArgumentException("Undefined for " + x);
return (32 - Integer.numberOfLeadingZeros(x - 1)) << 2;
}
/**
* Sort the given range of items using quick sort.
* {@inheritDoc} If the recursion depth falls below {@link #getMaxDepth},
* then switch to {@link HeapSort}.
*/
@Override
public void sort(IndexedSortable s, int p, int r) {
sort(s, p, r, null);
}
@Override
public void sort(final IndexedSortable s, int p, int r,
final Progressable rep) {
sortInternal(s, p, r, rep, getMaxDepth(r - p));
}
private static void sortInternal(final IndexedSortable s, int p, int r,
final Progressable rep, int depth) {
if (null != rep) {
rep.progress();
}
while (true) {
if (r-p < 13) {
for (int i = p; i < r; ++i) {
for (int j = i; j > p && s.compare(j-1, j) > 0; --j) {
s.swap(j, j-1);
}
}
return;
}
if (--depth < 0) {
// give up
alt.sort(s, p, r, rep);
return;
}
// select, move pivot into first position
fix(s, (p+r) >>> 1, p);
fix(s, (p+r) >>> 1, r - 1);
fix(s, p, r-1);
// Divide
int i = p;
int j = r;
int ll = p;
int rr = r;
int cr;
while(true) {
while (++i < j) {
if ((cr = s.compare(i, p)) > 0) break;
if (0 == cr && ++ll != i) {
s.swap(ll, i);
}
}
while (--j > i) {
if ((cr = s.compare(p, j)) > 0) break;
if (0 == cr && --rr != j) {
s.swap(rr, j);
}
}
if (i < j) s.swap(i, j);
else break;
}
j = i;
// swap pivot- and all eq values- into position
while (ll >= p) {
s.swap(ll--, --i);
}
while (rr < r) {
s.swap(rr++, j++);
}
// Conquer
// Recurse on smaller interval first to keep stack shallow
assert i != j;
if (i - p < r - j) {
sortInternal(s, p, i, rep, depth);
p = j;
} else {
sortInternal(s, j, r, rep, depth);
r = i;
}
}
}
}
mapper输出的keyvalue首先是按partition聚合。
而我们如果指定key的compare方法会在这里生效并进行排序。
如果job没有定义combiner则直接写文件,如果有combiner则在这里进行combine。
hadoop | 1 | reduce0 | |
hadoop | 1 | reduce0 | |
hbase | 1 | reduce1 | |
hive | 1 | reduce1 | |
spark | 1 | reduce1 | |
d) 当环形缓冲区大道阈值80%,开始溢写。
将分区排序后的数据溢写到磁盘,变成文件file1。
最终会产生很多小文件。
2)merge:合并:
在生成spill文件后还会将此次spillRecord的记录写在一个index文件中。
当所有任务完成,就进入merge阶段。
每个spill生成的文件中key value都是有序的,但不同的溢写文件之间却是乱序的,类似多个有序文件的多路归并算法。
Merger分别取出需要merge的spillfile的最小的keyvalue,放入一个内存堆中,每次从堆中取出一个最小的值,并把此值保存到merge的输出文件中。
这里和hbase中scan的算法非常相似。
这里merge时不同的partition的key是不会比较的,只有相同的partition的keyvalue才会进行排序和合并。
a) 将spill产生的小文件进行合并
file1:
hadoop | 1 | reduce0 | |
hadoop | 1 | reduce0 | |
hbase | 1 | reduce1 | |
hive | 1 | reduce1 | |
spark | 1 | reduce1 | |
file2:
hadoop | 1 | reduce0 | |
hadoop | 1 | reduce0 | |
hbase | 1 | reduce1 | |
hive | 1 | reduce1 | |
spark | 1 | reduce1 | |
b) 将相同分区的数据进行分区内排序
endFile:
hadoop | 1 | reduce0 | |
hadoop | 1 | reduce0 | |
hadoop | 1 | reduce0 | |
hadoop | 1 | reduce0 | |
hbase | 1 | reduce1 | |
hbase | 1 | reduce1 | |
hive | 1 | reduce1 | |
hive | 1 | reduce1 | |
spark | 1 | reduce1 | |
spark | 1 | reduce1 | |
3) map task 结束,通知appmaster;appmaster通过reduce过来拉取数据。
在merge完后会把不同partition的信息保存进一个index文件以便之后reducer来拉自己部分的数据。
io.sort.mb的大小最好为 0.25 x mapred.child.java.opts(jvm可用内存) 可用内存 至 0.5 x mapred.child.java.opts之间。
在keyvalue对写入MapOutputBuffer时会调用partitioner.getPartition方法计算partition即应该分配到哪个reducer,这里的partition只是在内存的buf的index区写入一条记录。
reduce端的shuffle
map-task1
hadoop | 1 | reduce0 | |
hadoop | 1 | reduce0 | |
hadoop | 1 | reduce0 | |
hadoop | 1 | reduce0 | |
hbase | 1 | reduce1 | |
hbase | 1 | reduce1 | |
hive | 1 | reduce1 | |
hive | 1 | reduce1 | |
spark | 1 | reduce1 | |
spark | 1 | reduce1 | |
map-task2
hadoop | 1 | reduce0 | |
hadoop | 1 | reduce0 | |
hadoop | 1 | reduce0 | |
hadoop | 1 | reduce0 | |
hbase | 1 | reduce1 | |
hbase | 1 | reduce1 | |
hive | 1 | reduce1 | |
hive | 1 | reduce1 | |
spark | 1 | reduce1 | |
spark | 1 | reduce1 | |
1)reduce骑电动多个线程通过网络到每台机器上拉取属于自己分区的数据。
reduce1
hbase | 1 | reduce1 | |
hbase | 1 | reduce1 | |
hive | 1 | reduce1 | |
hive | 1 | reduce1 | |
spark | 1 | reduce1 | |
spark | 1 | reduce1 | |
hbase | 1 | reduce1 | |
hbase | 1 | reduce1 | |
hive | 1 | reduce1 | |
hive | 1 | reduce1 | |
spark | 1 | reduce1 | |
spark | 1 | reduce1 |
2)merge合并:将每个Map task的结果中属于自己分区的数据进行合并。
a)排序:将整齐属于我分区的数据进行排序
hbase | 1 | reduce1 | |
hbase | 1 | reduce1 | |
hbase | 1 | reduce1 | |
hbase | 1 | reduce1 | |
hive | 1 | reduce1 | |
hive | 1 | reduce1 | |
hive | 1 | reduce1 | |
hive | 1 | reduce1 | |
spark | 1 | reduce1 | |
spark | 1 | reduce1 | |
spark | 1 | reduce1 | |
spark | 1 | reduce1 | |
3)分组:对相同的key的value进行合并。
hbase | list<1,1,1,1> | ||||
hive | list<1,1,1,1> | ||||
spark | list<1,1,1,1> | ||||