文档章节

Hadoop中一些Java Api操作(23)

肖鋭
 肖鋭
发布于 2014/03/12 21:53
字数 3253
阅读 494
收藏 2

Hadoop基本文件系统操作:
    1.首先从本地文件系统将一个文件复制到HDFS:
        hadoop fs-copyFromLocal input/docs/quangle.txthdfs://localhost/user/tom/quangle.txt
        把文件复制回本地文件系统,并检查是否一致:
            hadoop fs -copyToLocal quangle.txt quangle.copy.txt
            md5 input/docs/quangle.txt quangle.copy.txt
            输出:
                MD5(input/docs/quangle.txt)=a16f231da6b05e2ba7a339320e7dacd9
                MD5(quangle.copy.txt)=a16f231da6b05e2ba7a339320e7dacd9
            由于MD5键值相同,表明这个文件在HDFS之旅中得以幸存并保存完整.
    2.从HadoopURL中读取数据:
        InputStreamin=null;
        try{
            in=new URL("hdfs://host/path").openStream();
        }finally{
            IOUtils.closeStream(in);
        }
        通过URLStreamHandler实现以标准输出方式显示Hadoop文件系统的文件
        publicclassURLCat(){
            static{
                URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
            }
        }
        注:从0.21.0版本开始,加入了一个名为FileContext的文件系统接口,该接口能够更好地处理多文件系统问题,并且该接口更简明、一致.
        public static void main(String[] args)throws Exception{
        InputStreamin = null;
            try{
                in = new URL(args[0].openStream());
                IOUtils.copyBytes(in,System.out,4096,false);
            }finally{
                IOUtils.closeStream(in);
            }
        }
        调用Hadoop中简洁的IOUtils类,并在finally字句夅关闭数据流,同时也可以在输入流和输出流之间复制数据(System.out),copyBytes方法的最后两个参数,第一个用于设置复制的缓冲区大小,第二个用户设置复制结果后是否关闭数据流.这里选择自行关闭输入流,因而System.out不关闭输入流.
        运行示例:
            hadoop URLCat hdfs://localhost/user/tom/quangle.txt
    3.通过FileSystem API读取数据
        FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例,这里是HDFS.
        获取FileSystem实例有两种静态工厂方法:
            public static FileSystem get(Configurationconf)throws IOException
            public static FileSystem get(URIuri,Configurationconf)throws IOException
        Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如conf/core-site.xml).
        第一个方法返回的是默认文件系统(在conf/core-site.xml中指定的,如果没有指定,则使用默认的本地文件系统).
        第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统.
        有了FileSystem实例之后,我们调用open()函数来获取文件的输入流:
            public FSDataInputStream open(Pathf)throws IOException
            public abstract FSDataInputStream open(Pathf,intbufferSize)throws IOException
        第一个方法使用默认的的缓冲区大小4kb.
        直接使用FileSystem以标准输出格式显示Hadoop文件系统中的文件
        public class FileSystemCat(){
            public static void main(String[] args)throws Exception{
                String uri = args[0];
                Configuration conf= new Configuration();
                FileSystem fs = FileSystem.get(URI.create(uri),conf);
                InputStream in = null;
                try{
                    in = fs.open(new Path(uri));
                    IOUtils.copyBytes(in,System.out.4096,false);
                }finally{
                    IOUtils.closeStream(in);
                }
            }
        }
        程序运行结果如下:
            hadoop FileSystemCat hdfs://localhost/user/tom/quagnle.txt
    4.写入数据
        FileSystem类有一系列创建文件的方法.最简单的方法是给准备创建的文件制定一个Path对象.然后返回一个用于写入数据的输出流:
        public FSDataOutputStream create(Path f)throws IOException
        注意:create()方法能够为需要写入且当前不存在的文件创建父目录.尽管这样很方便,但有时并不希望这样.如果你希望不存在父目录就发生文件写入失败,则应该先调用exists()方法检查父目录是否存在.
        还有一个重载方法Propressable,用于传递回调接口,如此一来,可以把数据写入数据节点的进度通知到你的应用:
            public interface Progressable{
                public void progress();
            }
        另外一种新建文件的方法,是使用append()方法在一个一有文件末尾追加数据(还存在一些其他重载版本):
            public FSDataOutputStream append(Path f)throws IOException
        该追加操作允许一个writer打开文件后访问该文件的最后偏移量处追加数据.有了这个API,某些应用可以创建无边界文件.
        例如:
            日志文件可以在机器重启后在已有文件后面继续追加数据.该追加操作是可选的,并非所有Hadoop文件系统都实现了该操作.
            例如,HDFS支持追加,但S3文件系统就不支持.
    5.将本地文件复制到Hadoop文件系统:
        public class FileCopyWithProgress{
            public static void main(String[] args){
                String localSrc = args[0];
                String dst = args[1];
                InputStream in = new BufferInputStream(newFileInputStream(localSrc));
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create(dst),conf);
                OutputStream out = fs.create(new Path(dst),newProgressable(){
                    public void progress(){
                        System.out.print(".");
                    }
                });
                IOUtils.copyBytes(in,out,4096,true);
            }
        }
    6.FSDataInputStream
        public class FSdataInputStream extends DataInputStream implements Seekable,PositionedReadable{}
        Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量(getPos())的查询方法:
            public interface Seekable{
                void seek(long pos)throws IOException;
                long getPos()throws IOException;
                boolean seekToNewSource(long targetPos)throws IOException;
            }
        使用seek方法,将Hadoop文件系统中的一个文件标准输出上显示两次
            public class FileSystem DoubleCat{
                public static void main(String[] args)throws Exception{
                    String uri =args[0];
                    Configuration conf = new Configuration();
                    FileSystem fs = FileSystem.get(URI.create(uri),conf);
                    FSDataInputStream in=null;
                    try{
                        in = fs.open(new Path(uri));
                        IOUtils.copyByste(in,System.out,4096,false);
                        in.seek(0);
                        IOUtils.copyBytes(in,System.out,4096,false);
                    }finally{
                        IOUtils.closeStream(in);
                    }
                }
            }
        在一个小文件上运行的结果如下:
            hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
            FSDataInputStream类也实现了PositionedReadable接口,从一个指定偏移量处理读取文件的一部分:
            public interface PositionedReadable{
                public int read(long position,byte[] buffer,int offset,int length)throws IOException;
                public void readFully(long position,byte[] buffer,int offset,int length)throws IOException;
                public void readFully(long position,byte[] buffer)throws IOException;
            }
            read()方法从文件的指定position处读取至多为length字节的数据并存入缓冲区buffer的指定偏离量offset处.返回值是实际读到的字节数:调用者需要检查这个值,它有可能小于指定的length长度.
            readFully()方法将指定length长度的字节数数据读取到buffer中(或在只接受buffer字节数组的版本中,读取buffer.length长度字节数据),除非已经读到文件末尾,这种情况下将抛出EOFException异常.
            所有这些方法会保留文件当前偏移量,并且是线程安全的,因此它们提供了再读取文件-----可能是元数据----的主题时访问文件的其他部分的便利方法.事实上,这只是按照以下模式实现的Seekable接口:
                lon goldPos = getPos();
                try{
                    seek(position);
                }finally{
                    seek(oldPos)
                }
            seek()方法是一个相对高开销的操作,需要慎重使用.建议用流数据来构建应用的访问模式(如使用MapReduce),而非执行大量的seek()方法.
    7.FSDataOutputStream对象
        FileSystem实例的create()方法返回FSDataOutputStream对象,与FSDataInputSream类相似,它也有一个检查文件当前位置的方法:
        public class FSDataOutputStream extends DataOutputStream implements Syncable{
            public long getPos()throwsIOException{
                //
            }
        }
        但与FSDataInputStream类不同的是,FSDataOutputStream类不允许在文件中定位.这是因为HDFS只允许对一个已打开的文件顺序写入,或在现有文件的末尾追加数据.它不支持在除文件末尾之外的其他位置进行写入,因此,写入时定位就没有什么意义.
    8.FileSystem实例提供了创建目录的方法:
        public boolean mkdirs(Path f)throws IOException
        这个方法可以一次性创建所有必要但还没有的父目录,就像java.io.File类的mkdirs()方法.如果目录都已经创建成功,则返回true.
        通常,你不需要显式创建一个目录,因为调用create()方法写入文件时会自动创建父目录.

