Using HDFS in Java (0.20.0)

boonya · Published 2017/02/14 15:20

Reposted article example

Below are code examples showing how to read from and write to HDFS in Java.

1. Create a configuration object: to read from or write to HDFS, create a Configuration object and pass it the configuration parameters from the Hadoop configuration files.

// The conf object reads the HDFS configuration parameters from these XML
    // files. You may also set parameters programmatically if you want.

    Configuration conf = new Configuration();
    conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));
    conf.addResource(new Path("/opt/hadoop-0.20.0/conf/hdfs-site.xml"));

If you do not point the conf object at the Hadoop XML files, HDFS operations will be executed against the local filesystem instead of HDFS.
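
A quick way to verify which filesystem the configuration actually resolves to is to print the FileSystem URI. A minimal sketch (the hdfs:// output below is only an example; the actual host and port come from your core-site.xml):

Configuration conf = new Configuration();
conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

// With a correct core-site.xml this prints something like
// hdfs://namenode-host:9000; without the addResource() call it
// typically prints file:///, i.e. the local filesystem.
System.out.println(FileSystem.get(conf).getUri());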

2. Add a file to HDFS: create a FileSystem object and write the file through an output stream.

FileSystem fileSystem = FileSystem.get(conf);
    
    // Check if the file already exists
    Path path = new Path("/path/to/file.ext");
    if (fileSystem.exists(path)) {
        System.out.println("File " + dest + " already exists");
        return;
    }

    // Create a new file and write data to it.
    FSDataOutputStream out = fileSystem.create(path);
    InputStream in = new BufferedInputStream(new FileInputStream(
        new File(source)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
        out.write(b, 0, numBytes);
    }

    // Close all the file descriptors
    in.close();
    out.close();
    fileSystem.close();
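
Hadoop also ships a copy helper that replaces the manual buffer loop above; a minimal sketch of the same upload using org.apache.hadoop.io.IOUtils, reusing the fileSystem, path, and source variables from the snippet:

FSDataOutputStream out = fileSystem.create(path);
InputStream in = new BufferedInputStream(new FileInputStream(new File(source)));

// copyBytes streams the data in 4 KB chunks; the final "true" closes
// both streams when the copy finishes (or fails).
IOUtils.copyBytes(in, out, 4096, true);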

3. Read a file from HDFS: open an input stream on the file and copy its contents to the local filesystem.

FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path("/path/to/file.ext");
    if (!fileSystem.exists(path)) {
        System.out.println("File does not exists");
        return;
    }

    FSDataInputStream in = fileSystem.open(path);

    // Name the local copy after the final component of the HDFS path
    String filename = path.getName();

    OutputStream out = new BufferedOutputStream(new FileOutputStream(
        new File(filename)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
        out.write(b, 0, numBytes);
    }

    in.close();
    out.close();
    fileSystem.close();
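
One detail worth knowing: unlike a plain InputStream, the FSDataInputStream returned by open() is seekable, so HDFS files support random access reads. A minimal sketch, assuming the file is at least 16 bytes long:

FSDataInputStream in = fileSystem.open(path);

byte[] header = new byte[16];
in.readFully(0, header); // positioned read; does not move the current offset

in.seek(0);              // rewind to the start of the file
int firstByte = in.read();

in.close();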

4. Delete a file from HDFS: check that the path exists, then delete it. The second argument to delete() enables recursive deletion, which is required for non-empty directories.

FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path("/path/to/file.ext");
    if (!fileSystem.exists(path)) {
        System.out.println("File does not exists");
        return;
    }

    // Delete the file; the boolean enables recursive deletion
    fileSystem.delete(path, true);

    fileSystem.close();

5. Create a directory in HDFS: check that the path does not already exist, then call mkdirs(), which, like mkdir -p, also creates any missing parent directories.

FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path("/path/to/dir");
    if (fileSystem.exists(path)) {
        System.out.println("Dir " + path + " already exists");
        return;
    }

    // Create directories
    fileSystem.mkdirs(path);

    fileSystem.close();

Full code:

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSClient {
    public HDFSClient() {

    }

    public void addFile(String source, String dest) throws IOException {
        Configuration conf = new Configuration();

        // Conf object will read the HDFS configuration parameters from these
        // XML files.
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/hdfs-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        // Get the filename out of the file path
        String filename = source.substring(source.lastIndexOf('/') + 1,
            source.length());

        // Create the destination path including the filename.
        if (dest.charAt(dest.length() - 1) != '/') {
            dest = dest + "/" + filename;
        } else {
            dest = dest + filename;
        }

        // System.out.println("Adding file to " + destination);

        // Check if the file already exists
        Path path = new Path(dest);
        if (fileSystem.exists(path)) {
            System.out.println("File " + dest + " already exists");
            return;
        }

        // Create a new file and write data to it.
        FSDataOutputStream out = fileSystem.create(path);
        InputStream in = new BufferedInputStream(new FileInputStream(
            new File(source)));

        byte[] b = new byte[1024];
        int numBytes = 0;
        while ((numBytes = in.read(b)) > 0) {
            out.write(b, 0, numBytes);
        }

        // Close all the file descriptors
        in.close();
        out.close();
        fileSystem.close();
    }

    public void readFile(String file) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(file);
        if (!fileSystem.exists(path)) {
            System.out.println("File " + file + " does not exists");
            return;
        }

        FSDataInputStream in = fileSystem.open(path);

        String filename = file.substring(file.lastIndexOf('/') + 1,
            file.length());

        OutputStream out = new BufferedOutputStream(new FileOutputStream(
            new File(filename)));

        byte[] b = new byte[1024];
        int numBytes = 0;
        while ((numBytes = in.read(b)) > 0) {
            out.write(b, 0, numBytes);
        }

        in.close();
        out.close();
        fileSystem.close();
    }

    public void deleteFile(String file) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(file);
        if (!fileSystem.exists(path)) {
            System.out.println("File " + file + " does not exists");
            return;
        }

        fileSystem.delete(path, true);

        fileSystem.close();
    }

    public void mkdir(String dir) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(dir);
        if (fileSystem.exists(path)) {
            System.out.println("Dir " + dir + " already not exists");
            return;
        }

        fileSystem.mkdirs(path);

        fileSystem.close();
    }

    public static void main(String[] args) throws IOException {

        if (args.length < 1) {
            System.out.println("Usage: hdfsclient add/read/delete/mkdir" +
                " [<local_path> <hdfs_path>]");
            System.exit(1);
        }

        HDFSClient client = new HDFSClient();
        if (args[0].equals("add")) {
            if (args.length < 3) {
                System.out.println("Usage: hdfsclient add <local_path> " + 
                "<hdfs_path>");
                System.exit(1);
            }

            client.addFile(args[1], args[2]);
        } else if (args[0].equals("read")) {
            if (args.length < 2) {
                System.out.println("Usage: hdfsclient read <hdfs_path>");
                System.exit(1);
            }

            client.readFile(args[1]);
        } else if (args[0].equals("delete")) {
            if (args.length < 2) {
                System.out.println("Usage: hdfsclient delete <hdfs_path>");
                System.exit(1);
            }

            client.deleteFile(args[1]);
        } else if (args[0].equals("mkdir")) {
            if (args.length < 2) {
                System.out.println("Usage: hdfsclient mkdir <hdfs_path>");
                System.exit(1);
            }

            client.mkdir(args[1]);
        } else {   
            System.out.println("Usage: hdfsclient add/read/delete/mkdir" +
                " [<local_path> <hdfs_path>]");
            System.exit(1);
        }

        System.out.println("Done!");
    }
}
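
For reference, a hypothetical way to drive the class above from other Java code instead of the command line; all paths are illustrative, not taken from the original article:

HDFSClient client = new HDFSClient();
client.addFile("/tmp/report.txt", "/user/demo");   // upload a local file
client.readFile("/user/demo/report.txt");          // copy it back to the working directory
client.mkdir("/user/demo/archive");                // create a directory
client.deleteFile("/user/demo/report.txt");        // delete it again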

GitHub example

package com.cloudwick.mapreduce.FileSystemAPI;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;


/**
 * Simple Driver to read/write to hdfs
 * @author ashrith
 *
 */
public class FileSystemOperations {
  public FileSystemOperations() {

  }

  /**
   * copy an existing file from the local filesystem to hdfs
   * @param source
   * @param dest
   * @param conf
   * @throws IOException
   */
  public void addFile(String source, String dest, Configuration conf) throws IOException {

    FileSystem fileSystem = FileSystem.get(conf);

    // Get the filename out of the file path
    String filename = source.substring(source.lastIndexOf('/') + 1,source.length());

    // Create the destination path including the filename.
    if (dest.charAt(dest.length() - 1) != '/') {
      dest = dest + "/" + filename;
    } else {
      dest = dest + filename;
    }

    // System.out.println("Adding file to " + destination);

    // Check if the file already exists
    Path path = new Path(dest);
    if (fileSystem.exists(path)) {
      System.out.println("File " + dest + " already exists");
      return;
    }

    // Create a new file and write data to it.
    FSDataOutputStream out = fileSystem.create(path);
    InputStream in = new BufferedInputStream(new FileInputStream(new File(
        source)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
      out.write(b, 0, numBytes);
    }

    // Close all the file descriptors
    in.close();
    out.close();
    fileSystem.close();
  }

  /**
   * read a file from hdfs
   * @param file
   * @param conf
   * @throws IOException
   */
  public void readFile(String file, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(file);
    if (!fileSystem.exists(path)) {
      System.out.println("File " + file + " does not exists");
      return;
    }

    FSDataInputStream in = fileSystem.open(path);

    String filename = file.substring(file.lastIndexOf('/') + 1,
        file.length());

    OutputStream out = new BufferedOutputStream(new FileOutputStream(
        new File(filename)));

    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
      out.write(b, 0, numBytes);
    }

    in.close();
    out.close();
    fileSystem.close();
  }

  /**
   * delete a file or directory in hdfs
   * @param file
   * @param conf
   */
  public void deleteFile(String file, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(file);
    if (!fileSystem.exists(path)) {
      System.out.println("File " + file + " does not exists");
      return;
    }

    fileSystem.delete(path, true);

    fileSystem.close();
  }

  /**
   * create directory in hdfs
   * @param dir
   * @throws IOException
   */
  public void mkdir(String dir, Configuration conf) throws IOException {
    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(dir);
    if (fileSystem.exists(path)) {
      System.out.println("Dir " + dir + " already not exists");
      return;
    }

    fileSystem.mkdirs(path);

    fileSystem.close();
  }

  public static void main(String[] args) throws IOException {

    if (args.length < 3) {
      System.out.println("Usage: hdfsclient <namenode_host> <namenode_port>"
          + " add/read/delete/mkdir [<local_path> <hdfs_path>]");
      System.exit(1);
    }

    FileSystemOperations client = new FileSystemOperations();
    String hdfsPath = "hdfs://" + args[0] + ":" + args[1];

    Configuration conf = new Configuration();
    // Providing conf files
    // conf.addResource(new Path(HDFSAPIDemo.class.getResource("/conf/core-site.xml").getFile()));
    // conf.addResource(new Path(HDFSAPIDemo.class.getResource("/conf/hdfs-site.xml").getFile()));
    // (or) using absolute paths
    //    conf.addResource(new Path(
    //        "/u/hadoop-1.0.2/conf/core-site.xml"));
    //    conf.addResource(new Path(
    //        "/u/hadoop-1.0.2/conf/hdfs-site.xml"));

    // (or)
    // alternatively provide the namenode host and port directly
    conf.set("fs.default.name", hdfsPath);

    if (args[0].equals("add")) {
      if (args.length < 3) {
        System.out.println("Usage: hdfsclient add <local_path> "
            + "<hdfs_path>");
        System.exit(1);
      }

      client.addFile(args[1], args[2], conf);

    } else if (args[0].equals("read")) {
      if (args.length < 2) {
        System.out.println("Usage: hdfsclient read <hdfs_path>");
        System.exit(1);
      }

      client.readFile(args[1], conf);

    } else if (args[0].equals("delete")) {
      if (args.length < 2) {
        System.out.println("Usage: hdfsclient delete <hdfs_path>");
        System.exit(1);
      }

      client.deleteFile(args[1], conf);

    } else if (args[0].equals("mkdir")) {
      if (args.length < 2) {
        System.out.println("Usage: hdfsclient mkdir <hdfs_path>");
        System.exit(1);
      }

      client.mkdir(args[1], conf);

    } else {
      System.out.println("Usage: hdfsclient add/read/delete/mkdir"
          + " [<local_path> <hdfs_path>]");
      System.exit(1);
    }

    System.out.println("Done!");
  }
}
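
Compared with HDFSClient above, this version builds the Configuration once in main and injects it into every call, which makes the methods easier to reuse and test. A hypothetical programmatic usage (namenode.example.com:9000 is a placeholder address, not something from the original article):

Configuration conf = new Configuration();
// Placeholder namenode address; substitute your own host and port.
conf.set("fs.default.name", "hdfs://namenode.example.com:9000");

FileSystemOperations ops = new FileSystemOperations();
ops.mkdir("/user/demo", conf);
ops.addFile("/tmp/report.txt", "/user/demo", conf);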

GitHub source

GitHub address: https://gist.github.com/ashrithr/f7899fdfd36ee800f151

This article is reposted from: http://blog.rajeevsharma.in/2009/06/using-hdfs-in-java-0200.html
