# 从MaxTemperature程序来看Mapreduce 的执行过程

2014/09/22 16:38

1、Map-Reduce的逻辑过程

0067011990999991950051507+0000+
0043011990999991950051512+0022+
0043011990999991950051518-0011+
0043012650999991949032412+0111+
0043012650999991949032418+0078+
0067011990999991937051507+0001+
0043011990999991937051512-0002+
0043011990999991945051518+0001+
0043012650999991945032412+0002+
0043012650999991945032418+0078+

Map-Reduce主要包括两个步骤：Map和Reduce

map阶段的key-value对的格式是由输入的格式所决定的，如果是默认的TextInputFormat，则每行作为一个记录进程处理，其中key为此行的开头相对于文件的起始位置，value就是此行的字符文本
map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应

(0, 0067011990999991950051507+0000+)
(33, 0043011990999991950051512+0022+)
(66, 0043011990999991950051518-0011+)
(99, 0043012650999991949032412+0111+)
(132, 0043012650999991949032418+0078+)
(165, 0067011990999991937051507+0001+)
(198, 0043011990999991937051512-0002+)
(231, 0043011990999991945051518+0001+)
(264, 0043012650999991945032412+0002+)
(297, 0043012650999991945032418+0078+)

(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)
(1937, 1)
(1937, -2)
(1945, 1)
(1945, 2)
(1945, 78)

(1950, [0, 22, –11])
(1949, [111, 78])
(1937, [1, -2])
(1945, [1, 2, 78])

(1950, 22)
(1949, 111)
(1937, 1)
(1945, 78)

2、编写Map-Reduce程序

map: (K1, V1)  ->  list(K2, V2)
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
throws IOException;
}
reduce: (K2, list(V))  ->  list(K3, V3)
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output, Reporter reporter)
throws IOException;
}

public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(25) == '+') {
airTemperature = Integer.parseInt(line.substring(26, 30));
} else {
airTemperature = Integer.parseInt(line.substring(25, 30));
}
output.collect(new Text(year), new IntWritable(airTemperature));
}
}

public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}

Map-Reduce程序，也即上面实现的Mapper和Reducer

partition的规则为：(K2, V2) –> Integer， 也即根据K2，生成一个partition的id，具有相同id的K2则进入同一个partition，被同一个TaskTracker上被同一个Reducer进行处理。
public interface Partitioner<K2, V2> extends JobConfigurable {
int getPartition(K2 key, V2 value, int numPartitions);
}

setInputFormat：设置map的输入格式，默认为TextInputFormat，key为LongWritable, value为Text
setMapperClass：设置Mapper，默认为IdentityMapper
setMapRunnerClass：设置MapRunner, map task是由MapRunner运行的，默认为MapRunnable，其功能为读取input split的一个个record，依次调用Mapper的map函数
setMapOutputKeyClass和setMapOutputValueClass：设置Mapper的输出的key-value对的格式
setOutputKeyClass和setOutputValueClass：设置Reducer的输出的key-value对的格式
setReducerClass：设置Reducer，默认为IdentityReducer
setOutputFormat：设置任务的输出格式，默认为TextOutputFormat
FileOutputFormat.setOutputPath：设置输出文件的路径，在job运行前此路径不应该存在

public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
3、Map-Reduce数据流(data flow)
Map-Reduce的处理过程主要涉及以下四个部分：

JobTracker：协调整个job的运行，其为一个Java进程，其main class为JobTracker

3.1、任务提交
JobClient.runJob()创建一个新的JobClient实例，调用其submitJob()函数。

3.2、任务初始化

3.3、任务分配

3.4、任务执行

3.4.1、Map的过程
MapRunnable从input split中读取一个个的record，然后依次调用Mapper的map函数，将结果输出。
map的输出并不是直接写入硬盘，而是将其写入缓存memory buffer。

3.4.2、Reduce的过程

reducer中一个线程周期性的向JobTracker请求map输出的位置，直到其取得了所有的map输出。

3.5、任务结束

0
0 收藏

0 评论
0 收藏
0