文档章节

MapReduce V1:Job提交流程之JobClient端分析

尼古丁诱惑的男人
 尼古丁诱惑的男人
发布于 2015/10/09 10:51
字数 1990
阅读 547
收藏 1

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。
MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient、JobTracker和TaskTracker,我们主要是以这三个角色的实际处理活动为主线,并结合源码,分析实际处理流程。下图是《Hadoop权威指南》一书给出的MapReduce V1处理Job的抽象流程图:

如上图,我们展开阴影部分的处理逻辑,详细分析Job提交在JobClient端的具体流程。
在编写好MapReduce程序以后,需要将Job提交给JobTracker,那么我们就需要了解在提交Job的过程中,在JobClient端都做了哪些工作,或者说执行了哪些处理。在JobClient端提交Job的处理流程,如下图所示:

上图所描述的Job的提交流程,说明如下所示:

 

  1. 在MR程序中创建一个Job实例,设置Job状态

  2. 创建一个JobClient实例,准备将创建的Job实例提交到JobTracker

  3. 在创建JobClient的过程中,首先必须保证建立到JobTracker的RPC连接

  4. 基于JobSubmissionProtocol协议远程调用JobTracker获取一个新的Job ID

  5. 根据MR程序中配置的Job,在HDFS上创建Job相关目录,并将配置的tmpfiles、tmpjars、tmparchives,以及Job对应jar文件等资源复制到HDFS

  6. 根据Job配置的InputFormat,计算该Job输入的Split信息和元数据(SplitMetaInfo)信息,以及计算出map和reduce的个数,最后将这些信息连通Job配置写入到HDFS(保证JobTracker能够读取)

  7. 通过JobClient基于JobSubmissionProtocol协议方法submitJob提交Job到JobTracker

MR程序创建Job

下面的MR程序示例代码,已经很熟悉了:

01 public static void main(String[] args) throws Exception {
02   Configuration conf = new Configuration();
03   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
04   if (otherArgs.length != 2) {
05     System.err.println("Usage: wordcount <in> <out>");
06     System.exit(2);
07   }
08   Job job = new Job(conf, "word count");
09   job.setJarByClass(WordCount.class);
10   job.setMapperClass(TokenizerMapper.class);
11   job.setCombinerClass(IntSumReducer.class);
12   job.setReducerClass(IntSumReducer.class);
13   job.setOutputKeyClass(Text.class);
14   job.setOutputValueClass(IntWritable.class);
15   FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
16   FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
17   System.exit(job.waitForCompletion(true) ? 0 1);
18 }

在MR程序中,首先创建一个Job,并进行配置,然后通过调用Job的waitForCompletion方法将Job提交到MapReduce集群。这个过程中,Job存在两种状态:Job.JobState.DEFINE和Job.JobState.RUNNING,创建一个Job后,该Job的状态为Job.JobState.DEFINE,Job内部通过JobClient基于org.apache.hadoop.mapred.JobSubmissionProtocol协议提交给JobTracker,然后该Job的状态变为Job.JobState.RUNNING。

Job提交目录submitJobDir

通过如下代码可以看到,Job提交目录是如何创建的:

1 JobConf jobCopy = job;
2 Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy); // 获取到StagingArea目录
3 JobID jobId = jobSubmitClient.getNewJobId();
4 Path submitJobDir = new Path(jobStagingArea, jobId.toString());

获取StagingArea目录,JobClient需要通过JobSubmissionProtocol协议的远程方法getStagingAreaDir从JobTracker端获取到,我们看一下JobTracker端的getStagingAreaDirInternal方法,如下所示:

1 private String getStagingAreaDirInternal(String user) throws IOException {
2   final Path stagingRootDir = new Path(conf.get("mapreduce.jobtracker.staging.root.dir","/tmp/hadoop/mapred/staging"));
3   final FileSystem fs = stagingRootDir.getFileSystem(conf);
4   return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
5 }

