文档章节

HDFS分布式文件系统(2) HDFS的java接口

为为02
 为为02
发布于 2017/02/12 17:09
字数 5228
阅读 702
收藏 20

HDFS 的 java接口

Hadoop是使用java编写的,通过JAVA API可以调用所有Hadoop文件系统的交互操作。例如,文件系统的命令解释器就是一个java应用,它使用JAVA 的FileSystem类来提供文件系统操作。其它一些文件系统接口与HDFS一起使用,因为Hadoop中其它一些文件系统一般都有访问基本文件系统的工具,但它们大多数都能用于任何Hadoop文件系统。

HTTP接口

通过HTTP接口来访问HDFS有两种办法:直接访问,HDFS后台进程直接服务于来自客户端的请求(由namenode内嵌的web服务器的50070端口提供服务,目录列表以xml或者JSON格式存储,且文件数据由datanode的web服务器的50075端口以数据留的形式传输);通过代理( 一个对多个)访问,客户端通常使用DistributedFilesystem API访问HDFS。

JAVA接口

1. 从Hadoop URL读取数据

要从Hadoop文件系统读取文件,最简单的办法是使用java.net.URL对象打开数据流,从中读取数据。具体格式如下

    Inputstream in = null;
    try{
        in = new URL("hdfs://host/path").openSteam();
        //process in
    }finally{
        IOUtils.closeStream(in);
    }

让java程序能够识别Hadoop的hdfs URL方案还需要一些额外的工作。这里采用的是通过FsUrlStreamHandlerFactory实例调用java.net.URL对象的setURLStreamHandlerFactory方法。每个java虚拟机只能调用一次这个方法,因此通常在静态方法中调用。这个限制意味着如果程序的其它组件---如不受你控制的第三方组件---已经声明了一个URLStreamHandlerFactory实例,你将无法使用Hadoop中的实例了。后面我们再继续讨论另外一种备用的办法。

示例1: 展示的程序以标准输出的方式显示Hadoop文件系统中的文件,类似与UNIX系统中的cat命令

    package cn.weiwei.WHadoop.hdfs;
    
    import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
    import org.apache.hadoop.io.IOUtils;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    
    /**
     * [@author](https://my.oschina.net/arthor) WangWeiwei
     * [@version](https://my.oschina.net/u/931210) 1.0
     * [@sine](https://my.oschina.net/mysine) 17-2-6
     *
     * 通过URLStreamHandler实例以标准输出方式显示Hadoop文件系统中的文件
     */
    public class URLCat {
        static {
            URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
        }
    
        public void catFile(String[] args) throws IOException {
            InputStream inputStream = null;
            try {
                inputStream = new URL(args[0]).openStream();
                IOUtils.copyBytes(inputStream,System.out,4096,false);
            }finally {
                IOUtils.closeStream(inputStream);
            }
        }
    }

测试类如下:

    package cn.weiwei.WHadoop.hdfs;
    
    import org.junit.Test;
    
    import static org.junit.Assert.*;
    
    /**
     * [@author](https://my.oschina.net/arthor) WangWeiwei
     * [@version](https://my.oschina.net/u/931210) 1.0
     * @sine 17-2-6
     */
    public class URLCatTest {
        @Test
        public void catFile() throws Exception {
            String[] a = {"hdfs://172.17.0.2:9000/usr/weiwei/quangle.txt"};
            URLCat urlCat = new URLCat();
            urlCat.catFile(a);
        }
    
    }

