Using HDFS in Java (0.20.0)

boonya
Published on 2017/02/14 15:20

Reposted article example

Below are code examples for reading from and writing to HDFS in Java.

1. Create a Configuration object: to read from or write to HDFS, create a Configuration object and point it at the Hadoop configuration files so it picks up the cluster parameters.

    // The conf object will read the HDFS configuration parameters from these
    // XML files. You may specify your own parameters if you want.
    Configuration conf = new Configuration();
    conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));
    conf.addResource(new Path("/opt/hadoop-0.20.0/conf/hdfs-site.xml"));

If you do not load the Hadoop XML files into the conf object, the HDFS operations will run against the local filesystem instead of HDFS.
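
A quick way to verify this (not in the original article) is to print the URI that the FileSystem actually resolved to: with the XML files loaded it should be an hdfs:// URI, otherwise it falls back to file:///.

    FileSystem fs = FileSystem.get(conf);
    // Prints e.g. hdfs://namenode:9000 when HDFS is configured,
    // or file:/// when the conf object is empty.
    System.out.println(fs.getUri());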

2. Add a file to HDFS: create a FileSystem object and write the file through an output stream.

    FileSystem fileSystem = FileSystem.get(conf);

    // Check if the file already exists
    Path path = new Path("/path/to/file.ext");
    if (fileSystem.exists(path)) {
        System.out.println("File " + path + " already exists");
        return;
    }

    // Create a new file and write data to it ('source' is the local file path).
    FSDataOutputStream out = fileSystem.create(path);
    InputStream in = new BufferedInputStream(new FileInputStream(
        new File(source)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
        out.write(b, 0, numBytes);
    }

    // Close all the file descriptors
    in.close();
    out.close();
    fileSystem.close();
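
For a plain copy like this, the FileSystem API also has a one-call shortcut, copyFromLocalFile, which handles the stream copying internally; a minimal sketch (both paths are placeholders):

    // Copy a local file into HDFS in one call.
    fileSystem.copyFromLocalFile(new Path("/local/path/file.ext"),
        new Path("/path/to/file.ext"));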

3. Read a file from HDFS: open an input stream on the file in HDFS and read from it.

    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path("/path/to/file.ext");
    if (!fileSystem.exists(path)) {
        System.out.println("File " + path + " does not exist");
        return;
    }

    FSDataInputStream in = fileSystem.open(path);

    // Reuse the HDFS filename for the local copy.
    String filename = path.getName();

    OutputStream out = new BufferedOutputStream(new FileOutputStream(
        new File(filename)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
        out.write(b, 0, numBytes);
    }

    in.close();
    out.close();
    fileSystem.close();
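
The manual buffer loop can also be replaced by Hadoop's own helper, org.apache.hadoop.io.IOUtils, which copies the whole stream and, with the final argument set to true, closes both ends; a minimal sketch:

    // Copy 'in' to 'out' with a 4 KB buffer and close both streams when done.
    IOUtils.copyBytes(in, out, 4096, true);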

4. Delete a file from HDFS: check that the path exists in HDFS, then delete it.

    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path("/path/to/file.ext");
    if (!fileSystem.exists(path)) {
        System.out.println("File " + path + " does not exist");
        return;
    }

    // Delete the file; 'true' deletes recursively if the path is a directory.
    fileSystem.delete(path, true);

    fileSystem.close();
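
The second argument controls recursion: true deletes a directory and everything under it, while false refuses to remove a non-empty directory. The call also returns whether anything was actually deleted, which the snippet above ignores; a minimal sketch:

    // Non-recursive delete; fails on a non-empty directory.
    boolean deleted = fileSystem.delete(path, false);
    if (!deleted) {
        System.out.println("Nothing was deleted at " + path);
    }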

5. Create a directory in HDFS: check that the directory does not already exist, then create it.

    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path("/path/to/dir");
    if (fileSystem.exists(path)) {
        System.out.println("Dir " + path + " already exists");
        return;
    }

    // Create the directory.
    fileSystem.mkdirs(path);

    fileSystem.close();
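
mkdirs behaves like mkdir -p: it creates any missing parent directories and reports success through its boolean return value, which the snippet above ignores; a minimal sketch:

    // Creates the directory and any missing parents in one call.
    boolean created = fileSystem.mkdirs(path);
    if (!created) {
        System.out.println("Could not create " + path);
    }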

Full code:

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSClient {
    public HDFSClient() {

    }

    public void addFile(String source, String dest) throws IOException {
        Configuration conf = new Configuration();

        // Conf object will read the HDFS configuration parameters from these
        // XML files.
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/hdfs-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        // Get the filename out of the file path
        String filename = source.substring(source.lastIndexOf('/') + 1,
            source.length());

        // Create the destination path including the filename.
        if (dest.charAt(dest.length() - 1) != '/') {
            dest = dest + "/" + filename;
        } else {
            dest = dest + filename;
        }

        // System.out.println("Adding file to " + destination);

        // Check if the file already exists
        Path path = new Path(dest);
        if (fileSystem.exists(path)) {
            System.out.println("File " + dest + " already exists");
            return;
        }

        // Create a new file and write data to it.
        FSDataOutputStream out = fileSystem.create(path);
        InputStream in = new BufferedInputStream(new FileInputStream(
            new File(source)));

        byte[] b = new byte[1024];
        int numBytes = 0;
        while ((numBytes = in.read(b)) > 0) {
            out.write(b, 0, numBytes);
        }

        // Close all the file descriptors
        in.close();
        out.close();
        fileSystem.close();
    }

    public void readFile(String file) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(file);
        if (!fileSystem.exists(path)) {
            System.out.println("File " + file + " does not exists");
            return;
        }

        FSDataInputStream in = fileSystem.open(path);

        String filename = file.substring(file.lastIndexOf('/') + 1,
            file.length());

        OutputStream out = new BufferedOutputStream(new FileOutputStream(
            new File(filename)));

        byte[] b = new byte[1024];
        int numBytes = 0;
        while ((numBytes = in.read(b)) > 0) {
            out.write(b, 0, numBytes);
        }

        in.close();
        out.close();
        fileSystem.close();
    }

    public void deleteFile(String file) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(file);
        if (!fileSystem.exists(path)) {
            System.out.println("File " + file + " does not exist");
            return;
        }

        fileSystem.delete(path, true);

        fileSystem.close();
    }

    public void mkdir(String dir) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(dir);
        if (fileSystem.exists(path)) {
            System.out.println("Dir " + dir + " already not exists");
            return;
        }

        fileSystem.mkdirs(path);

        fileSystem.close();
    }

    public static void main(String[] args) throws IOException {

        if (args.length < 1) {
            System.out.println("Usage: hdfsclient add/read/delete/mkdir" +
                " [<local_path> <hdfs_path>]");
            System.exit(1);
        }

        HDFSClient client = new HDFSClient();
        if (args[0].equals("add")) {
            if (args.length < 3) {
                System.out.println("Usage: hdfsclient add <local_path> " + 
                "<hdfs_path>");
                System.exit(1);
            }

            client.addFile(args[1], args[2]);
        } else if (args[0].equals("read")) {
            if (args.length < 2) {
                System.out.println("Usage: hdfsclient read <hdfs_path>");
                System.exit(1);
            }

            client.readFile(args[1]);
        } else if (args[0].equals("delete")) {
            if (args.length < 2) {
                System.out.println("Usage: hdfsclient delete <hdfs_path>");
                System.exit(1);
            }

            client.deleteFile(args[1]);
        } else if (args[0].equals("mkdir")) {
            if (args.length < 2) {
                System.out.println("Usage: hdfsclient mkdir <hdfs_path>");
                System.exit(1);
            }

            client.mkdir(args[1]);
        } else {   
            System.out.println("Usage: hdfsclient add/read/delete/mkdir" +
                " [<local_path> <hdfs_path>]");
            System.exit(1);
        }

        System.out.println("Done!");
    }
}
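
One caveat with the listing above: if a read or write throws, the streams and the FileSystem are never closed. On the Java 6-era JDKs that Hadoop 0.20.0 targeted, the usual fix is a try/finally block around the copy loop; a minimal sketch of the pattern applied to addFile:

    FSDataOutputStream out = fileSystem.create(path);
    InputStream in = new BufferedInputStream(new FileInputStream(
        new File(source)));
    try {
        byte[] b = new byte[1024];
        int numBytes;
        while ((numBytes = in.read(b)) > 0) {
            out.write(b, 0, numBytes);
        }
    } finally {
        // Release the streams even if the copy fails part-way.
        in.close();
        out.close();
    }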

GitHub operations example

package com.cloudwick.mapreduce.FileSystemAPI;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;


/**
 * Simple Driver to read/write to hdfs
 * @author ashrith
 *
 */
public class FileSystemOperations {
  public FileSystemOperations() {

  }

  /**
   * copy an existing file from the local filesystem to hdfs
   * @param source
   * @param dest
   * @param conf
   * @throws IOException
   */
  public void addFile(String source, String dest, Configuration conf) throws IOException {

    FileSystem fileSystem = FileSystem.get(conf);

    // Get the filename out of the file path
    String filename = source.substring(source.lastIndexOf('/') + 1,source.length());

    // Create the destination path including the filename.
    if (dest.charAt(dest.length() - 1) != '/') {
      dest = dest + "/" + filename;
    } else {
      dest = dest + filename;
    }

    // System.out.println("Adding file to " + destination);

    // Check if the file already exists
    Path path = new Path(dest);
    if (fileSystem.exists(path)) {
      System.out.println("File " + dest + " already exists");
      return;
    }

    // Create a new file and write data to it.
    FSDataOutputStream out = fileSystem.create(path);
    InputStream in = new BufferedInputStream(new FileInputStream(new File(
        source)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
      out.write(b, 0, numBytes);
    }

    // Close all the file descriptors
    in.close();
    out.close();
    fileSystem.close();
  }

  /**
   * read a file from hdfs
   * @param file
   * @param conf
   * @throws IOException
   */
  public void readFile(String file, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(file);
    if (!fileSystem.exists(path)) {
      System.out.println("File " + file + " does not exists");
      return;
    }

    FSDataInputStream in = fileSystem.open(path);

    String filename = file.substring(file.lastIndexOf('/') + 1,
        file.length());

    OutputStream out = new BufferedOutputStream(new FileOutputStream(
        new File(filename)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
      out.write(b, 0, numBytes);
    }

    in.close();
    out.close();
    fileSystem.close();
  }

  /**
   * delete a file (or directory) in hdfs
   * @param file
   * @param conf
   * @throws IOException
   */
  public void deleteFile(String file, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(file);
    if (!fileSystem.exists(path)) {
      System.out.println("File " + file + " does not exists");
      return;
    }

    fileSystem.delete(path, true);

    fileSystem.close();
  }

  /**
   * create directory in hdfs
   * @param dir
   * @throws IOException
   */
  public void mkdir(String dir, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(dir);
    if (fileSystem.exists(path)) {
      System.out.println("Dir " + dir + " already not exists");
      return;
    }

    fileSystem.mkdirs(path);

    fileSystem.close();
  }

  public static void main(String[] args) throws IOException {

    if (args.length < 1) {
      System.out.println("Usage: hdfsclient add/read/delete/mkdir"
          + " [<local_path> <hdfs_path>]");
      System.exit(1);
    }

    FileSystemOperations client = new FileSystemOperations();

    Configuration conf = new Configuration();
    // Providing conf files
    // conf.addResource(new Path(HDFSAPIDemo.class.getResource("/conf/core-site.xml").getFile()));
    // conf.addResource(new Path(HDFSAPIDemo.class.getResource("/conf/hdfs-site.xml").getFile()));
    // (or) using absolute paths
    //    conf.addResource(new Path(
    //        "/u/hadoop-1.0.2/conf/core-site.xml"));
    //    conf.addResource(new Path(
    //        "/u/hadoop-1.0.2/conf/hdfs-site.xml"));

    // (or)
    // alternatively provide the namenode host and port directly
    // (the host and port below are placeholders; adjust them to your cluster;
    // newer Hadoop versions name this property fs.defaultFS)
    conf.set("fs.default.name", "hdfs://localhost:9000");

    if (args[0].equals("add")) {
      if (args.length < 3) {
        System.out.println("Usage: hdfsclient add <local_path> "
            + "<hdfs_path>");
        System.exit(1);
      }

      client.addFile(args[1], args[2], conf);

    } else if (args[0].equals("read")) {
      if (args.length < 2) {
        System.out.println("Usage: hdfsclient read <hdfs_path>");
        System.exit(1);
      }

      client.readFile(args[1], conf);

    } else if (args[0].equals("delete")) {
      if (args.length < 2) {
        System.out.println("Usage: hdfsclient delete <hdfs_path>");
        System.exit(1);
      }

      client.deleteFile(args[1], conf);

    } else if (args[0].equals("mkdir")) {
      if (args.length < 2) {
        System.out.println("Usage: hdfsclient mkdir <hdfs_path>");
        System.exit(1);
      }

      client.mkdir(args[1], conf);

    } else {
      System.out.println("Usage: hdfsclient add/read/delete/mkdir"
          + " [<local_path> <hdfs_path>]");
      System.exit(1);
    }

    System.out.println("Done!");
  }
}

GitHub source

GitHub gist: https://gist.github.com/ashrithr/f7899fdfd36ee800f151

Reposted from: http://blog.rajeevsharma.in/2009/06/using-hdfs-in-java-0200.html