最终获取到的StagingArea目录为${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/,例如,如果使用默认的mapreduce.jobtracker.staging.root.dir值,用户为shirdrn,则StagingArea目录/tmp/hadoop/mapred/staging/shirdrn/.staging/。通过Path submitJobDir = new Path(jobStagingArea, jobId.toString());可以得到submitJobDir,假如一个job的ID为job_200912121733_0002,则submitJobDir的值为/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/

拷贝资源文件

在配置Job的时候,可以指定tmpfiles、tmpjars、tmparchives,JobClient会将对应的资源文件拷贝到指定的目录中,对应目录如下代码所示:

1     Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
2     Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
3     Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
4 ...
5     Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
6     job.setJar(submitJarFile.toString());
7     fs.copyFromLocalFile(originalJarFile, submitJarFile);

上面已经知道Job提交目录,可以分别得到对应的资源所在目录:

  • tmpfiles目录:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/files

  • tmpjars目录:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/libjars

  • tmparchives目录:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/archives

  • Job Jar文件:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.jar

然后,就可以将对应的资源文件拷贝到对应的目录中。

计算并存储Split数据

根据Job配置中设置的InputFormat,计算该Job的数据数据文件是如何进行分片的,代码如下所示:

1 Configuration conf = job.getConfiguration();
2 InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
3 List<InputSplit> splits = input.getSplits(job);

实际上就是调用InputFormat的getSplits方法,如果不适用Hadoop自带的FileInputFormat的默认getSplits方法实现,可以自定义实现,重写该默认实现逻辑来定义数据数据文件分片的规则。
计算出输入文件的分片信息,然后需要将这些分片数据写入到HDFS供JobTracker查询初始化MapTask,写入分片数据的实现代码:

1 T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
2 // sort the splits into order based on size, so that the biggest
3 // go first
4 Arrays.sort(array, new SplitComparator()); // 根据InputSplit的长度做了一个逆序排序
5 JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);// 将split及其元数据信息写入HDFS

接着调用JobSplitWriter.createSplitFiles方法存储Split信息,并创建元数据信息,并保存元数据信息。存储Split信息,代码实现如下所示:

01 SerializationFactory factory = new SerializationFactory(conf);
02 int i = 0;
03 long offset = out.getPos();
04 for(T split: array) {
05   long prevCount = out.getPos();
06   Text.writeString(out, split.getClass().getName());
07   Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());
08   serializer.open(out);
09   serializer.serialize(split); // 将split序列化写入到HDFS文件中
10   long currCount = out.getPos();
11   String[] locations = split.getLocations();
12   final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
13   if (locations.length > max_loc) {
14     LOG.warn("Max block location exceeded for split: "+ split + " splitsize: " + locations.length + " maxsize: " + max_loc);
15     locations = Arrays.copyOf(locations, max_loc);
16   }
17   info[i++] = new JobSplit.SplitMetaInfo(locations, offset, split.getLength()); // 创建SplitMetaInfo实例
18   offset += currCount - prevCount;
19 }

我们先看一下FileSplit包含的分片内容,如下所示:

1 private Path file;
2 private long start;
3 private long length;
4 private String[] hosts;

在序列化保存FileSplit到HDFS,可以通过查看FileSplit的write方法,如下所示:

1 @Override
2 public void write(DataOutput out) throws IOException {
3   Text.writeString(out, file.toString());
4   out.writeLong(start);
5   out.writeLong(length);
6 }

需要注意的是,这里面并没有将FileSplit的hosts信息保存,而是存储到了SplitMetaInfo中new JobSplit.SplitMetaInfo(locations, offset, split.getLength())。
下面是保存SplitMetaInfo信息的实现:

01 private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
02     FsPermission p, int splitMetaInfoVersion,
03     JobSplit.SplitMetaInfo[] allSplitMetaInfo) throws IOException {
04   // write the splits meta-info to a file for the job tracker
05   FSDataOutputStream out = FileSystem.create(fs, filename, p);
06   out.write(JobSplit.META_SPLIT_FILE_HEADER); // 写入META头信息:META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
07   WritableUtils.writeVInt(out, splitMetaInfoVersion); // META版本信息:1
08   WritableUtils.writeVInt(out, allSplitMetaInfo.length); // META对象的数量:每个InputSplit对应一个SplitMetaInfo
09   for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
10     splitMetaInfo.write(out); // 每个都进行存储
11   }
12   out.close();
13 }

看一下SplitMetaInfo存储时包含的数据信息:

1 public void write(DataOutput out) throws IOException {
2   WritableUtils.writeVInt(out, locations.length); // location个数
3   for (int i = 0; i < locations.length; i++) {
4     Text.writeString(out, locations[i]); // 写入每一个location位置信息
5   }
6   WritableUtils.writeVLong(out, startOffset); // 偏移量
7   WritableUtils.writeVLong(out, inputDataLength); // 数据长度
8 }

最后,我们看一下这些数据保存的目录和文件情况。前面已经知道Job提交目录,下面看split存储的文件是如何构建的:

1 FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
2 SplitMetaInfo[] info = writeNewSplits(conf, splits, out);