注意更换测试方法中的ip地址以及文件名等信息,如果程序执行成功,则输出的结果应该如下:

    2017-02-06 18:02:23,560 WARN  [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    On the top of the Crumpetty Tree
    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.
    
    Process finished with exit code 0

2. 通过 FileSystem API 读取数据

正如前面所描述的,有时候我们根本无法设置JVM的URLStreamHandlerFactory实例。在这种情况下, 我们还可以通过使用FileSystem API来打开一个文件的输入流。

Hadoop文件系统中通过Hadoop Path对象(而非 java.io.File对象,因为它的语义与本地文件系统联系太紧密)来代表文件。可以将路径视为一个hadoop文件系统URI,如hdfs://172.17.0.2:9000/usr/weiwei/quangle.txt。

FileSystem是一个通用的文件系统API,所以第一步是检索我们要使用的文件系统实例,这里是HDFS。 获取 FileSystem实例有以下几个静态工厂方法:

    public static FileSystem get(Configuration conf) throws IOException
    public static FileSystem get(URI uri,Configuration conf) throws IOException
    public static FileSystem get(URI uri,Configuration conf,String user) throws IOException

Configuration 对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如 conf/core-site.xml)。 第一个方法返回的是默认文件系统(在conf/core-site.xml中指定的,如果没有指定,则使用默认的本地 文件系统)。第二个方法通过给定的URI和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则 返回默认的文件系统。第三,作为给定用户来访问文件系统,对安全来说是至关重要的。

在某些情况下,你可能希望获取本地文件系统运行的实例,此时你可以使用getLocal()方法方便的获取。

    public static LocalFIleSystem getLocal(Configuration conf) throws IOException

有了FileSystem实例之后,我们调用 open() 函数来获取文件的输入流:

    public FSDataInputStream open(Path f) throws IOException
    public abstract FSDataInputStream open(Path f,int bufferSize) throws IOException

另一个方法使用默认的缓冲区大小4KB。 最后,我们重写例1,得到例2 例2: 直接使用FIleSystem以标准输出格式显示Hadoop文件系统中的文件

    package cn.weiwei.WHadoop.hdfs.filesystem;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    import java.io.InputStream;
    import java.net.URI;
    
    /**
     * @author WangWeiwei
     * @version 1.0
     * @sine 17-2-6
     * 直接使用FIleSystem以标准输出格式显示Hadoop文件系统中的文件
     */
    public class FileSystemCat {
        public void cat (String uri) throws Exception{
            Configuration conf = new Configuration();
            FileSystem fileSystem = FileSystem.get(URI.create(uri),conf);
            InputStream inputStream = null;
            try {
                inputStream = fileSystem.open(new Path(uri));
                IOUtils.copyBytes(inputStream,System.out,4096,false);
            }finally {
                IOUtils.closeStream(inputStream);
            }
        }
    }

测试程序:

    package cn.weiwei.WHadoop.hdfs.filesystem;
    
    import org.junit.Test;
    
    import static org.junit.Assert.*;
    
    /**
     * @author WangWeiwei
     * @version 1.0
     * @sine 17-2-6
     */
    public class FileSystemCatTest {
        @Test
        public void cat() throws Exception {
            FileSystemCat fileSystemCat = new FileSystemCat();
            fileSystemCat.cat("hdfs://172.17.0.2:9000/usr/weiwei/quangle.txt");
        }
    
    }

