文档章节

Hadoop源码解析之: TextInputFormat如何处理跨split的行

猪刚烈
 猪刚烈
发布于 2014/10/12 11:40
字数 1997
阅读 15
收藏 0
点赞 0
评论 0

我们知道hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理:

  •  对输入数据进行切分,生成一组split,一个split会分发给一个mapper进行处理。
  •  针对每个split,再创建一个RecordReader读取Split内的数据,并按照<key,value>的形式组织成一条record传给map函数进行处理。

最常见的FormatInput就是TextInputFormat,在split的读取方面,它是将给到的Split按行读取,以行首字节在文件中的偏移做key,以行数据做value传给map函数处理,这部分的逻辑是由它所创建并使用的RecordReader:LineRecordReader封装和实现的.关于这部分逻辑,在一开始接触hadoop时会有一个常见的疑问:如果一个行被切分到两个split里(这几乎是一定会发生的情况),TextInputFormat是如何处理的?如果是生硬地把一行切割到两个split里,是对数据的一种破坏,可能会影响数据分析的正确性(比如word count就是一个例子).搞清楚这个问题还是需要从源码入手了解TextInputFormat的详细工作方式,这里简单地梳理记录如下(本文参考的是hadoop1.1.2的源码):


1. LineRecordReader会创建一个org.apache.hadoop.util.LineReader实例,并依赖这个LineReader的readLine方法来读取一行记录,具体可参考org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text),Line 176),那么关键的逻辑就在这个readLine方法里了,下面是添加了额外中文注释的该方法源码.这个方法主要的逻辑归纳起来是3点:

  • 总是是从buffer里读取数据,如果buffer里的数据读完了,先加载下一批数据到buffer
  • 在buffer中查找"行尾",将开始位置至行尾处的数据拷贝给str(也就是最后的Value).如果为遇到"行尾",继续加载新的数据到buffer进行查找.
  • 关键点在于:给到buffer的数据是直接从文件中读取的,完全不会考虑是否超过了split的界限,而是一直读取到当前行结束为止

/**
	   * Read one line from the InputStream into the given Text.  A line
	   * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
	   * or '\r\n' (CR+LF).  EOF also terminates an otherwise unterminated
	   * line.
	   *
	   * @param str the object to store the given line (without newline)
	   * @param maxLineLength the maximum number of bytes to store into str;
	   *  the rest of the line is silently discarded.
	   * @param maxBytesToConsume the maximum number of bytes to consume
	   *  in this call.  This is only a hint, because if the line cross
	   *  this threshold, we allow it to happen.  It can overshoot
	   *  potentially by as much as one buffer length.
	   *
	   * @return the number of bytes read including the (longest) newline
	   * found.
	   *
	   * @throws IOException if the underlying stream throws
	   */
	  public int readLine(Text str, int maxLineLength,
	                      int maxBytesToConsume) throws IOException {
	    /* We're reading data from in, but the head of the stream may be
	     * already buffered in buffer, so we have several cases:
	     * 1. No newline characters are in the buffer, so we need to copy
	     *    everything and read another buffer from the stream.
	     * 2. An unambiguously terminated line is in buffer, so we just
	     *    copy to str.
	     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
	     *    in CR.  In this case we copy everything up to CR to str, but
	     *    we also need to see what follows CR: if it's LF, then we
	     *    need consume LF as well, so next call to readLine will read
	     *    from after that.
	     * We use a flag prevCharCR to signal if previous character was CR
	     * and, if it happens to be at the end of the buffer, delay
	     * consuming it until we have a chance to look at the char that
	     * follows.
	     */
	    str.clear();
	    int txtLength = 0; //tracks str.getLength(), as an optimization
	    int newlineLength = 0; //length of terminating newline
	    boolean prevCharCR = false; //true of prev char was CR
	    long bytesConsumed = 0;
	    do {
	      int startPosn = bufferPosn; //starting from where we left off the last time
		  //如果buffer中的数据读完了,先加载一批数据到buffer里
	      if (bufferPosn >= bufferLength) {
	        startPosn = bufferPosn = 0;
	        if (prevCharCR)
	          ++bytesConsumed; //account for CR from previous read
	        bufferLength = in.read(buffer);
	        if (bufferLength <= 0)
	          break; // EOF
	      }
		  //注意:这里的逻辑有点tricky,由于不同操作系统对“行结束符“的定义不同:
		  //UNIX: '\n'  (LF)
		  //Mac:  '\r'  (CR)
		  //Windows: '\r\n'  (CR)(LF)
		  //为了准确判断一行的结尾,程序的判定逻辑是:
		  //1.如果当前符号是LF,可以确定一定是到了行尾,但是需要参考一下前一个
		  //字符,因为如果前一个字符是CR,那就是windows文件,“行结束符的长度”
		  //(即变量:newlineLength,这个变量名起的有点糟糕)应该是2,否则就是UNIX文件,“行结束符的长度”为1。
		  //2.如果当前符号不是LF,看一下前一个符号是不是CR,如果是也可以确定一定上个字符就是行尾了,这是一个mac文件。
		  //3.如果当前符号是CR的话,还需要根据下一个字符是不是LF判断“行结束符的长度”,所以只是标记一下prevCharCR=true,供读取下个字符时参考。
	      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
	        if (buffer[bufferPosn] == LF) {
	          newlineLength = (prevCharCR) ? 2 : 1;
	          ++bufferPosn; // at next invocation proceed from following byte
	          break;
	        }
	        if (prevCharCR) { //CR + notLF, we are at notLF
	          newlineLength = 1;
	          break;
	        }
	        prevCharCR = (buffer[bufferPosn] == CR);
	      }
	      int readLength = bufferPosn - startPosn;
	      if (prevCharCR && newlineLength == 0)
	        --readLength; //CR at the end of the buffer
	      bytesConsumed += readLength;
	      int appendLength = readLength - newlineLength;
	      if (appendLength > maxLineLength - txtLength) {
	        appendLength = maxLineLength - txtLength;
	      }
	      if (appendLength > 0) {
	        str.append(buffer, startPosn, appendLength);
	        txtLength += appendLength;		 
	      }//newlineLength == 0 就意味着始终没有读到行尾,程序会继续通过文件输入流继续从文件里读取数据。
	      //这里有一个非常重要的地方:in的实例创建自构造函数:org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit)
	      //第86行:FSDataInputStream fileIn = fs.open(split.getPath()); 我们看以看到:
	      //对于LineRecordReader:当它对取“一行”时,一定是读取到完整的行,不会受filesplit的任何影响,因为它读取是filesplit所在的文件,而不是限定在filesplit的界限范围内。
	      //所以不会出现“断行”的问题!
	    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
	
	    if (bytesConsumed > (long)Integer.MAX_VALUE)
	      throw new IOException("Too many bytes before newline: " + bytesConsumed);    
	    return (int)bytesConsumed;
	  }

