文档章节

Hadoop LineRecordReader实现分析

吐槽的达达仔
 吐槽的达达仔
发布于 2014/12/05 22:17
字数 622
阅读 30
收藏 0

直接上代码:

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

/**
 * Treats keys as offset in file and value as line. 
 */
public class LineRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(LineRecordReader.class);

  private CompressionCodecFactory compressionCodecs = null;
  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private int maxLineLength;
  private LongWritable key = null;
  private Text value = null;
  private Seekable filePosition;
  private CompressionCodec codec; //
  private Decompressor decompressor;

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);
	//根据job的配置信息,和split的信息,获取到读取实体文件的信息,这里包括文件的压缩信息。
	//这里压缩的code有:DEFAULT,GZIP,BZIP2,LZO,LZ4,SNAPPY
    codec = compressionCodecs.getCodec(file); 

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());

    if (isCompressedInput()) {
	//通过CodecPool的getCompressor方法获得Compressor对象,该方法需要传入一个codec,
	//然后Compressor对象在createOutputStream中使用,使用完毕后再通过returnCompressor放回去
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new LineReader(cIn, job);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        in = new LineReader(codec.createInputStream(fileIn, decompressor),
            job);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      in = new LineReader(fileIn, job);
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }
  
  private boolean isCompressedInput() {
    return (codec != null);
  }

  private int maxBytesToConsume(long pos) {
    return isCompressedInput()
      ? Integer.MAX_VALUE
      : (int) Math.min(Integer.MAX_VALUE, end - pos);
  }

  private long getFilePosition() throws IOException {
    long retVal;
    if (isCompressedInput() && null != filePosition) {
      retVal = filePosition.getPos();
    } else {
      retVal = pos;
    }
    return retVal;
  }

//读取每一行数据的时候,都会执行nextKeyValue()方法。
//返回为true的时候,就会再调用getCurrentKey和getCurrentValue方法获取,key,value值
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
	//在这里进行数据读取,LineReader以\n作为分隔符,读取一行数据,放到Text value里面
     //读取一行,可以参考LineReader的源码实现
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Get the progress within the split
   */
  public float getProgress() throws IOException {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f,
        (getFilePosition() - start) / (float)(end - start));
    }
  }

  public synchronized void close() throws IOException {
    try {
      if (in != null) {
        in.close();
      }
    } finally {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor);
      }
    }
  }
}

 

ref:http://blog.csdn.net/lastsweetop/article/details/9173061

http://www.myexception.cn/program/1345730.html

 

本文转载自:http://dacoolbaby.iteye.com/blog/2018585

吐槽的达达仔
粉丝 28
博文 104
码字总数 7105
作品 0
广州
程序员
私信 提问
Hadoop源代码分析(包mapreduce.lib.input)

接下来我们按照MapReduce过程中数据流动的顺序,来分解org.apache.hadoop.mapreduce.lib.*的相关内容,并介绍对应的基类的功能。首先是input部分,它实现了MapReduce的数据输入部分。类图如下...

超人学院
2015/05/22
94
0
Hadoop MapReduce中如何处理跨行Block和UnputSplit

Hadoop的初学者经常会疑惑这样两个问题:1.Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?2.在把文件从Block中读取出来进行切分时,会...

强子哥哥
2015/11/12
176
0
自定义 hadoop MapReduce InputFormat 切分输入文件

在上一篇中,我们实现了按 cookieId 和 time 进行二次排序,现在又有新问题:假如我需要按 cookieId 和 cookieId&time 的组合进行分析呢?此时最好的办法是自定义 InputFormat,让 mapreduc...

大数据之路
2013/05/26
12.5K
16
Spark应用开发之一:Hadoop分析大数据

要学会和使用一门技术的时候,首先要弄清楚该技术出现的背景和要解决的问题。要说spark首先要了解海量数据的处理和Hadoop技术。 一个系统在运行的过程中都会产生许多的日志数据,这些日志数据...

mybabe0312
2017/04/20
0
0
Hadoop MapReduce中如何处理跨行Block和InputSplit

Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中? 2. 在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,...

Hello陈翔
2013/01/03
4.9K
0

没有更多内容

加载失败,请刷新页面

加载更多

springboot2.0 maven打包分离lib,resources

springboot将工程打包成jar包后,会出现获取classpath下的文件出现测试环境正常而生产环境文件找不到的问题,这是因为 1、在调试过程中,文件是真实存在于磁盘的某个目录。此时通过获取文件路...

陈俊凯
今天
6
0
BootStrap

一、BootStrap 简洁、直观、强悍的前端开发框架,让web开发更加迅速、简单 中文镜像网站:http://www.bootcss.com 用于开发响应式布局、移动设备优先的WEB项目 1、使用boot 创建文件夹,在文...

wytao1995
今天
10
0
小知识:讲述Linux命令别名与资源文件的区别

别名 别名是命令的快捷方式。为那些需要经常执行,但需要很长时间输入的长命令创建快捷方式很有用。语法是: alias ppp='ping www.baidu.com' 它们并不总是用来缩短长命令。重要的是,你将它...

老孟的Linux私房菜
今天
8
0
《JAVA核心知识》学习笔记(6. Spring 原理)-5

它是一个全面的、企业应用开发一站式的解决方案,贯穿表现层、业务层、持久层。但是 Spring 仍然可以和其他的框架无缝整合。 6.1.1. Spring 特点 6.1.1.1. 轻量级 6.1.1.2. 控制反转 6.1.1....

Shingfi
今天
8
0
Excel导入数据库数据+Excel导入网页数据【实时追踪】

1.Excel导入数据库数据:数据选项卡------>导入数据 2.Excel导入网页数据【实时追踪】:

东方墨天
今天
11
1

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部