运行结果:

    2017-02-06 18:52:19,298 WARN  [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    On the top of the Crumpetty Tree
    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.
    
    Process finished with exit code 0
FSDataInputStream对象

实际上FileSystem对象中的open()方法返回的是一个FSDataInputStream对象,而不是标准的 java.io类对象。这个类特使继承了java.io.DataInputStream接口的一个特殊类,并支持随机访问, 由此可从流的任意位置读取数据。

    package org.apache.hadoop.fs;
    
    import java.io.*;
    import java.nio.ByteBuffer;
    import java.util.EnumSet;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.io.ByteBufferPool;
    import org.apache.hadoop.fs.ByteBufferUtil;
    import org.apache.hadoop.util.IdentityHashStore;
    
    /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
     * and buffers input through a {@link BufferedInputStream}. */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class FSDataInputStream extends DataInputStream
        implements Seekable, PositionedReadable, 
          ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
          HasEnhancedByteBufferAccess, CanUnbuffer {
          
     }

Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量( getPos())的查询方法:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.hadoop.fs;
    
    import java.io.*;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    
    /**
     *  Stream that permits seeking.
     */
    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    public interface Seekable {
      /**
       * Seek to the given offset from the start of the file.
       * The next read() will be from that location.  Can't
       * seek past the end of the file.
       */
      void seek(long pos) throws IOException;
      
      /**
       * Return the current offset from the start of the file
       */
      long getPos() throws IOException;
    
      /**
       * Seeks a different copy of the data.  Returns true if 
       * found a new source, false otherwise.
       */
      @InterfaceAudience.Private
      boolean seekToNewSource(long targetPos) throws IOException;
    }

调用seek()来定位大于文件长度时会引发IOException异常。与java.io.InputStream的skip() 不同,seek()可以移到文件中任意一个绝对位置,skip()则只能相对于当前位置定位到另一个新位置。

例3为例2的简单扩展,它将一个文件写入标准输出两次:在一次写完之后,定位到文件的起始位置再次 以流的方式读取该文件。

例3 : 使用seek()方法,将Hadoop文件系统中的一个文件在标准输出上显示两次

    package cn.weiwei.WHadoop.hdfs.filesystem;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    import java.net.URI;
    
    /**
     * @author WangWeiwei
     * @version 1.0
     * @sine 17-2-6
     * 使用seek()方法,将Hadoop文件系统中的一个文件在标准输出上显示两次
     */
    public class FileSystemDoubleCat {
        public void cat (String uri) throws Exception{
            Configuration conf = new Configuration();
            FileSystem fileSystem = FileSystem.get(URI.create(uri),conf);
            FSDataInputStream inputStream = null;
            try {
                inputStream = fileSystem.open(new Path(uri));
                IOUtils.copyBytes(inputStream,System.out,4096,false);
                inputStream.seek(0);//go back to the start of file
                IOUtils.copyBytes(inputStream,System.out,4096,false);
            }finally {
                IOUtils.closeStream(inputStream);
            }
        }
    }

运行结果:

    2017-02-06 19:22:07,472 WARN  [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    On the top of the Crumpetty Tree
    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.
    On the top of the Crumpetty Tree
    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.
    
    Process finished with exit code 0

FSDataInputStream 类也实现了 PositionedReadable 借口,从一个指定偏移量读取文件的一部分:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.hadoop.fs;
    
    import java.io.*;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    
    /** Stream that permits positional reading. */
    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    public interface PositionedReadable {
      /**
       * Read upto the specified number of bytes, from a given
       * position within a file, and return the number of bytes read. This does not
       * change the current offset of a file, and is thread-safe.
       */
      public int read(long position, byte[] buffer, int offset, int length)
        throws IOException;
      
      /**
       * Read the specified number of bytes, from a given
       * position within a file. This does not
       * change the current offset of a file, and is thread-safe.
       */
      public void readFully(long position, byte[] buffer, int offset, int length)
        throws IOException;
      
      /**
       * Read number of bytes equal to the length of the buffer, from a given
       * position within a file. This does not
       * change the current offset of a file, and is thread-safe.
       */
      public void readFully(long position, byte[] buffer) throws IOException;
    }

read()方法从文件的指定position处读取至多为length字节的数据并存入缓存区buffer的指定偏移量 offset处。返回值是实际读到的字节数:调用者需要检查这个值,它有可能小于length的长度。 readFully()方法将指定length长度的字节数数据读取到buffer中(或在只接受buffer字节数组的版本中,读取buffer.length 长度的字节数据),如果已经读到文件末尾还未读完,将会抛出EOFException异常。

所有这些方法会保留文件当前偏移量,并且是线程安全的(FSDataInputStream并不是为并发访问设计的, 因此最好为此新建多个实例),因此它们提供了在读文件---可能是元数据---的主体时,访问文件其它 部分便利的方法。事实上这只是按照以下模式实现的Seekable接口。

seek()方法是一个开销非常大的工作,需要慎重使用。建议用流数据来构建应用的访问模式(如使用MapReduce), 而非执行大量的seek()方法。

3. 写入数据

FileSystem类由一系列新建文件的方法。最简单的方法是给准备新建的文件指定一个Path对象,然后返回一个用于写入数据的输出流:

    public FSDataOutputStream create(Path path) throws IOException

此方法有多个重载版本,允许我们指定是否需要强制覆盖现有文件/文件备份数量/写入文件时所用缓冲区大小。

create()方法能够为需要写入且当前不存在的文件创建父目录。尽管这样很方便,但有时不希望这样。如果希望父目录不存在就导致写入失败,则应该首先调用exists()方法检查父目录是否存在。

还有一个重载方法 Progressable 用于传递回调接口,如此一来,可以把数据写入datanode的进度通知给应用:

    package org.apache.hadoop.utils;
    
    public interface Progressable {
        public void progress();
    }

另外一种新建文件的方法是使用append()方法在一个已有文件末尾追加数据(还有一些其它重载版本):

    public FSDataOutputStream append(Path path) throws IOException

这样追加操作允许一个writer打开文件后在访问该文件的最后偏移量处追加数据。有了这个API,某些应用可以创建无边界文件,例如,应用可以在关闭日志文件后继续追加日志。该追加操作是可选的,并非所有Hadoop文件系统都实现了该操作。例如HDFS支持追加,但S3文件系统就不支持。

例4展示了如何将本地文件复制到Hadoop文件系统。每次Hadoop调用progress()方法时---也就是每次将64KB数据包写入datanode管线后---打印一个时间点来显示整个运行过程。注意这个操作并不是通过API来实现的,因此Hadoop后续版本是否支持以上操作,取决于该版本是否修改过上述操作。API只让你知道“正在发生什么事”。

    package cn.weiwei.WHadoop.hdfs.filesystem;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.util.Progressable;
    
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.URI;
    
    /**
     * @author WangWeiwei
     * @version 1.0
     * @sine 17-2-6
     * 将本地文件复制到HDFS文件系统
     */
    public class FileCopyWithProgress {
        public void copyFileToHDFS(String localSrc,String dst) throws Exception{
            InputStream inputStream = new BufferedInputStream(new FileInputStream(localSrc));
            Configuration configuration = new Configuration();
            FileSystem fileSystem = FileSystem.get(URI.create(dst),configuration);
            OutputStream outputStream = fileSystem.create(new Path(dst), new Progressable() {
                @Override
                public void progress() {
                    System.out.print(".");
                }
            });
            IOUtils.copyBytes(inputStream,outputStream,4096,true);
        }
    }

测试类代码:

    package cn.weiwei.WHadoop.hdfs;
    
    /**
     * @author WangWeiwei
     * @version 1.0
     * @sine 17-2-6
     */
    public class HDFSPathTest {
        public static String HDFS_ROOT_PATH = "hdfs://172.17.0.2:9000";
        public static String HDFS_WORKSPACE_PATH = "hdfs://172.17.0.2:9000/usr/workspace";
    }


    package cn.weiwei.WHadoop.hdfs.filesystem;
    
    import cn.weiwei.WHadoop.hdfs.HDFSPathTest;
    import org.junit.Test;
    
    import static org.junit.Assert.*;
    
    /**
     * @author WangWeiwei
     * @version 1.0
     * @sine 17-2-6
     */
    public class FileCopyWithProgressTest extends HDFSPathTest {
        @Test
        public void copyFileToHDFS() throws Exception {
            FileCopyWithProgress fileCopyWithProgress = new FileCopyWithProgress();
            fileCopyWithProgress.copyFileToHDFS("/media/weiwei/office/workspace/IntelliJIDEA/WHadoop/input/docs/1400-8.txt",
                    HDFS_WORKSPACE_PATH + "/input/docs/1400-8.txt");
        }
    
    }

运行结果:

   2017-02-06 21:08:06,062 WARN  [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
   ..................
   Process finished with exit code 0

目前,其它Hadoop文件系统写入文件时均不调用progress()方法。在后面的内容,我们将展示进度对MapReduce的重要性。

FSDataOutputStream对象

FIleSystem实例的create()方法返回FSDataOutputStream对象,与FSDataInputStream类相似,它也有一个查询文件当前位置的方法:

       package org.apache.hadoop.fs;

       public class FSDataOutputStream extends DataOutputStream implements Syncable{
            public long getPos() throws IOException{
                //implementation elided
            }
            //implementation elided
       }

但与FSDataInputStream类不同的是,FSDataOutputStream类不允许在文件中定位。这是因为HDFS只允许对一个已打开的类顺序写入,或在现有的文件末尾追加数据。换句话说,它不支持在文件末尾之外的其它位置写入,因此,写入时定位也就没有什么意义。

目录

FileSystem实例提供了创建目录的方法:

    public boolean mkdirs(Path path) throws IOException

这个方法可以一次性新建多有有必要但还没有的父目录,就像java.io.File类的mkdirs()方法。如果目录(以及所有的父目录)都已经创建成功,则返回true。

查询文件系统
  1. 文件元数据 任何文件系统的一个重要特征都是提供其目录结构浏览和检索它所存的文件的目录相关ini的功能。FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度,块大小,副本,修改时间,所有者以及权限信息。

FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象。例5展示了它的用法。

例5: 展示文件状态信息

    package cn.weiwei.WHadoop.hdfs.filesystem;
    
    
    import cn.weiwei.WHadoop.hdfs.HDFSPathTest;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.OutputStream;
    
    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.core.Is.is;
    
    /**
     * @author WangWeiwei
     * @version 1.0
     * @sine 17-2-6
     * 展示文件状态信息
     */
    public class ShowFileStatusTest extends HDFSPathTest {
        private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing
        private FileSystem fs;
    
        @Before
        public void setUp() throws IOException {
            Configuration conf = new Configuration();
            if (System.getProperty("test.build.data") == null) {
                System.setProperty("test.build.data", "/tmp");
            }
            cluster = new MiniDFSCluster(conf, 1, true, null);
            fs = cluster.getFileSystem();
            OutputStream out = fs.create(new Path("/dir/file"));
            out.write("content".getBytes("UTF-8"));
            out.close();
        }
    
        @After
        public void tearDown() throws IOException {
            if (fs != null) { fs.close(); }
            if (cluster != null) { cluster.shutdown(); }
        }
    
        @Test(expected = FileNotFoundException.class)
        public void throwsFileNotFoundForNonExistentFile() throws IOException {
            fs.getFileStatus(new Path("no-such-file"));
        }
    
        @Test
        public void fileStatusForFile() throws IOException {
            Path file = new Path("/dir/file");
            FileStatus stat = fs.getFileStatus(file);
            assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
            assertThat(stat.isDir(), is(false));
            assertThat(stat.getLen(), is(7L));
    //        assertThat(stat.getModificationTime(),
    //                is(lessThanOrEqualTo(System.currentTimeMillis())));
            assertThat(stat.getReplication(), is((short) 1));
    //        assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));
            assertThat(stat.getOwner(), is(System.getProperty("user.name")));
            assertThat(stat.getGroup(), is("supergroup"));
            assertThat(stat.getPermission().toString(), is("rw-r--r--"));
        }
    
        @Test
        public void fileStatusForDirectory() throws IOException {
            Path dir = new Path("/dir");
            FileStatus stat = fs.getFileStatus(dir);
            assertThat(stat.getPath().toUri().getPath(), is("/dir"));
            assertThat(stat.isDir(), is(true));
            assertThat(stat.getLen(), is(0L));
    //        assertThat(stat.getModificationTime(),
    //                is(lessThanOrEqualTo(System.currentTimeMillis())));
            assertThat(stat.getReplication(), is((short) 0));
            assertThat(stat.getBlockSize(), is(0L));
            assertThat(stat.getOwner(), is(System.getProperty("user.name")));
            assertThat(stat.getGroup(), is("supergroup"));
            assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
        }
    }

如果文件或目录不存在,就会抛出一个FileNotFoundException异常。但如果只是想检查文件或,兖是否存在,那么调用exists()方法会更方便。

  1. 列出文件 查找一个文件或目录相关的信息很实用,但通常还需要能够列出目录中的内容。这就是FileSystem的 listStatus()方法的功能:

    public FileStatus[] listStatus(Path path) throws IOException public FileStatus[] listStatus(Path path,PathFilter filter) throws IOException public FileStatus[] listStatus(Path[] paths) throws IOException public FileStatus[] listStatus(Path[] paths, PathFilter filter) throws IOException

当传入的参数是一个文件时,它会简单的转变成以数组形式返回长度为1的FileStatus对象。当传入参数是一个目录时,则返回 0 或多个FileStatus对象,表示此目录中包含的文件和目录。

它的重载方法允许使用PathFilter来限制匹配的文件和目录。最后如果指定一组路径,其执行结果相当于依次轮流传递每条路径,并对其调用listStatus()方法,再将FileStatus对象数组累积存入同一数组中。当需要从文件系统树的不同分支构建输入文件列表时,这个方法还是非常方便的。

例6: 显示Hadoop文件系统中一组路径的文件信息

            package cn.weiwei.WHadoop.hdfs.filesystem;
    
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileStatus;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.FileUtil;
        import org.apache.hadoop.fs.Path;
    
        import java.io.IOException;
        import java.net.URI;
    
        /**
         * @author WangWeiwei
         * @version 1.0
         * @sine 17-2-12
         * 显示hadoop文件系统中一组路径的文件信息
         */
        public class ListStatus {
            public void listStatus(String[] args) throws IOException {
                String uri = args[0];
                Configuration configuration = new Configuration();
                FileSystem fileSystem = FileSystem.get(URI.create(uri),configuration);
    
                Path[] paths = new Path[args.length];
                for (int i = 0;i < paths.length; i++){
                    paths[i] = new Path(args[i]);
                }
                FileStatus[] fileStatuses = fileSystem.listStatus(paths);
                Path[] listedPaths = FileUtil.stat2Paths(fileStatuses);
                for (Path path : listedPaths){
                    System.out.println(path);
                }
            }
        }

测试程序:

    package cn.weiwei.WHadoop.hdfs.filesystem;
    
    import cn.weiwei.WHadoop.hdfs.HDFSPathTest;
    import org.junit.Test;
    
    import static org.junit.Assert.*;
    
    /**
     * @author WangWeiwei
     * @version 1.0
     * @sine 17-2-12
     */
    public class ListStatusTest extends HDFSPathTest {
        @Test
        public void listStatus() throws Exception {
            ListStatus listStatus = new ListStatus();
            listStatus.listStatus(new String[]{HDFS_ROOT_PATH + "/",HDFS_WORKSPACE_PATH + "/"});
        }
    
    }

结果:

    2017-02-12 15:34:19,230 WARN  [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    hdfs://172.17.0.2:9000/usr
    hdfs://172.17.0.2:9000/workspace
    hdfs://172.17.0.2:9000/usr/workspace/input
    
    Process finished with exit code 0
  1. 文件模式 在单个操作中处理一批文件是一个很常见的需求。例如,哥哥用于处理日志的MapReduce作业可能需要分析一个月内包含大量目录中的日志文件。在一个表达式中通过使用通配符来匹配多个文件,无需列举每个文件和目录来指定输入,就可以集中这些符合规则的文件或目录进行批处理操作,这种操作称为“通配”(globbing)。Hadoop为执行通配提供了两个FileSystem方法:

    public FileStatus[] globStatus(Path pathPattern) throws IOException public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException

globStatus()方法返回与其路径匹配于指定模式的所有文件的FileStatus对象数组,并按路径排序。PathFilter命令作为可选项可以进一步对匹配结果进行限制。

Hadoop支持的通配符与 UNIX bash的相同 通配符及其含义

  1. PathFilter对象

    通配符模式并不能够精确的描述我们想要访问的文件集。比如使用通配格式排除一个特定的文件就不太可能。FileSystem中的listStatus()和globStatus()方法提供了可选的PathFilter对象,以编程的方式控制通配符:

     /**
      * Licensed to the Apache Software Foundation (ASF) under one
      * or more contributor license agreements.  See the NOTICE file
      * distributed with this work for additional information
      * regarding copyright ownership.  The ASF licenses this file
      * to you under the Apache License, Version 2.0 (the
      * "License"); you may not use this file except in compliance
      * with the License.  You may obtain a copy of the License at
      *
      *     http://www.apache.org/licenses/LICENSE-2.0
      *
      * Unless required by applicable law or agreed to in writing, software
      * distributed under the License is distributed on an "AS IS" BASIS,
      * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
     package org.apache.hadoop.fs;
    
     import org.apache.hadoop.classification.InterfaceAudience;
     import org.apache.hadoop.classification.InterfaceStability;
    
     @InterfaceAudience.Public
     @InterfaceStability.Stable
     public interface PathFilter {
       /**
        * Tests whether or not the specified abstract pathname should be
        * included in a pathname list.
        *
        * @param  path  The abstract pathname to be tested
        * @return  <code>true</code> if and only if <code>pathname</code>
        *          should be included
        */
       boolean accept(Path path);
     }
    

    PathFilter与java.io.FileFilter一样,是Path对象,而不是File对象。

    例7: PathFilter,用于排除正则表达式路径

     package cn.weiwei.WHadoop.hdfs.filesystem;
    
     import org.apache.hadoop.fs.Path;
     import org.apache.hadoop.fs.PathFilter;
    
     /**
      * @author WangWeiwei
      * @version 1.0
      * @sine 17-2-12
      * 用于排除正则表达式路径
      */
     public class RegexExcludePathFilter implements PathFilter{
    
         private final String regex;
    
         public RegexExcludePathFilter(String regex){
             this.regex = regex;
         }
    
         @Override
         public boolean accept(Path path) {
             return !path.toString().matches(regex);
         }
     }
    

    6. 删除数据

    使用FileSystem的delete()方法可以永久性的删除文件或目录。

     public boolean delete(Path path,boolean recursive) throws IOException
    

    如果path是一个文件或空目录,那么recursive的值就会被忽略。只有在recursive值为true时,非空目录及其内容才会被删除(否则会抛出IOException异常)。

© 著作权归作者所有

为为02
粉丝 51
博文 44
码字总数 99356
作品 0
海淀
程序员
私信 提问
Hadoop中其他知识(24)

Thrift: 因为Hadoop文件系统的接口是通过JAVAApi提供的,所以其他非Java应用程序访问Hadoop文件系统会比较麻烦。 Thriftfs定制功能模块中ThriftApi通过把hadoop文件系统包装成一个ApacheThr...

肖鋭
2014/03/13
85
0
Hadoop入门 -- 简介,安装,示例

(1) Hadoop简介 Hadoop是Apache基金会旗下开源项目,是一款开源的可靠、可扩展的分布式计算软件平台。 Hadoop可以看做是实现分布式计算的一个框架。可利用其提供的函数接口进行简单编程,对数...

fjie
2015/02/25
0
0
Hadoop安装部署

本节课程概览 Hadoop 在windows 上伪分布式的安装过程 Hadoop 在linux 上单节点伪分布式的安装过程 集成Eclipse 开发环境 Hadoop UI 介绍 运行WordCounter 事例 第一部分:Hadoop 在windows...

Artemjor
2014/01/10
995
1
大数据(hadoop-Hadoop2.7.3伪分布搭建)

安装准备: vmware10 Centos6.5 64位版本 JDK1.8linux32位版 Hadoop2.7.3版本 安装过程讲解:  字符界面安装centos6.5,网络设置为主机模式或者桥连接  配置静态ip 并测试能不能...

这很耳东先生
04/16
24
0
谁能指导下分布式文件系统的选型,存储小文件用的,图片、word、pdf之类的文档

谁能指导下分布式文件系统的选型,存储小文件用的,图片、word、pdf之类的文档。总体规模在 千万级个文件。 公司后端用的 java 架构的基于 centos系统, 目前针对文件存储的方式是,单独列出...

从前
2013/01/25
6.1K
8

没有更多内容

加载失败,请刷新页面

加载更多

OpenStack 简介和几种安装方式总结

OpenStack :是一个由NASA和Rackspace合作研发并发起的,以Apache许可证授权的自由软件和开放源代码项目。项目目标是提供实施简单、可大规模扩展、丰富、标准统一的云计算管理平台。OpenSta...

小海bug
昨天
6
0
DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
昨天
6
0
数据库中间件MyCat

什么是MyCat? 查看官网的介绍是这样说的 一个彻底开源的,面向企业应用开发的大数据库集群 支持事务、ACID、可以替代MySQL的加强版数据库 一个可以视为MySQL集群的企业级数据库,用来替代昂贵...

沉浮_
昨天
6
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
昨天
8
0
常用物流快递单号查询接口种类及对接方法

目前快递查询接口有两种方式可以对接,一是和顺丰、圆通、中通、天天、韵达、德邦这些快递公司一一对接接口,二是和快递鸟这样第三方集成接口一次性对接多家常用快递。第一种耗费时间长,但是...

程序的小猿
昨天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部