文档章节

一个MapReuce作业的从开始到结束--第7章 MapReduce的执行过程

brian_2017
 brian_2017
发布于 2017/01/17 09:23
字数 2117
阅读 16
收藏 0

1. Job

在新版的API中,MapReduce的执行有Job类管理。Job类的代码在mapred/org/apache/hadoop/mapreduce/Job.java

wordCount的例子中,main函数执行一下的步骤:

Job job = new Job(conf,"word count");

job.setJarByClass(WordCount.class);

job.setMapperClass(TokenizerMapper.class);

job.setCombinerClass(IntSumReducer.class);

job.setReducerClass(IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job,new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true)? 0 : 1);

Job类继承了JobContext类。Job类和JobContext类的大部分函数都是在设置conf,也就是配置类,比如说,上面语句中凡是set函数都是对conf进行操作。而下两行代码:

FileInputFormat.addInputPath(job,new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

其实也是设置conf,看到源代码就知道了。



JobwaitForCompletion函数负责执行MapReduce作业,它的执行流程如下:

[1].调用Job类的submit函数,向JobTracker提交作业,提交后,会返回一个RunningJob类的实例info

[2].调用JobClient类的实例jobClientmonitorAndPrintJob函数,根据info监控和打印MapReduce作业的执行中的各种信息。

[3].返回MapReduce作业执行是否成功。


Job类的submit函数提交作业的流程如下:

[1].调用Job类的connect函数,连接到JobTracker节点。

[2].调用JobClient类的实例jobClientsubmitJobInternal函数,提交MapReduce作业。


2.Job类的connect函数的执行

它只执行一条语句,如下:

ugi.doAs(newPrivilegedExceptionAction<Object>() {

public Object run()throws IOException {

jobClient = newJobClient((JobConf) getConfiguration());

return null;

}

});


ugi是类UserGroupInformation的实例,它在Job类的父类JobContext中定义和实例化的,也就是:

ugi =UserGroupInformation.getCurrentUser(),这里涉及到Hadoop的安全机制,暂且不用管。

ugi执行doAs函数的时候,执行Subject.doAs(subject,action),也就是运行run函数,运行run函数时,jobClient会进行初始化,然后连接JobTrackerJobClient初始化的时候,会执行init函数,创建RPC远程过程调用rpcJobSubmitClient,然后再创建它的代理类jobSubmitClient

3.JobClient类的submitJobInternal函数的执行

JobClientsubmitJobInternal函数,向JobTracker节点提交MapReduce作业。在这个函数里,提交作业的关键语句是:

status =jobSubmitClient.submitJob(jobId, submitJobDir.toString(),jobCopy.getCredentials())


jobIdMapReduceID号,它是从JobTracker节点获取的,语句如下:

JobID jobId =jobSubmitClient.getNewJobId()

submitJobDirJobTracker所在的文件系统的一个目录,如果Hadoop运行在集群状态下,submitJobDir就是HDFS上的一个目录,这个目录对应提交的作业。



getCredentials()Credentials类提供读写高安全性的键值函数。


jobSubmitClient是在init函数里创建的远程过程调用RPC的代理类,调用它的submitJob函数,就是远程调用JobTracker节点的submitJob函数。


提交作业的本质就是,从JobTracker上获取jobId,然后在文件系统上创建一个特定的目录,把所有跟任务相关的数据都写到这个目录,然后再通知JobTracker去执行作业。


如果Hadoop运行在伪分布模式,那么JobTrackerlocalhost,也就本地主机,这时候,Hadoop调用LocalRunner执行MapReduce作业。


如果如果Hadoop运行在集群模式,那么JobTracker是远程主机,Hadoop调用JobTracker节点以集群的方式执行MapReduce作业。



4.JobTracker节点执行MapReduce作业

JobTrackersubmit函数被调用后,会先对作业进行多个步骤的处理,如检查参数,创建job等,然后,将job加入到执行队列中,关键语句是:

status = addJob(jobId, job)

将让listener也添加作业:

for (JobInProgressListenerlistener : jobInProgressListeners) {

listener.jobAdded(job);

}

jobs记录了提交的作业,它的定义如下:

Map<JobID, JobInProgress>jobs = Collections.synchronizedMap(new TreeMap<JobID,JobInProgress>())

JobID是提交的作业的标示符,用它区分作业。JobInProgress是作业的相关信息。



对作业各种操作,都是通过jobs来的,比如要杀掉一个作业,是根据JobIDjobs里找到它对应的JobInProgress,然后kill掉。如果要获取一个作业的任务执行情况,是根据JobIDjobs里找到它对应的JobInProgress,然后根据JobInProgress得到TaskInProgress

4.1JobTracker节点的运行

JobTracker节点从main函数开始运行,main函数先调用startTracker函数,然后再调用offerService函数。



offserService函数,创建RPC服务端interTrackerServer,然后启动taskScheduler,启动expireTrackersThread,启动retireJobsThread,调用interTrackerServerjoin()函数,进入服务状态。



JobTracker类的内部类RetireJobs,清理已经完成的旧的作业,它实现了Runnable接口,在一个线程里执行,它在执行的时候,从jobs里取出所有的作业,然后进行判断和清理。Java里实现多线程有两种方式,一种是继承Thread类,重写run函数,另一种是实现Runnable接口,实现run函数,然后把这个类在一个线程里执行。有一片文档写的很好,把ThreadRunnable解释的很清楚,http://developer.51cto.com/art/201203/321042.htm



JobTracker类的内部类RecoverManager,重启之后,恢复已经执行和正在执行的作业。GetSetupAndCleanupTasks获取清理掉的任务和已设置的任务。



JobTrackeerinitJob函数,初始化作业。job执行JobInProcessinitTasks函数,在initTasks函数,作业会被拆成若干个Map任务,若干个Reduce任务。JobTrackerheartbeat函数,这个函数由TaskTracker调用,而且是以RPC的方式调用。有很多很多的的操作,在心跳里发给TaskTracker

4.2JobTracker节点处理作业

TaskNode节点启动之后,会循环调用JobTracker节点的心跳函数,也就是heatbeat函数。

heartbeat函数被调用后,会返回一个可序列化的HeartbeatResponse对象,步骤如下:

[1].创建一个新的HeartbeatResponse对象response

[2].通常情况下,获取要发给TaskTracker执行的任务:getSetupAndCleanupTasks,在这个函数里,是从jobs中取得job,然后按照一定的次序从job中获取要执行的Task

[3].把任务存放到LanchTaskAction中,然后存储到actions里。然后再给actions添加其他各种任务,如清理需要Kill的任务,需要存储结果的任务,计算下一次心跳间隔等等。

[4].返回responseTaskTracker节点。

5.TaskTracker节点执行MapReduce作业

TaskTracker类的代码在文件mapred/org/apache/hadoop/mapred/TaskTracker.java里。

5.1TaskTracker节点的运行

TaskTracker节点启动时,步骤如下:

[1]. 运行main函数。

[2].main函数,创建TaskTracker对象,然后执行它的run函数,进入无限循环。

[3]. run函数,执行offerService函数。

[4].offerService函数,执行一个无限循环,在循环里,定期向JobTracker发送心跳,也就是执行函数:

HeartbeatResponseheartbeatResponse = transmitHeartBeat(now);

然后,从 heartbeatResponse取出要执行的任务:

TaskTrackerAction[]actions = heartbeatResponse.getActions();

通常情况下,会将actions加入到任务队列:

addToTaskQueue((LaunchTaskAction)action);

实际上,如果是Map任务,就加入到mapLauncher,如果是Reduce任务,加入到reduceLauncher

5.2TaskTracker处理作业

mapLauncherreduceLauncher是线程类TaskLauncher的实例,当TaskTracker在运行的时候,它们负责取出Map任务和Reduce任务执行。

taskLaucher线程类的run函数可知,处理的流程是这样的:

tip =tasksToLaunch.remove(0);

startNewTask(tip);



startNewTask函数,执行任务:

RunningJob rjob =localizeJob(tip);

launchTaskForJob(tip,new JobConf(rjob.getJobConf()), rjob)

然后,由TaskInProgress类的实例tip执行launchTask(rjob),然后执行:

localizeTask(task);

setTaskRunner(task.createRunner(TaskTracker.this,this, rjob));

this.runner.start();



localizeJob函数是把作业进行本地化,localizeTask函数是进行任务本地化,本地化是让TaskTracker节点创建作业和任务运行的本地环境,包括创建目录,下载Jar文件,设置环境变量等等。然后,创建TraskRuuner,然后由TraskRunner启动任务。Map任务和Reduce任务有各自的TaskRunner

TaskRunner是线程类,从它的run函数可以知道任务执行是,TaskRunner创建环境变量,参数,启动命令,输出流等等,然后由JvmManager对象启动一个JVM执行任务:

launchJvmAndWait(setupCmds,vargs, stdout, stderr, logSize, workDir)

也就是在JvmManager里,执行launchJvm函数,然后,执行:

setRunningTaskForJvm(jvmRunner.jvmId,t)

然后,在JvmManager里执行内部类JvmRunnerrunChild函数。

然后,在JvmManager里调用DefaultTaskControllerlaunchTask函数执行,主要是两行代码:

shExec = newShellCommandExecutor(new String[]{

"bash",commandFile},

currentWorkDirectory);

shExec.execute();

再这里,将要执行的命令写入bash脚本,然后在执行。

在脚本,执行的是org.apache.hadoop.mapred.Child类。

然后,假如是Map任务的话,会执行到MapTaskrun函数,再执行runNewMapper函数,终于终于,在这里开始执行Map任务了,这个Map任务就是我们在jar包里的Mapper类,于是,经过各种实例化之后,读数据等等,执行语句就是:

mapperContext =contextConstructor.newInstance(

mapper,job, getTaskID(),

input,output, committer,

reporter,split);


input.initialize(split,mapperContext);

mapper.run(mapperContext);

input.close();

output.close(mapperContext);





© 著作权归作者所有

brian_2017
粉丝 3
博文 61
码字总数 145216
作品 0
私信 提问
【电子书】Hadoop实战手册 (样章第一章)

Hadoop实战手册 [美] Jonathan R. Owens,Jon Lentz,Brian Femiano 著; 傅杰,赵磊,卢学裕 译 内容简介   这是一本Hadoop实用手册,主要针对实际问题给出相应的解决方案。《Hadoop实战手...

dwf07223
2018/06/28
0
0
第6章-MapReduce的工作机制-笔记

作业的提交 可以只用一行代码来运行一个MapReduce作业: JobClient.runJob(conf)。 作业的调度 Hadoop作业调度演进 1、早期版本的Hadoop使用FIFO调度算法来运行作业 早期版本的Hadoop使用一种...

hiqj
2014/10/16
36
0
架构精讲: Hadoop技术框架和架构演进方向

Apache Hadoop是一个开源软件框架,可安装在一个商用机器集群中,使机器可彼此通信并协同工作,以高度分布式的方式共同存储和处理大量数据。最初,Hadoop 包含以下两个主要组件: Hadoop Dist...

btb5e6nsu1g511eg5xeg
2018/05/15
0
0
深入浅出Hadoop数据流(1)

首先是一些术语的说明。MapReduce作业(job)是客户端执行的单位:它包括输入数据、MapReduce程序和配置信息。Hadoop通过把作业分成若干个小任务(task)来工作,其包括两种类型的任务:map任务和...

超人学院
2015/03/30
158
0
Hadoop中的MapReduce(5)

在MapReduce中,它也是主从结构,主节点:JobTracker,从节点:TaskTracker。主节点只有一个从节点有很多个,主节点在主机上,从节点分布到其他机器上。 JobTracker: 作用: 1、负责接收用户...

肖鋭
2014/02/23
107
0

没有更多内容

加载失败,请刷新页面

加载更多

SpringBoot中 集成 redisTemplate 对 Redis 的操作(二)

SpringBoot中 集成 redisTemplate 对 Redis 的操作(二) List 类型的操作 1、 向列表左侧添加数据 Long leftPush = redisTemplate.opsForList().leftPush("name", name); 2、 向列表右......

TcWong
今天
4
0
排序––快速排序(二)

根据排序––快速排序(一)的描述,现准备写一个快速排序的主体框架: 1、首先需要设置一个枢轴元素即setPivot(int i); 2、然后需要与枢轴元素进行比较即int comparePivot(int j); 3、最后...

FAT_mt
昨天
4
0
mysql概览

学习知识,首先要有一个总体的认识。以下为mysql概览 1-架构图 2-Detail csdn |简书 | 头条 | SegmentFault 思否 | 掘金 | 开源中国 |

程序员深夜写bug
昨天
10
0
golang微服务框架go-micro 入门笔记2.2 micro工具之微应用利器micro web

micro web micro 功能非常强大,本文将详细阐述micro web 命令行的功能 阅读本文前你可能需要进行如下知识储备 golang分布式微服务框架go-micro 入门笔记1:搭建go-micro环境, golang微服务框架...

非正式解决方案
昨天
8
0
前端——使用base64编码在页面嵌入图片

因为页面中插入一个图片都要写明图片的路径——相对路径或者绝对路径。而除了具体的网站图片的图片地址,如果是在自己电脑文件夹里的图片,当我们的HTML文件在别人电脑上打开的时候图片则由于...

被毒打的程序猿
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部