那么split保存的文件为:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.split。
同样,split元数据信息文件构建如下所示:

1 writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
2     new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);

split元数据信息文件为:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.splitmetainfo。

保存Job配置数据

在提交Job到JobTracker之前,还需要保存Job的配置信息,这些配置数据根据用户在MR程序中配置,覆盖默认的配置值,最后保存到XML文件(job.xml)到HDFS,供JobTracker查询。如下代码,创建submitJobFile文件并写入job配置数据:

01 ...
02         Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
03         jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
04 ...
05         // Write job file to JobTracker's fs      
06         FSDataOutputStream out = FileSystem.create(fs, submitJobFile, newFsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
07 ...
08           try {
09             jobCopy.writeXml(out);
10           finally {
11             out.close();
12           }

前面已经知道Job提交目录,我们很容易就能得到job.xml文件的存储路径:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml。

最后,所有的数据都已经准备完成,JobClient就可以基于JobSubmissionProtocol协议方法submitJob,提交Job到JobTracker运行。

本文转载自:

尼古丁诱惑的男人
粉丝 47
博文 53
码字总数 32844
作品 0
深圳
其他
私信 提问
MapReduce作业提交源码分析

MapReduce作业提交源码分析 我们在编写MapReduce程序的时候,首先需要编写Map函数和Reduce函数。完成mapper和reducer的编写后,进行Job的配置;Job配置完成后,调用Job.submit()方法完成作业...

sdzzboy
2013/09/25
126
0
Hadoop中的MapReduce(5)

在MapReduce中,它也是主从结构,主节点:JobTracker,从节点:TaskTracker。主节点只有一个从节点有很多个,主节点在主机上,从节点分布到其他机器上。 JobTracker: 作用: 1、负责接收用户...

肖鋭
2014/02/23
108
0
Hadoop作业引用第三方jar文件

在eclipse中写mapreduce程序, 引用第三方jar文件, 可以利用eclipse Hadoop插件直接run on hadoop提交, 很方便. 不过插件版本要和eclipse匹配, 不然总是local执行, 在50070是没有job产生的. 如...

Zero零_度
2015/10/29
199
0
hadoop1存在的问题及hadoop2的优势对比

对 hadoop1 和 hadoop 2 做了一个解释 图片不错 拿来看看 Hadoop 1.0 从上图中可以清楚的看出原 MapReduce 程序的流程及设计思路: 首先用户程序 (JobClient) 提交了一个 job,job 的信息会发...

i_bobby
2015/08/03
77
0
好程序员大数据分享MapReduce中job的提交流程

好程序员大数据分享MapReduce中job的提交流程 一、MapReduce的定义 MapReduce是面向大数据并行处理的计算模型、框架和平台。 它的主要思想是:map(映射)和reduce(归约) 1)MapReduce是一...

好程序员IT
06/05
3
0

没有更多内容

加载失败,请刷新页面

加载更多

Docker下使用disconf:细说demo开发

Docker下的disconf实战全文链接 《Docker搭建disconf环境,三部曲之一:极速搭建disconf》; 《Docker搭建disconf环境,三部曲之二:本地快速构建disconf镜像》; 《Docker搭建disconf环境,...

程序员欣宸
32分钟前
6
0
centos7配置nfs共享存储服务

nfs 是一种网络文件系统,需要依赖rpc进行过程调度 注意nfs只验证id,验证用户名,并且只能在类unix os上进行文件共享服务,由于它的脆弱的验证机制,所以不适宜在internet上工作,在内网使用...

老孟的Linux私房菜
35分钟前
8
0
【F5小常识】F5的 Web 应用防火墙 (WAF)有什么优势?

     现如今传统防火墙已无法满足企业安全需求,网络攻击大多发生在应用层和网络层故障,且呈上升趋势,传统的防火墙存在着很大的不足之处,包括无法检测加密的Web流量、无法扩展深度检测...

梅丽莎好
46分钟前
4
0
整合到 Mockito 2

为了能够持续改进 Mockito 和在未来提升测试体验,我们希望你能够升级到 Mockito 2.10!Mockito 按照语义化版本(semantic versioning)的方式对版本进行编排,并且只在主版本升级的时候包含...

honeymoose
46分钟前
4
0
spring boot actuator

actuator 是监控系统健康的工具,引入 spring-boot-starter-actuator会暴露一些endpoint. 可通过如下配置来配置这些endpoint的基本配置: 可通过http:${url}:28081/management/actuator/*来访...

ZH-JSON
53分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部