文档章节

Hadoop的整文件读取

KevinWen
 KevinWen
发布于 2014/05/01 13:26
字数 727
阅读 469
收藏 0

    写Hadoop程序时,有时候需要读取整个文件,而不是分片读取,但默认的为分片读取,所以,只有编写自己的整文件读取类。

需要编写的有:

    WholeInputFormat类,继承自FileInputFormat类

    WholeRecordReader类,继承自RecordReader类

    其中,用于读取的类是WholeRecordReader类。以下代码以Text为key值类型,BytesWritable为value的类型,因为大多数格式在hadoop中都没有相应的类型支持,比如jpg,sdf,png等等,在hadoop中都没有相应的类,但是都可以转换为byte[]字节流,然后在转化为BytesWritable类型,最后在Map或者Reduce再转换成java中的相应类型。

    代码如下,解释见 :

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeInputFormat extends FileInputFormat<Text, BytesWritable>
{
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader
(InputSplit split, TaskAttemptContext context) 
     throws IOException,InterruptedException 
     {
        return new WholeRecordReader();
     }

    @Override
    //判断是否分片,false表示不分片,true表示分片。 
    //其实这个不写也可以,因为在WholeRecordReader中一次性全部读完
     protected boolean isSplitable(JobContext context,Path file)
     {
         return false;
     }
}

    下面是WholeRecordReader类:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<Text,BytesWritable>
{
     //Hadoop中处理文件的类
     private FileSplit fileSplit;
     private FSDataInputStream in = null;
 
     private BytesWritable value = null;
     private Text key = null;
     
     //用于判断文件是否读取完成
     //也就是因为这个,所以WholeInputFormat中的isSplitable方法可以不用写
     private boolean processed = false;
 
     @Override
     public void close() throws IOException 
     {
        //do nothing
     }

     @Override
     public Text getCurrentKey() throws IOException, InterruptedException 
     {
          return this.key;
     }
 
     @Override
     public BytesWritable getCurrentValue() throws IOException,InterruptedException 
     {
          return this.value;
     }
 
     @Override
     public float getProgress() throws IOException, InterruptedException 
     {
          return processed ? fileSplit.getLength() : 0;
     }
  
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException 
     {
          //打开一个文件输入流
          fileSplit = (FileSplit)split;
          Configuration job = context.getConfiguration();
          Path file = fileSplit.getPath();
          FileSystem temp = file.getFileSystem(job);
          in = temp.open(file);
     }

     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException
     {
          if(key == null)
          {
              key = new Text();
          }
  
          if(value == null)
          {
              value = new BytesWritable();
          }
  
          if(!processed)
          {
              //申请一个字节数组保存将从文件中读取的内容
              byte[] content = new byte[(int)fileSplit.getLength()];
              Path file = fileSplit.getPath();
              //以文件的名字作为传递给Map函数的key值,可以自行设置
              key.set(file.getName());
        
              try{
               //读取文件中的内容
               IOUtils.readFully(in,content,0,content.length);
               //将value的值设置为byte[]中的值
               value.set(new BytesWritable(content));
              }catch(IOException e)
              {
                   e.printStackTrace();
              }finally{
               //关闭输入流
               IOUtils.closeStream(in);
              }
        
              //将processed设置成true,表示读取文件完成,以后不再读取
              processed = true;
              return true;
          }
         
         return false;
     }
}

    当把这些写好后,在main()函数或者run()函数里面将job的输入格式设置成WholeInputFormat,如下:

job.setInputFormatClass(WholeInputFormat.class);

    现在,可以整个文件读取了。其中,key,value的类型可以换成大家需要的类型。不过,当在Hadoop中找不到对应类型的时候建议用BytesWritable类型,然后用byte[]作为中间类型转化为java可以处理的类型。

© 著作权归作者所有

KevinWen
粉丝 1
博文 9
码字总数 6153
作品 0
泸州
私信 提问
(第3篇)HDFS是什么?HDFS适合做什么?我们应该怎样操作HDFS系统?

HDFS文件系统 Hadoop 附带了一个名为 HDFS(Hadoop分布式文件系统)的分布式文件系统,专门存储超大数据文件,为整个Hadoop生态圈提供了基础的存储服务。 本章内容: 1) HDFS文件系统的特点,以...

I加加
2017/03/06
0
0
Hadoop大数据入门到实战(第四节) - HDFS文件系统(使用)

这一小节我们来学习:1.HDFS的设计,2.HDFS常用命令。 HDFS的设计 分布式文件系统 客户:帮我保存一下这几天的数据。 程序猿:好嘞,有多大呢? 客户:1T。 程序猿:好没问题,买个硬盘就搞定...

MasterXiao
2018/08/09
0
0
Hadoop: Why Not Use RAID?

一、针对hadoop集群的磁盘配置建议 针对datanode,建议采用一组单独的磁盘,针对namenode节点,建议采用raid5或raid1来实现针对metadata的冗灾。 二、针对此问题讨论的资料 针对此问题,两篇文...

cloud-coder
2014/03/22
1.4K
0
海量日志实时收集系统架构设计与go语言实现

日志收集系统应该说是到达一定规模的公司的标配了,一个能满足业务需求、运维成本低、稳定的日志收集系统对于运维的同学和日志使用方的同学都是非常nice的。然而这时理想中的日志收集系统,现...

ZingpLiu
2018/08/23
0
0
Centos6.5集群安装64位hadoop2.2.0

Centos6.5集群安装64位hadoop2.2.0: 网络环境,整3台虚拟机或者物理机,配置好相应网络,编辑/etc/hosts, 如: ssh无密码登录,很简单,每台机器都生成公钥,密钥(事先建立一个统一的hadoo...

ihaolin
2014/01/09
2.4K
2

没有更多内容

加载失败,请刷新页面

加载更多

toast组件单元测试

先看是否存在 describe('Toast', () => { it('存在.', () => { expect(Toast).to.be.exist }) }); 看属性,我们要测 ToastVue 和 plugin.js describe('Toast', () =>......

ories
22分钟前
57
0
如何将整个MySQL数据库字符集和排序规则转换为UTF-8?

如何将整个MySQL数据库字符集转换为UTF-8并将排序规则转换为UTF-8? #1楼 在命令行外壳上 如果您是命令行外壳程序之一,则可以非常快速地执行此操作。 只需填写“ dbname”:D DB="dbname"(...

javail
今天
80
0
开源矿工系统内部的层

开源矿工系统内部的层 所谓“层”、“界”、“域”、“集合”,这些词其实是在试图表达物质系统的组成结构和运动景象中的规矩,这些不同人发明的词都是来源于对同一个规律的观察、发现、表达...

NTMiner
今天
88
0
如何将文件从一个git repo移到另一个(不是克隆),保留历史记录

我们的Git储存库是作为单个Monster SVN储存库的一部分开始的,其中每个项目都有自己的树,如下所示: project1/branches /tags /trunkproject2/branches /tags ...

技术盛宴
今天
65
0
数据结构之数组-c代码实现

在上一篇文章里讲了数组的具体内容,然后自己使用c语言对数组进行了实现。 其中定义了一个结构体,定义了长度、已使用长度和地址指针。 定义alloc函数来分配内存空间 之后便是插入元素的ins...

无心的梦呓
今天
65
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部