Using HDFS in Java (0.20.0)

boonya
Published 2017/02/14 15:20

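Example from the reposted article

The code examples below show how to read from and write to HDFS in Java.

1. Create a configuration object: to read from or write to HDFS, create a Configuration object and pass it the configuration parameters from the Hadoop configuration files.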

// Conf object will read the HDFS configuration parameters from these XML
    // files. You may specify the parameters for your own if you want.

    Configuration conf = new Configuration();
    conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));
    conf.addResource(new Path("/opt/hadoop-0.20.0/conf/hdfs-site.xml"));

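If the conf object is not given the cluster settings (here, by loading the Hadoop XML files), fs.default.name keeps its default value of file:///, so the operations run against the local file system instead of HDFS.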

2. Add a file to HDFS: create a FileSystem object and write the file through a stream.

FileSystem fileSystem = FileSystem.get(conf);
    
    // Check if the file already exists
    Path path = new Path("/path/to/file.ext");
    if (fileSystem.exists(path)) {
        System.out.println("File " + path + " already exists");
        return;
    }

    // Create a new file and write data to it.
    // "source" is the path of the local file to upload.
    FSDataOutputStream out = fileSystem.create(path);
    InputStream in = new BufferedInputStream(new FileInputStream(
        new File(source)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
        out.write(b, 0, numBytes);
    }

    // Close all the file descriptors
    in.close();
    out.close();
    fileSystem.close();
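
The read/write loop in step 2 does not depend on HDFS: FSDataOutputStream is an ordinary java.io.OutputStream, so the same loop can be tried against in-memory streams. The following is a minimal sketch under that assumption. The names StreamCopySketch, copy and roundTrip are made up for illustration, and the sketch differs from the listing in two ways: it tests for -1 (end of stream) and it uses try-with-resources (Java 7+) instead of explicit close() calls.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical helper, not part of the original article or of Hadoop.
final class StreamCopySketch {

    // Copies everything from in to out in 1 KB chunks and returns the byte count.
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[1024];
        long total = 0;
        int numBytes;
        // read() returns -1 once the end of the stream is reached.
        while ((numBytes = in.read(buffer)) != -1) {
            out.write(buffer, 0, numBytes);
            total += numBytes;
        }
        return total;
    }

    // Runs copy() between two in-memory streams; both are closed automatically.
    static byte[] roundTrip(byte[] data) {
        try (InputStream in = new ByteArrayInputStream(data);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            copy(in, out);
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = new byte[3000]; // larger than one buffer, so the loop runs several times
        System.out.println(roundTrip(data).length); // prints 3000
    }
}
```

With a real cluster, copy() could presumably be handed the local input stream and the stream returned by fileSystem.create(path) unchanged, since it only relies on the java.io interfaces.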

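3. Read a file from HDFS: open an input stream on the HDFS file and read from it (here, copying it to a local file of the same name).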

FileSystem fileSystem = FileSystem.get(conf);

    String file = "/path/to/file.ext";
    Path path = new Path(file);
    if (!fileSystem.exists(path)) {
        System.out.println("File " + file + " does not exist");
        return;
    }

    FSDataInputStream in = fileSystem.open(path);

    String filename = file.substring(file.lastIndexOf('/') + 1,
        file.length());

    OutputStream out = new BufferedOutputStream(new FileOutputStream(
        new File(filename)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
        out.write(b, 0, numBytes);
    }

    in.close();
    out.close();
    fileSystem.close();
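
The local file name in step 3 is whatever follows the last '/' of the HDFS path. That rule can be sketched in plain Java as follows; BaseNameSketch and baseName are hypothetical names used only for illustration, not a Hadoop API.

```java
// Hypothetical helper illustrating the substring logic; not part of Hadoop.
final class BaseNameSketch {

    // Returns the text after the last '/', or the whole string if there is none.
    static String baseName(String file) {
        // lastIndexOf returns -1 when '/' is absent, so adding 1 yields index 0.
        return file.substring(file.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        System.out.println(baseName("/user/demo/data.txt")); // prints data.txt
        System.out.println(baseName("data.txt"));            // prints data.txt
    }
}
```

Note that a path ending in '/' yields an empty name, so a caller that may receive directory paths would need to handle that case before opening the local output file.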

4. Delete a file from HDFS: create a FileSystem object and call delete on the path.

FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path("/path/to/file.ext");
    if (!fileSystem.exists(path)) {
        System.out.println("File does not exist");
        return;
    }

    // Delete the file; the second argument enables recursive deletion,
    // which only matters when the path is a directory.
    fileSystem.delete(path, true);

    fileSystem.close();

5. Create a directory in HDFS: create a FileSystem object and call mkdirs on the path.

FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path("/path/to/dir");
    if (fileSystem.exists(path)) {
        System.out.println("Dir " + path + " already exists");
        return;
    }

    // Create directories
    fileSystem.mkdirs(path);

    fileSystem.close();

Full code:

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSClient {
    public HDFSClient() {

    }

    public void addFile(String source, String dest) throws IOException {
        Configuration conf = new Configuration();

        // Conf object will read the HDFS configuration parameters from these
        // XML files.
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/hdfs-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        // Get the filename out of the file path
        String filename = source.substring(source.lastIndexOf('/') + 1,
            source.length());

        // Create the destination path including the filename.
        if (dest.charAt(dest.length() - 1) != '/') {
            dest = dest + "/" + filename;
        } else {
            dest = dest + filename;
        }

        // System.out.println("Adding file to " + destination);

        // Check if the file already exists
        Path path = new Path(dest);
        if (fileSystem.exists(path)) {
            System.out.println("File " + dest + " already exists");
            return;
        }

        // Create a new file and write data to it.
        FSDataOutputStream out = fileSystem.create(path);
        InputStream in = new BufferedInputStream(new FileInputStream(
            new File(source)));

        byte[] b = new byte[1024];
        int numBytes = 0;
        while ((numBytes = in.read(b)) > 0) {
            out.write(b, 0, numBytes);
        }

        // Close all the file descriptors
        in.close();
        out.close();
        fileSystem.close();
    }

    public void readFile(String file) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(file);
        if (!fileSystem.exists(path)) {
            System.out.println("File " + file + " does not exist");
            return;
        }

        FSDataInputStream in = fileSystem.open(path);

        String filename = file.substring(file.lastIndexOf('/') + 1,
            file.length());

        OutputStream out = new BufferedOutputStream(new FileOutputStream(
            new File(filename)));

        byte[] b = new byte[1024];
        int numBytes = 0;
        while ((numBytes = in.read(b)) > 0) {
            out.write(b, 0, numBytes);
        }

        in.close();
        out.close();
        fileSystem.close();
    }

    public void deleteFile(String file) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(file);
        if (!fileSystem.exists(path)) {
            System.out.println("File " + file + " does not exist");
            return;
        }

        // The second argument enables recursive deletion of directories.
        fileSystem.delete(path, true);

        fileSystem.close();
    }

    public void mkdir(String dir) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(dir);
        if (fileSystem.exists(path)) {
            System.out.println("Dir " + dir + " already exists");
            return;
        }

        fileSystem.mkdirs(path);

        fileSystem.close();
    }

    public static void main(String[] args) throws IOException {

        if (args.length < 1) {
            System.out.println("Usage: hdfsclient add/read/delete/mkdir" +
                " [<local_path> <hdfs_path>]");
            System.exit(1);
        }

        HDFSClient client = new HDFSClient();
        if (args[0].equals("add")) {
            if (args.length < 3) {
                System.out.println("Usage: hdfsclient add <local_path> " + 
                "<hdfs_path>");
                System.exit(1);
            }

            client.addFile(args[1], args[2]);
        } else if (args[0].equals("read")) {
            if (args.length < 2) {
                System.out.println("Usage: hdfsclient read <hdfs_path>");
                System.exit(1);
            }

            client.readFile(args[1]);
        } else if (args[0].equals("delete")) {
            if (args.length < 2) {
                System.out.println("Usage: hdfsclient delete <hdfs_path>");
                System.exit(1);
            }

            client.deleteFile(args[1]);
        } else if (args[0].equals("mkdir")) {
            if (args.length < 2) {
                System.out.println("Usage: hdfsclient mkdir <hdfs_path>");
                System.exit(1);
            }

            client.mkdir(args[1]);
        } else {   
            System.out.println("Usage: hdfsclient add/read/delete/mkdir" +
                " [<local_path> <hdfs_path>]");
            System.exit(1);
        }

        System.out.println("Done!");
    }
}
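
addFile treats dest as a directory and appends the source file name to it, adding a '/' only when dest does not already end with one. The sketch below isolates that step in plain Java. DestPathSketch and destinationFor are hypothetical names, and the sketch uses endsWith rather than charAt, an assumption made here so that an empty destination string does not throw.

```java
// Hypothetical helper isolating the destination-path step of addFile.
final class DestPathSketch {

    // Joins an HDFS directory and the file name taken from a local source path.
    static String destinationFor(String source, String destDir) {
        // File name = everything after the last '/' of the source path.
        String filename = source.substring(source.lastIndexOf('/') + 1);
        // Insert a separator only when the directory lacks a trailing '/'.
        return destDir.endsWith("/") ? destDir + filename
                                     : destDir + "/" + filename;
    }

    public static void main(String[] args) {
        System.out.println(destinationFor("/tmp/report.csv", "/user/demo"));  // prints /user/demo/report.csv
        System.out.println(destinationFor("/tmp/report.csv", "/user/demo/")); // prints /user/demo/report.csv
    }
}
```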

GitHub example

package com.cloudwick.mapreduce.FileSystemAPI;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;


/**
 * Simple Driver to read/write to hdfs
 * @author ashrith
 *
 */
public class FileSystemOperations {
  public FileSystemOperations() {

  }

  /**
   * copy an existing file from the local filesystem to hdfs
   * @param source
   * @param dest
   * @param conf
   * @throws IOException
   */
  public void addFile(String source, String dest, Configuration conf) throws IOException {

    FileSystem fileSystem = FileSystem.get(conf);

    // Get the filename out of the file path
    String filename = source.substring(source.lastIndexOf('/') + 1,source.length());

    // Create the destination path including the filename.
    if (dest.charAt(dest.length() - 1) != '/') {
      dest = dest + "/" + filename;
    } else {
      dest = dest + filename;
    }

    // System.out.println("Adding file to " + destination);

    // Check if the file already exists
    Path path = new Path(dest);
    if (fileSystem.exists(path)) {
      System.out.println("File " + dest + " already exists");
      return;
    }

    // Create a new file and write data to it.
    FSDataOutputStream out = fileSystem.create(path);
    InputStream in = new BufferedInputStream(new FileInputStream(new File(
        source)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
      out.write(b, 0, numBytes);
    }

    // Close all the file descriptors
    in.close();
    out.close();
    fileSystem.close();
  }

  /**
   * read a file from hdfs
   * @param file
   * @param conf
   * @throws IOException
   */
  public void readFile(String file, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(file);
    if (!fileSystem.exists(path)) {
      System.out.println("File " + file + " does not exist");
      return;
    }

    FSDataInputStream in = fileSystem.open(path);

    String filename = file.substring(file.lastIndexOf('/') + 1,
        file.length());

    OutputStream out = new BufferedOutputStream(new FileOutputStream(
        new File(filename)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
      out.write(b, 0, numBytes);
    }

    in.close();
    out.close();
    fileSystem.close();
  }

  /**
   * delete a file or directory (recursively) in hdfs
   * @param file
   * @param conf
   * @throws IOException
   */
  public void deleteFile(String file, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(file);
    if (!fileSystem.exists(path)) {
      System.out.println("File " + file + " does not exist");
      return;
    }

    fileSystem.delete(new Path(file), true);

    fileSystem.close();
  }

  /**
   * create directory in hdfs
   * @param dir
   * @param conf
   * @throws IOException
   */
  public void mkdir(String dir, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(dir);
    if (fileSystem.exists(path)) {
      System.out.println("Dir " + dir + " already exists");
      return;
    }

    fileSystem.mkdirs(path);

    fileSystem.close();
  }

  public static void main(String[] args) throws IOException {

    // args[0] and args[1] are the namenode host and port; the command and
    // its paths follow from args[2] onwards.
    if (args.length < 3) {
      System.out.println("Usage: hdfsclient <namenode_host> <namenode_port>"
          + " add/read/delete/mkdir [<local_path>] <hdfs_path>");
      System.exit(1);
    }

    FileSystemOperations client = new FileSystemOperations();
    String hdfsPath = "hdfs://" + args[0] + ":" + args[1];
    String command = args[2];

    Configuration conf = new Configuration();
    // Providing conf files
    // conf.addResource(new Path(FileSystemOperations.class.getResource("/conf/core-site.xml").getFile()));
    // conf.addResource(new Path(FileSystemOperations.class.getResource("/conf/hdfs-site.xml").getFile()));
    // (or) using absolute paths
    //    conf.addResource(new Path(
    //        "/u/hadoop-1.0.2/conf/core-site.xml"));
    //    conf.addResource(new Path(
    //        "/u/hadoop-1.0.2/conf/hdfs-site.xml"));

    //(or)
    // alternatively provide namenode host and port info
    conf.set("fs.default.name", hdfsPath);

    if (command.equals("add")) {
      if (args.length < 5) {
        System.out.println("Usage: hdfsclient <namenode_host> <namenode_port>"
            + " add <local_path> <hdfs_path>");
        System.exit(1);
      }

      client.addFile(args[3], args[4], conf);

    } else if (command.equals("read")) {
      if (args.length < 4) {
        System.out.println("Usage: hdfsclient <namenode_host> <namenode_port>"
            + " read <hdfs_path>");
        System.exit(1);
      }

      client.readFile(args[3], conf);

    } else if (command.equals("delete")) {
      if (args.length < 4) {
        System.out.println("Usage: hdfsclient <namenode_host> <namenode_port>"
            + " delete <hdfs_path>");
        System.exit(1);
      }

      client.deleteFile(args[3], conf);

    } else if (command.equals("mkdir")) {
      if (args.length < 4) {
        System.out.println("Usage: hdfsclient <namenode_host> <namenode_port>"
            + " mkdir <hdfs_path>");
        System.exit(1);
      }

      client.mkdir(args[3], conf);

    } else {
      System.out.println("Usage: hdfsclient <namenode_host> <namenode_port>"
          + " add/read/delete/mkdir [<local_path>] <hdfs_path>");
      System.exit(1);
    }

    System.out.println("Done!");
  }
}
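
The GitHub version hands the NameNode address to the configuration as an hdfs://host:port string. One way to assemble that string is through java.net.URI, so that a host the URI parser cannot accept is reported before any HDFS call is made. This is only a sketch: NameNodeUriSketch and hdfsUri are hypothetical names, and the host and port in main are placeholder values, not ones taken from the article.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper; builds the value that would be passed to fs.default.name.
final class NameNodeUriSketch {

    // Returns "hdfs://<host>:<port>" or throws IllegalArgumentException.
    static String hdfsUri(String host, int port) {
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        try {
            // Scheme, host and port only; user info, path, query and fragment are omitted.
            return new URI("hdfs", null, host, port, null, null, null).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("invalid host: " + host, e);
        }
    }

    public static void main(String[] args) {
        // Placeholder host and port, chosen only for the demonstration.
        System.out.println(hdfsUri("namenode.example.com", 8020)); // prints hdfs://namenode.example.com:8020
    }
}
```

The returned string could then be used in place of the concatenated hdfsPath in conf.set("fs.default.name", ...).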

GitHub resources

GitHub URL: https://gist.github.com/ashrithr/f7899fdfd36ee800f151

Reposted from: http://blog.rajeevsharma.in/2009/06/using-hdfs-in-java-0200.html
