一个MapReuce作业的从开始到结束--第5章 把文件复制到HDFS的流程

原创
2017/01/17 09:22
阅读数 24

  在本章,我们以《从零开始学习Hadoop》的第一个例子WordCount为例,分析Hadoop执行MapReduce的流程和诸多细节。这个例子在该书有详细的说明,在这里不在给出,请参考该书了解细节。

1. 执行流程



[1].README.txt文件复制到HDFS的命令是:

./bin/hadoop fs -putREADME.txt readme.txt

[2].hadoop脚本中可知,fs对应的类是org.apache.hadoop.fs.FsShell,这个类对应的源代码在core/org/apache/hadoop/fs/FsShell.java文件。

[3]. FsShell类的入口是main函数。

[4].main函数,对FsShell进行一次实例化,FsShellshell = new FsShell(),然后用ToolRunnerargv参数传给shell执行。

[5].ToolRunner调用shellrun函数,参数是argv

[6].FsShellrun函数,首先会检查参数是否合理,然后执行init函数,进行初始化。初始化很简单,就是获取文件系统,获取Trash

[7].对于”-put”命令,run函数用一个Path数组保存源文件路径和目的路径,然后调用copyFromLocal函数进行处理。

[8].FsShellcopyFromLocal函数中,获取FileSystem,然后执行FileSystemcopyFromLocalFile函数。

[9].FileSystemcopyFromLocalFile函数有多个重载的版本,几经周转之后,最终调用的是FileUtilcopy函数:

FileUtil.copy(getLocal(conf),srcs, this, dst, delSrc, overwrite, conf);

[10].FileSystemcopy函数也有多个重载的版本,经过多次调用后,最终调用的是这个copy函数:

copy(FileSystem srcFS, Pathsrc, FileSystem dstFS, Path dst, boolean deleteSource,booleanoverwrite, Configuration conf)

README.txt复制到HDFS,对应下面这段代码:

InputStreamin=null;

OutputStream out =null;

try {

in =srcFS.open(src);

out =dstFS.create(dst, overwrite);

IOUtils.copyBytes(in,out, conf, true);

}



fileSystem中,open函数和create函数都有多个重载的版本,几经周转,最终都是调用都是虚函数版本,而虚函数的实现,取决于FileSystem的继承类对虚函数open和虚函数create的实现。

各种文件系统的实现,是在目录core/org/apache/hadoop/fs里,如果想知道具体有哪些文件系统实现了,可以在这个目录用grep查一下:

grep -rin “FSDataOutputStreamcreate(” *

Hadoop实现了很多文件系统,如下:

ChecksumFileSystem

FilterFileSystem

FTPFileSystem

HarFileSystem

InMemoryFileSystem

KFSImpl

IFSImpl

KosmosFileSystem

RawLocalFileSystem

S3FileSystem

NativeS3FileSystem

当然,其实最重要的是在这里HDFS,分布式文件系统,代码目录是hdfs/org/apache/hadoop/hdfs,这里FileSystem的继承类如下:

DistributedFileSystem

HftpFileSystem

WebHdfsFileSystem

[11].IOUtils类在输入流和输出流之间传递数据。IOUtils里的函数都是静态函数,所以它只是一个容器,把各种函数打包成类,IOUtilscopyBytes函数,经过几次周转,最终调用的是函数:

copyBytes(InputStreamin, OutputStream out,

final longlength, final int bufferSize, final boolean close)



在这个函数里,数据读写的核心代码是:

intn = 0;

for(long remaining= length;

remaining >0 && n != -1;

remaining -=n) {

final int toRead= remaining < buf.length? (int)remaining : buf.length;

n = in.read(buf,0, toRead);

if (n > 0) {

out.write(buf,0, n);

}

}



这段代码从输入流读数据到缓存,然后再写入到输出流。

2.输出流是如何获取FileSystem



wordCount例子中,输出流对应的文件名是”readme.txt”。 输出流在FsShell类的copyFromLocal函数获取FileSystem,语句如下:

Path dstPath = newPath(dstf);

FileSystem dstFs =dstPath.getFileSystem(getConf());



dstPath执行的构造函数是Path(StringpathString)。初始化的时候,检查pathString中的“:”和”/”位置,由此得到URIScheme。最终,dstPathuri的值是readme.txt

dstPath执行getFileSystem比较复杂,次序如下:

[1].Path类执行getFileSystem(Configurationconf)函数。

[2].FileSystem类执行get(URIuri, Configurationconf),在这个函数里,因为shceme=null,于是执行另外一个get重载版本。因为输出文件名是”readme.txt”,无法直接得到文件系统。

[3].FileSystem类执行get(Configurationconf) ,这个函数调用getDefaultUri函数,获取的默认的URI,然后再次调用get(URIuri, Configuration conf)函数。

[4].fileSystem类执行getDefaultUri函数得到默认的URI,也就是hdfs://localhost:9000

[5].然后再回到[2],这时候就可以参数uri是默认值,就可以执行到return语句,在return语句执行CACHE.get(uri,conf)

[6].执行Cache类的get函数,函数原型是get(URIuri, Configuration conf)CACHE对象属于Cache类。Cache类是FileSystem的内部类。如果uri对应的FileSystem对象已经在缓存里,返回它,如果不在,调用FileSystemcreateFileSystem(uri,conf)函数创建它。

[7].执行FileSystemcreateFileSystem函数,函数原型是:

FileSystemcreateFileSystem(URI uri, Configuration conf)

在这里函数里,从配置中读取hfds对应的实现类:

Class<?> clazz =conf.getClass("fs." + uri.getScheme() + ".impl",null)

"fs." +uri.getScheme() + ".impl" 就是”fs.hdfs.impl”

于是,clazz值就是org.apache.hadoop.hdfs.DistributedFileSystem。然后,通过反射的方式,生成一个clazz对象,并将类型转换成FileSystem

FileSystem fs =(FileSystem)ReflectionUtils.newInstance(clazz, conf)

然后返回fs,这就是输出流的文件系统。

3.输入流如何获取FileSystem



输入流获取FileSystem的地方是在FileSytem类调用copyFromLocalFile函数的时候。

[1].FileSystem调用copyFromLocalFile函数,copyFromLocalFile函数调用getLocal函数。

[2].FileSystemgetLocal函数,调用get(LocalFileSystem.NAME,conf)函数,这时候,LocalFileSystem.NAME的默认值是uri=”file:///

[3].fileSystemget函数,执行的步骤跟输出流获取文件系统的方式差不多,最终会调用createFileSytem函数,此时,从配置里读“fs.file.impl”对应的值,也就是clazz的值,这个值是clazz= class org.apache.hadoop.fs.LocalFileSystem,然后,同样通过反射的方式,创建一个LocalFileSystem对象,然后转化成FileSystem类型并返回它。

4. 创建输入流和输出流



文件复制,需要创建输入流和输出流,代码如下:

in= srcFS.open(src);

out = dstFS.create(dst,overwrite);

srcFsLocalFileSystem,它没有实现open函数,所以它调用的是父类ChecksumFileSystemopen函数。

dstFSDistributedFileSystem,它实现了create函数。但其实这个一个很有趣的过程,dstFs先调用的是FileSystemcreate函数,也就是两个参数的版本。但实际上,这个函数要在FileSystem中不断调用其他的各种参数的create函数,最终调用的虚函数:

public abstractFSDataOutputStream create(Path f,

FsPermission permission,

boolean overwrite,

int bufferSize,

short replication,

long blockSize,

Progressable progress)throws IOException;

DistributedFileSystem实现的就是这个7个参数的create函数。函数create调用环节非常之多,如果你去看源代码的话,能体会到Hadoop在版本更替中所做的种种努力调和各种问题。

DistributedFileSystemChecksumFileSystem的输入流和输出流的实现非常精彩,非常值得一读,非常有助于提升功力,它们是Hadoop的精华所在!

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部