2. 按照readLine的上述行为,在遇到跨split的行时,会到下一个split继续读取数据直至行尾,那么下一个split怎么判定开头的一行有没有被上一个split的LineRecordReader读取过从而避免漏读或重复读取开头一行呢?这方面LineRecordReader使用了一个简单而巧妙的方法:既然无法断定每一个split开始的一行是独立的一行还是被切断的一行的一部分,那就跳过每个split的开始一行(当然要除第一个split之外),从第二行开始读取,然后在到达split的结尾端时总是再多读一行,这样数据既能接续起来又避开了断行带来的麻烦.以下是相关的源码:

在LineRecordReader的构造函数org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit) 108到113行确定start位置时,明确注明::会特别地忽略掉第一行!

// If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }

相应地,在LineRecordReader判断是否还有下一行的方法:org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text) 170到173行中,while使用的判定条件是:当前位置小于 或等于split的结尾位置,也就说 :当当前以处于split的结尾位置上时,while依然会执行一次,这一次读到显然已经是下一个split的开始行了!

// We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
	...

小结:

至此,跨split的行读取的逻辑就完备了. 如果引申地来看,这是map-reduce前期数据切分的一个普遍性问题,即不管我们用什么方式切分和读取一份大数据中的小部分,包括我们在实现自己的InputFormat时,都会面临在切分处数据时的连续性解析问题. 对此我们应该深刻地认识到:split最直接的现实作用是取出大数据中的一小部分给mapper处理,但这只是一种"逻辑"上的,"宏观"上的切分,在"微观"上,在split的首尾切分处,为了确保数据连续性,跨越split接续并拼接数据也是完全正当和合理的.



本文转载自:http://blog.csdn.net/bluishglc/article/details/9380087

共有 人打赏支持
猪刚烈
粉丝 22
博文 708
码字总数 110
作品 1
海淀
程序员
Hadoop MapReduce中如何处理跨行Block和InputSplit

Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中? 2. 在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,...

Hello陈翔 ⋅ 2013/01/03 ⋅ 0

InputFormat加载数据

InputFormat是一个抽象类,其定义如下: InputFormat会对数据进行两方面的处理: 对输入数据进行逻辑切分,形成一个个split 针对每个split,新建一个RecorReader读取split里面的数据,形成一...

Jason_typ ⋅ 06/13 ⋅ 0

通过inputSplit分片size控制map数目

前言:在具体执行Hadoop程序的时候,我们要根据不同的情况来设置Map的个数。除了设置固定的每个节点上可运行的最大map个数外,我们还需要控制真正执行Map操作的任务个数。 1.如何控制实际运行...

仙剑奇侠 ⋅ 2014/12/05 ⋅ 0

Hadoop输入和输出的处理类(7)

hadoop输入的处理类 InputFormat InputFormat负责处理MR的输入部分。 作用: 1、验证作业的输入是否规范。 2、把输入文件切分成InputSplit。 3、提供RecordReader的实现类,把InputSplit读到...

肖鋭 ⋅ 2014/03/01 ⋅ 0

Hadoop实战读书笔记(7)

输入数据概要 输入数据通常驻留在较大的文件中,通常几十或者数百GB,甚至更大。MapReduce处理的基本原则之一是将输入数据分割成块。这些块可以在多台计算机上并行处理,在Hadoop的术语中这些...