Hadoop查询文件系统:
    1.文件元数据:FileStatus
        任何文件系统的一个重要特征都是提供其目录结构浏览和检索它所存文件盒目录相关信息的功能.
        FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度,块大小,备份,修改时间,所有者以及权限信息.
        FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象.
        展示文件状态信息:
        public class ShowFileStatusTest{
            private MiniDFSCluster cluster;
            private FileSystem fs;
            @Before
            public void setUp()throws IOException{
                Configuration conf = new Configuration();
                if(System.getProperty("text.build.data")==null){
                    System.setProperty("test.build,data","/tmp");
                }
                cluster=newmMiniDFSCluster(conf,1,true,null);
                fs=cluster.getFileSystem();
                OutputStream out=fs.create(newPath("/dir/file"));
                out.write("content".getBytes("UTF-8"));
                out.close();
            }
            @After
            publicvoidtearDown(()throwsIOException{
                if(fs!=null){fs.close();}
                if(cluster!=null){cluster.shutdown();}
            }    
            @Test(expected=FileNotFoundException.class)
            public void throwsFileNotFoundNonExistentFile()throwsIOException{
                fs.getFileStatus(newPath("no-such-file"));
            }
            @Test
            public void fileStatusForFile()throws IOException{
                Path file = new Path("/dir/file");
                FileStatus stat = fs.getFileStatus(file);
                assertThat(stat.getPath().toUri().getPath(),is("/dir/file"));
                assertThat(stat.isDir(),is(false));
                assertThat(stat.getLen(),is(7L));
                assertThat(stat.getModificationTime(),is(lessThanOrEqualTO(System.currentTimeMillis())));
                assertThat(stat.getReplication(),is(short)1);
                assertThat(stat.getBlockSize(),is(64*1024*1024L));
                assertThat(stat.getOwner(),is("tom"));
                assertThat(stat.getGroup(),is("supergroup"));
                assertThat(stat.getPermission().toString(),is("rw-r--r--"));
            }
            @Test
            public void fileStatusForDirectory()throws IOException{
                Path dir = new Path("/dir")
                FileStatus stat = fs.getFileStatus(dir);
                assertThat(stat.getPath().toUri().getPath(),is("/dir"));
                assertThat(stat.isDir(),is(true));//是否是目录
                assertThat(stat.getLen(),is(0L));//文件长度
                assertThat(stat.getModificationTime(),is(lessThanOrEqualTO(System.currentTimeMillis())));//最后修改时间
                assertThat(stat.getReplication(),is(short)0);//备份数
                assertThat(stat.getBlockSize(),is(0L));//块大小
                assertThat(stat.getOwner(),is("tom"));//用户
                assertThat(stat.getGroup(),is("supergroup"));//组
                assertThat(stat.getPermission().toString(),is("rwxr-xrr-x"));//权限
            }
        }
        如果文件或目录均不存在,则会抛出FileNotFoundException异常.但是,如果只需检查文件或目录是否存在,那么调用exists()方法会更方便:
        public boolean exists(Path f)throws IOException
        
    2.列出文件:
        查找一个文件或目录的信息很实用,但通常你还需要能够列出目录的内容.这就是FileSystem的listStatus()方法的功能:
            public FileStatus[] listStatus(Path f)throws IOException
            public FileStatus[] listStatus(Path f,PathFilter fileter)throws IOException
            public FileStatus[] listStatus(Path[] files)throws IOException
            public FileStatus[] listStatus(Path[] files,PathFilter fileter)throws IOException
        当传入的参数是一个文件时,它会简单转变成以数组方法返回长度为1的FileStatus对象.
        当传入参数是一个目录时,则返回0或多个FileStatus对象,表示此目录中包含的文件和目录.
        注意FileUtil中stat2Paths()方法的使用,它将一个FileStatus对象数组转换为Path对象数组.
        例:显示Hadoop文件系统中一组路径的文件信息
        public class ListStatus{
            public static void main(String[]args){
                String uri = args[0];
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create(uri),conf);
                Path[] paths = new Path[args.length];
                for(int i=0;i<paths.length;i++){
                    paths[i] = new Path(args[i]);
                }
                FileStatus[] status = fs.listStatus(paths);
                Path[] listedPaths = FileUtil.stat2Paths(status);
                for(Path p:listedPaths){
                    System.out.println(p);
                }
            }
        }
    3.文件模式
        在单个操作中处理一批文件,这是一个常见要求.举例来说,处理日志的MapReduce作业可能需要分析一个月内包含在大量目录中的日志文件.在一个表达式中使用通配符来匹配多个文件时比较方便的,无需列举每个文件盒目录来指定输入,该操作称为"通配".Hadoop为执行通配提供了两个FileSystem方法:
            public FileStatus[] globStatus(Path pathPattern)throws IOException
            public FileStatus[] globStatus(Path pathPattern,Path Filterfilter)throws IOException
            globStatus()方法返回与路径相匹配的所有文件的FileStatus对象数组,并按路径排序.PathFilter命令作为可选项可以进一步对匹配限制.
            Hadoop支持的通配符与Unixbash相同.
            通配符            名称            匹配
            *                星号            匹配0或多个字符
            ?                问号            匹配单一个字符
            [ab]            字符类            匹配{a,b}集合中的一个字符
            [^ab]            非字符类        匹配非{a,b}集合中的一个字符
            [a-b]            字符范围    匹配一个在{a,b}范围内的字符(包括ab),a在字典顺序上要小于或等于b
            [^a-b]            非字符范围        匹配一个不在{a,b}范围内的字符(包括ab),a在字典顺序上要小于等于b
            {a,b}            或选择            匹配包含a或b中的一个的表达式
            \c                转义字符        匹配元字符c
    4.PathFilter对象            通配符模式并不总能够精确地描述我们想要访问的文件集.比如,使用通配格式排除一个特定的文件就不太可能,FileSystem中的listStatus()和globStatus()方法提供了可选的PathFilter对象,使我们能够通过编程方式控制通配符:
        public interface PathFilter{
            boolean accept(Path path);
        }
    PathFilter与java.io.FileFilter一样,是Path对象而不是File对象.
    例子:用于排序匹配正则表达式路径的PathFilter
        public class RegexExcludePathFilter implements PathFilter{
            private final String regex;
            public RegexExcludePathFilter(String regex){
                this.regex = regex;
            }
            public boolean accept(Path path){
                return !path.toString().matchers(regex);
            }
        }
    这个过滤器只传递不匹配正则表达式的文件.我们将该过滤器与预先去除文件的通配符想结合:过滤器可优化结果.
    fs.globStatus(newPath("/2007/*/*"),newRegexExcludeFilter("^.*/2007/12/31$"));
    过滤器由Path表示,只能作用于文件名.不能针对文件的属性来构建过滤器.但是,通配符模式和正则表达式同样无法对文件属性进行匹配.例如,如果你将文件存储在按照日期排列的目录结构中,则可以根据Pathfilter在给定的时间范围内选出文件.
    4.删除数据
        使用FileSystem的delete()方法可以永久性删除文件或目录.
        public boolean delete(Path f,boolean recursive)throwsIOException
        如果f是一个文件或空目录,那么recursive的值就会被忽略.只有在recrusive值为true时,一个非空目录及其内容才回被删除(否则会抛出IOException异常)


                                                                                                                                Name:Xr

                                                                                                                                Date:2014-03-12 21:54

