文档章节

一个MapReuce作业的从开始到结束--第5章 把文件复制到HDFS的流程

brian_2017
 brian_2017
发布于 2017/01/17 09:22
字数 1605
阅读 7
收藏 0

  在本章,我们以《从零开始学习Hadoop》的第一个例子WordCount为例,分析Hadoop执行MapReduce的流程和诸多细节。这个例子在该书有详细的说明,在这里不在给出,请参考该书了解细节。

1. 执行流程



[1].README.txt文件复制到HDFS的命令是:

./bin/hadoop fs -putREADME.txt readme.txt

[2].hadoop脚本中可知,fs对应的类是org.apache.hadoop.fs.FsShell,这个类对应的源代码在core/org/apache/hadoop/fs/FsShell.java文件。

[3]. FsShell类的入口是main函数。

[4].main函数,对FsShell进行一次实例化,FsShellshell = new FsShell(),然后用ToolRunnerargv参数传给shell执行。

[5].ToolRunner调用shellrun函数,参数是argv

[6].FsShellrun函数,首先会检查参数是否合理,然后执行init函数,进行初始化。初始化很简单,就是获取文件系统,获取Trash

[7].对于”-put”命令,run函数用一个Path数组保存源文件路径和目的路径,然后调用copyFromLocal函数进行处理。

[8].FsShellcopyFromLocal函数中,获取FileSystem,然后执行FileSystemcopyFromLocalFile函数。

[9].FileSystemcopyFromLocalFile函数有多个重载的版本,几经周转之后,最终调用的是FileUtilcopy函数:

FileUtil.copy(getLocal(conf),srcs, this, dst, delSrc, overwrite, conf);

[10].FileSystemcopy函数也有多个重载的版本,经过多次调用后,最终调用的是这个copy函数:

copy(FileSystem srcFS, Pathsrc, FileSystem dstFS, Path dst, boolean deleteSource,booleanoverwrite, Configuration conf)

README.txt复制到HDFS,对应下面这段代码:

InputStreamin=null;

OutputStream out =null;

try {

in =srcFS.open(src);

out =dstFS.create(dst, overwrite);

IOUtils.copyBytes(in,out, conf, true);

}



fileSystem中,open函数和create函数都有多个重载的版本,几经周转,最终都是调用都是虚函数版本,而虚函数的实现,取决于FileSystem的继承类对虚函数open和虚函数create的实现。

各种文件系统的实现,是在目录core/org/apache/hadoop/fs里,如果想知道具体有哪些文件系统实现了,可以在这个目录用grep查一下:

grep -rin “FSDataOutputStreamcreate(” *

Hadoop实现了很多文件系统,如下:

ChecksumFileSystem

FilterFileSystem

FTPFileSystem

HarFileSystem

InMemoryFileSystem

KFSImpl

IFSImpl

KosmosFileSystem

RawLocalFileSystem

S3FileSystem

NativeS3FileSystem

当然,其实最重要的是在这里HDFS,分布式文件系统,代码目录是hdfs/org/apache/hadoop/hdfs,这里FileSystem的继承类如下:

DistributedFileSystem

HftpFileSystem

WebHdfsFileSystem

[11].IOUtils类在输入流和输出流之间传递数据。IOUtils里的函数都是静态函数,所以它只是一个容器,把各种函数打包成类,IOUtilscopyBytes函数,经过几次周转,最终调用的是函数:

copyBytes(InputStreamin, OutputStream out,

final longlength, final int bufferSize, final boolean close)



在这个函数里,数据读写的核心代码是:

intn = 0;

for(long remaining= length;

remaining >0 && n != -1;

remaining -=n) {

final int toRead= remaining < buf.length? (int)remaining : buf.length;

n = in.read(buf,0, toRead);

if (n > 0) {

out.write(buf,0, n);

}

}



这段代码从输入流读数据到缓存,然后再写入到输出流。

2.输出流是如何获取FileSystem



wordCount例子中,输出流对应的文件名是”readme.txt”。 输出流在FsShell类的copyFromLocal函数获取FileSystem,语句如下:

Path dstPath = newPath(dstf);

FileSystem dstFs =dstPath.getFileSystem(getConf());



dstPath执行的构造函数是Path(StringpathString)。初始化的时候,检查pathString中的“:”和”/”位置,由此得到URIScheme。最终,dstPathuri的值是readme.txt

dstPath执行getFileSystem比较复杂,次序如下:

[1].Path类执行getFileSystem(Configurationconf)函数。

[2].FileSystem类执行get(URIuri, Configurationconf),在这个函数里,因为shceme=null,于是执行另外一个get重载版本。因为输出文件名是”readme.txt”,无法直接得到文件系统。

[3].FileSystem类执行get(Configurationconf) ,这个函数调用getDefaultUri函数,获取的默认的URI,然后再次调用get(URIuri, Configuration conf)函数。

[4].fileSystem类执行getDefaultUri函数得到默认的URI,也就是hdfs://localhost:9000

[5].然后再回到[2],这时候就可以参数uri是默认值,就可以执行到return语句,在return语句执行CACHE.get(uri,conf)

[6].执行Cache类的get函数,函数原型是get(URIuri, Configuration conf)CACHE对象属于Cache类。Cache类是FileSystem的内部类。如果uri对应的FileSystem对象已经在缓存里,返回它,如果不在,调用FileSystemcreateFileSystem(uri,conf)函数创建它。

[7].执行FileSystemcreateFileSystem函数,函数原型是:

FileSystemcreateFileSystem(URI uri, Configuration conf)

在这里函数里,从配置中读取hfds对应的实现类:

Class<?> clazz =conf.getClass("fs." + uri.getScheme() + ".impl",null)

"fs." +uri.getScheme() + ".impl" 就是”fs.hdfs.impl”

于是,clazz值就是org.apache.hadoop.hdfs.DistributedFileSystem。然后,通过反射的方式,生成一个clazz对象,并将类型转换成FileSystem

FileSystem fs =(FileSystem)ReflectionUtils.newInstance(clazz, conf)

然后返回fs,这就是输出流的文件系统。

3.输入流如何获取FileSystem



输入流获取FileSystem的地方是在FileSytem类调用copyFromLocalFile函数的时候。

[1].FileSystem调用copyFromLocalFile函数,copyFromLocalFile函数调用getLocal函数。

[2].FileSystemgetLocal函数,调用get(LocalFileSystem.NAME,conf)函数,这时候,LocalFileSystem.NAME的默认值是uri=”file:///

[3].fileSystemget函数,执行的步骤跟输出流获取文件系统的方式差不多,最终会调用createFileSytem函数,此时,从配置里读“fs.file.impl”对应的值,也就是clazz的值,这个值是clazz= class org.apache.hadoop.fs.LocalFileSystem,然后,同样通过反射的方式,创建一个LocalFileSystem对象,然后转化成FileSystem类型并返回它。

4. 创建输入流和输出流



文件复制,需要创建输入流和输出流,代码如下:

in= srcFS.open(src);

out = dstFS.create(dst,overwrite);

srcFsLocalFileSystem,它没有实现open函数,所以它调用的是父类ChecksumFileSystemopen函数。

dstFSDistributedFileSystem,它实现了create函数。但其实这个一个很有趣的过程,dstFs先调用的是FileSystemcreate函数,也就是两个参数的版本。但实际上,这个函数要在FileSystem中不断调用其他的各种参数的create函数,最终调用的虚函数:

public abstractFSDataOutputStream create(Path f,

FsPermission permission,

boolean overwrite,

int bufferSize,

short replication,

long blockSize,

Progressable progress)throws IOException;

DistributedFileSystem实现的就是这个7个参数的create函数。函数create调用环节非常之多,如果你去看源代码的话,能体会到Hadoop在版本更替中所做的种种努力调和各种问题。

DistributedFileSystemChecksumFileSystem的输入流和输出流的实现非常精彩,非常值得一读,非常有助于提升功力,它们是Hadoop的精华所在!

© 著作权归作者所有

brian_2017
粉丝 3
博文 61
码字总数 145216
作品 0
私信 提问
Hadoop各模块的具体分析

Hadoop集群架构 Hadoop集群由一个Master主节点和若干个Slave节点组成。其中,Master节点上运行NameNode和JobTracker守护进程;Slave节点上运行DataNode和TaskTracker守护进程。 Hadoop分别从...

牧师-Panda
2016/11/06
1K
0
《Hadoop权威指南》读书简记

第一章:就是介绍一下Hadoop的历史及发展过程。 第二章:MapReduce 从一个统计气象学的例子,来引出MapReduce的写法,对比了一下新旧API的区别以及不同。新的API主要采用的是虚类而不是接口的...

牧师-Panda
2016/11/27
30
0
Spark的RDD检查点实现分析

概述在《深入理解Spark:核心思想与源码分析》一书中只是简单介绍了下RDD的checkpoint,对本书是个遗憾。所以此文的目的旨在查漏补缺,完善本书的内容。Spark的RDD执行完成之后会保存检查点,...

beliefer
2016/05/26
0
0
[Hadoop]MapReducer工作过程

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/53939546 1. 从输入到输出 一个MapReducer作业经过了input,map,combine,reduc...

sjf0115
2016/12/30
0
0
MapReduce实现原理详解

MR简介 一个MR作业通常会把输入的数据集切分为若干独立的数据块,先由Map任务并行处理,然后MR框架对Map的输出先进行排序,然后把结果作为Reduce任务的输入。MR框架是一种主从框架,由一个单...

edwardGe
2018/07/03
382
0

没有更多内容

加载失败,请刷新页面

加载更多

OpenStack 简介和几种安装方式总结

OpenStack :是一个由NASA和Rackspace合作研发并发起的,以Apache许可证授权的自由软件和开放源代码项目。项目目标是提供实施简单、可大规模扩展、丰富、标准统一的云计算管理平台。OpenSta...

小海bug
53分钟前
4
0
DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
今天
6
0
数据库中间件MyCat

什么是MyCat? 查看官网的介绍是这样说的 一个彻底开源的,面向企业应用开发的大数据库集群 支持事务、ACID、可以替代MySQL的加强版数据库 一个可以视为MySQL集群的企业级数据库,用来替代昂贵...

沉浮_
今天
4
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
今天
6
0
常用物流快递单号查询接口种类及对接方法

目前快递查询接口有两种方式可以对接,一是和顺丰、圆通、中通、天天、韵达、德邦这些快递公司一一对接接口,二是和快递鸟这样第三方集成接口一次性对接多家常用快递。第一种耗费时间长,但是...

程序的小猿
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部