祥林会跟你远走高飞 ⋅ 2014/12/08 ⋅ 0

Hadoop MapReduce中如何处理跨行Block和UnputSplit

Hadoop的初学者经常会疑惑这样两个问题:1.Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?2.在把文件从Block中读取出来进行切分时,会...

强子哥哥 ⋅ 2015/11/12 ⋅ 0

自定义 hadoop MapReduce InputFormat 切分输入文件

在上一篇中,我们实现了按 cookieId 和 time 进行二次排序,现在又有新问题:假如我需要按 cookieId 和 cookieId&time 的组合进行分析呢?此时最好的办法是自定义 InputFormat,让 mapreduc...

xrzs ⋅ 2013/05/26 ⋅ 16

MapReducer之Mapper中的Split切片原理(即影响MapTask数目的原因)

今天看到有朋友问到了MapTask的相关问题,我觉得有必要发个博客结合源码整个解析一下. 一、首先,我们看到Map运行的时候不同文件启动了不同数量的map任务,但是JOB中又没有设置map数量的配置,...

ProgrammerJiang ⋅ 2016/03/18 ⋅ 2

Hadoop源码篇---解读Mapprer源码Input输入

一。前述 上次分析了客户端源码,这次分析mapper源码让大家对hadoop框架有更清晰的认识 二。代码 自定义代码如下: public class MyMapper extends Mapper{ 继承Mapper源码如下: public cla...

LHWorldBlog ⋅ 01/09 ⋅ 0

MapReduce编程二

(1) InputFormat接口 用户需要实现该接口以指定输入文件的内容格式。该接口有两个方法 public interface InputFormat<K, V> { InputSplit[] getSplits(JobConf job, int numSplits) throws......

张欢19933 ⋅ 2016/03/30 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

个人博客的运营模式能否学习TMALL天猫质量为上?

心情随笔|个人博客的运营模式能否学习TMALL天猫质量为上? 中国的互联网已经发展了很多年了,记得在十年前,个人博客十分流行,大量的人都在写博客,而且质量还不错,很多高质量的文章都是在...

原创小博客 ⋅ 44分钟前 ⋅ 0

JavaScript零基础入门——(十一)JavaScript的DOM操作

JavaScript零基础入门——(十一)JavaScript的DOM操作 大家好,欢迎回到我们的JavaScript零基础入门。最近有些同学问我说,我讲的的比书上的精简不少。其实呢,我主要讲的是我在开发中经常会...

JandenMa ⋅ 今天 ⋅ 0

volatile和synchronized的区别

volatile和synchronized的区别 在讲这个之前需要先了解下JMM(Java memory Model :java内存模型):并发过程中如何处理可见性、原子性、有序性的问题--建立JMM模型 详情请看:https://baike.b...

MarinJ_Shao ⋅ 今天 ⋅ 0

深入分析Kubernetes Critical Pod(一)

Author: xidianwangtao@gmail.com 摘要:大家在部署Kubernetes集群AddOn组件的时候,经常会看到Annotation scheduler.alpha.kubernetes.io/critical-pod"="",以表示这是一个关键服务,那你知...

WaltonWang ⋅ 今天 ⋅ 0

原子性 - synchronized关键词

原子性概念 原子性提供了程序的互斥操作,同一时刻只能有一个线程能对某块代码进行操作。 原子性的实现方式 在jdk中,原子性的实现方式主要分为: synchronized:关键词,它依赖于JVM,保证了同...

dotleo ⋅ 今天 ⋅ 0

【2018.06.22学习笔记】【linux高级知识 14.4-15.3】

14.4 exportfs命令 14.5 NFS客户端问题 15.1 FTP介绍 15.2/15.3 使用vsftpd搭建ftp

lgsxp ⋅ 今天 ⋅ 0

JeeSite 4.0 功能权限管理基础(Shiro)

Shiro是Apache的一个开源框架,是一个权限管理的框架,实现用户认证、用户授权等。 只要有用户参与一般都要有权限管理,权限管理实现对用户访问系统的控制,按照安全规则或者安全策略控制用户...

ThinkGem ⋅ 昨天 ⋅ 0

python f-string 字符串格式化

主要内容 从Python 3.6开始,f-string是格式化字符串的一种很好的新方法。与其他格式化方式相比,它们不仅更易读,更简洁,不易出错,而且速度更快! 在本文的最后,您将了解如何以及为什么今...

阿豪boy ⋅ 昨天 ⋅ 0

Python实现自动登录站点

如果我们想要实现自动登录,那么我们就需要能够驱动浏览器(比如谷歌浏览器)来实现操作,ChromeDriver 刚好能够帮助我们这一点(非谷歌浏览器的驱动有所不同)。 一、确认软件版本 首先我们...

blackfoxya ⋅ 昨天 ⋅ 0

线性回归原理和实现基本认识

一:介绍 定义:线性回归在假设特证满足线性关系,根据给定的训练数据训练一个模型,并用此模型进行预测。为了了解这个定义,我们先举个简单的例子;我们假设一个线性方程 Y=2x+1, x变量为商...

wangxuwei ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部