© 著作权归作者所有

肖鋭
粉丝 10
博文 62
码字总数 29531
作品 0
朝阳
程序员
私信 提问
Hadoop中其他知识(24)

Thrift: 因为Hadoop文件系统的接口是通过JAVAApi提供的,所以其他非Java应用程序访问Hadoop文件系统会比较麻烦。 Thriftfs定制功能模块中ThriftApi通过把hadoop文件系统包装成一个ApacheThr...

肖鋭
2014/03/13
85
0
Hadoop 配置最后出现JAVA_HOME没有被设置和找到的错误

说明:本地机器1台,配置为master,本地机器虚拟1个slave,然后远程端机器1台slave。参照一些配置顺序的教程后,出现上述问题。因为是新手,想请教下问题在哪,或者能否有成功的朋友,给个可...

Ding_yu
2015/11/10
1K
2
Centos7上搭建hadoop2.7分布式集群环境实验记录

ps:因为实验需要和学习过程,为了更好学习hadoop,所以为了方便记录以及供学习交流,特此做一份hadoop搭建实验记录。 1.准备三台虚拟机。 2.修改三台虚拟机的主机名分别为master,slave01,sl...

mukvintt
2018/04/15
0
0
项目启动失败,表面上显示实体bean注入失败,maven依赖中显示已经有对应的项目实体

2017-06-27 15:37:24.914 [main] INFO com.mimidai.job.Application - Starting Application on I0TSZ7GX85EKQSV with PID 2884 (E:\mimidai\workspaces\eclipse\mimidai-job\mimidai-job-jo......

谢余峰
2017/06/27
2.7K
1
Hadoop tutorial - 2 - 安装hadoop 2015-3-23

工具: xshell () 安装包: hadoop-2.6.0.tar.gz->2.4.1 http://archive.apache.org/dist/hadoop/core/hadoop-2.4.1/ ----------5/19/2017----------start https://archive.apache.org/dist......

jayronwang
2015/03/23
145
0

没有更多内容

加载失败,请刷新页面

加载更多

Replugin借助“UI进程”来快速释放Dex

public static boolean preload(PluginInfo pi) { if (pi == null) { return false; } // 借助“UI进程”来快速释放Dex(见PluginFastInstallProviderProxy的说明) return PluginFastInsta......

Gemini-Lin
50分钟前
4
0
Hibernate 5 的模块/包(modules/artifacts)

Hibernate 的功能被拆分成一系列的模块/包(modules/artifacts),其目的是为了对依赖进行独立(模块化)。 模块名称 说明 hibernate-core 这个是 Hibernate 的主要(main (core))模块。定义...

honeymoose
今天
4
0
CSS--属性

一、溢出 当内容多,元素区域小的时候,就会产生溢出效果,默认是纵向溢出 横向溢出:在内容和容器之间再套一层容器,并且内部容器要比外部容器宽 属性:overflow/overflow-x/overflow-y 取值...

wytao1995
今天
4
0
精华帖

第一章 jQuery简介 jQuery是一个JavaScript库 jQuery具备简洁的语法和跨平台的兼容性 简化了JavaScript的操作。 在页面中引入jQuery jQuery是一个JavaScript脚本库,不需要特别的安装,只需要...

流川偑
今天
7
0
语音对话英语翻译在线翻译成中文哪个方法好用

想要进行将中文翻译成英文,或者将英文翻译成中文的操作,其实有一个非常简单的工具就能够帮助完成将语音进行翻译转换的软件。 在应用市场或者百度手机助手等各大应用渠道里面就能够找到一款...

401恶户
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部