文档章节

Hadoop MapReduce中如何处理跨行Block和UnputSplit

强子大叔的码田
 强子大叔的码田
发布于 2015/11/12 20:34
字数 1517
阅读 176
收藏 0

【推荐】2019 Java 开发者跳槽指南.pdf(吐血整理) >>>

Hadoop的初学者经常会疑惑这样两个问题:1.Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?2.在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,如果被分成两个InputSplit,这样一个InputSplit里面就有一行不完整的数据,那么处理这个InputSplit的Mapper会不会得出不正确的结果?

     对于上面的两个问题,首先要明确两个概念:Block和InputSplit

      1. block是hdfs存储文件的单位(默认是64M);
      2. InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定)。

因此,以行记录形式的文本,还真可能存在一行记录被划分到不同的Block,甚至不同的DataNode上去。通过分析FileInputFormat里面的getSplits方法,可以得出,某一行记录同样也可能被划分到不同的InputSplit。

[java] view plaincopy

  1. public List<InputSplit> getSplits(JobContext job) throws IOException {    

  2.   long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));    

  3.   long maxSize = getMaxSplitSize(job);    

  4.     

  5.   // generate splits    

  6.   List<InputSplit> splits = new ArrayList<InputSplit>();    

  7.   List<FileStatus> files = listStatus(job);          

  8.   for (FileStatus file: files) {    

  9.     Path path = file.getPath();    

  10.     long length = file.getLen();    

  11.     if (length != 0) {    

  12.       FileSystem fs = path.getFileSystem(job.getConfiguration());    

  13.       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);    

  14.       if (isSplitable(job, path)) {    

  15.         long blockSize = file.getBlockSize();    

  16.         long splitSize = computeSplitSize(blockSize, minSize, maxSize);    

  17.     

  18.         long bytesRemaining = length;    

  19.         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {    

  20.           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);    

  21.           splits.add(makeSplit(path, length-bytesRemaining, splitSize,    

  22.                                    blkLocations[blkIndex].getHosts()));    

  23.           bytesRemaining -= splitSize;    

  24.         }    

  25.     

  26.         if (bytesRemaining != 0) {    

  27.           splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,    

  28.                      blkLocations[blkLocations.length-1].getHosts()));    

  29.         }    

  30.       } else { // not splitable    

  31.         splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));    

  32.       }    

  33.     } else {     

  34.       //Create empty hosts array for zero length files    

  35.       splits.add(makeSplit(path, 0, length, new String[0]));    

  36.     }    

  37.   }    

  38.   // Save the number of input files for metrics/loadgen    

  39.   job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());    

  40.   LOG.debug("Total # of splits: " + splits.size());    

  41.   return splits;    

  42. }    

  从上面的代码可以看出,对文件进行切分其实很简单:获取文件在HDFS上的路径和Block信息,然后根据splitSize

对文件进行切分,splitSize = computeSplitSize(blockSize, minSize, maxSize);blockSize,minSize,maxSize都可以配置,默认splitSize 就等于blockSize的默认值(64m)。

FileInputFormat对文件的切分是严格按照偏移量来的,因此一行记录比较长的话,其可能被切分到不同的InputSplit。但这并不会对Map造成影响,尽管一行记录可能被拆分到不同的InputSplit,但是与FileInputFormat关联的RecordReader被设计的足够健壮,当一行记录跨InputSplit时,其能够到读取不同的InputSplit,直到把这一行记录读取完成,在Hadoop里,记录行形式的文本,通常采用默认的TextInputFormat,TextInputFormat关联的是LineRecordReader,下面我们来看看LineRecordReader的的nextKeyValue方法里读取文件的代码:

[java] view plaincopy

  1. while (getFilePosition() <= end) {    

  2.   newSize = in.readLine(value, maxLineLength,    

  3.       Math.max(maxBytesToConsume(pos), maxLineLength));    

  4.   if (newSize == 0) {    

  5.     break;    

  6.   }    

 其读取文件是通过LineReader(in就是一个LineReader实例)的readLine方法完成的:

[java] view plaincopy

  1. public int readLine(Text str, int maxLineLength,    

  2.                     int maxBytesToConsume) throws IOException {    

  3.   if (this.recordDelimiterBytes != null) {    

  4.     return readCustomLine(str, maxLineLength, maxBytesToConsume);    

  5.   } else {    

  6.     return readDefaultLine(str, maxLineLength, maxBytesToConsume);    

  7.   }    

  8. }    

  9.     

  10. /**  

  11.  * Read a line terminated by one of CR, LF, or CRLF.  

  12.  */    

  13. private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)    

  14. throws IOException {    

  15.   str.clear();    

  16.   int txtLength = 0//tracks str.getLength(), as an optimization    

  17.   int newlineLength = 0//length of terminating newline    

  18.   boolean prevCharCR = false//true of prev char was CR    

  19.   long bytesConsumed = 0;    

  20.   do {    

  21.     int startPosn = bufferPosn; //starting from where we left off the last time    

  22.     if (bufferPosn >= bufferLength) {    

  23.       startPosn = bufferPosn = 0;    

  24.       if (prevCharCR)    

  25.         ++bytesConsumed; //account for CR from previous read    

  26.       bufferLength = in.read(buffer);    

  27.       if (bufferLength <= 0)    

  28.         break// EOF    

  29.     }    

  30.     for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline    

  31.       if (buffer[bufferPosn] == LF) {    

  32.         newlineLength = (prevCharCR) ? 2 : 1;    

  33.         ++bufferPosn; // at next invocation proceed from following byte    

  34.         break;    

  35.       }    

  36.       if (prevCharCR) { //CR + notLF, we are at notLF    

  37.         newlineLength = 1;    

  38.         break;    

  39.       }    

  40.       prevCharCR = (buffer[bufferPosn] == CR);    

  41.     }    

  42.     int readLength = bufferPosn - startPosn;    

  43.     if (prevCharCR && newlineLength == 0)    

  44.       --readLength; //CR at the end of the buffer    

  45.     bytesConsumed += readLength;    

  46.     int appendLength = readLength - newlineLength;    

  47.     if (appendLength > maxLineLength - txtLength) {    

  48.       appendLength = maxLineLength - txtLength;    

  49.     }    

  50.     if (appendLength > 0) {    

  51.       str.append(buffer, startPosn, appendLength);    

  52.       txtLength += appendLength;    

  53.     }    

  54.   } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);   <span style="color: #ff0000;">//①</span>    

  55.     

  56.   if (bytesConsumed > (long)Integer.MAX_VALUE)    

  57.     throw new IOException("Too many bytes before newline: " + bytesConsumed);        

  58.   return (int)bytesConsumed;    

  59. }    

  我们分析下readDefaultLine方法,do-while循环体主要是读取文件,然后遍历读取的内容,找到默认的换行符就终止循环。前面说,对于跨InputSplit的行,LineRecordReader会自动跨InputSplit去读取。这就体现在上述代码的While循环的终止条件上:

while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

newlineLength==0则以为一次do-while循环中读取的内容中没有遇到换行符,因maxBytesToConsume的默认值为Integer.MAX_VALUE,所以如果读取的内容没有遇到换行符,则会一直读取下去,知道读取的内容超过maxBytesToConsume。这样的出来方式,解决了一行记录跨InputSplit的读取问题,同样也会造成下面两个疑问:

1.既然在LineReader读取方法里面没有对考虑InputSplit的end进行处理,难道读取一个InputSplit的时候,会这样无限的读取下去么?

2.如果一行记录L跨越了A,B两个InputSplit,读A的时候已经读取了跨越A,B的这条记录L,那么对B这个InputSplit读取的时候,如果做到不读取L这条记录在B中的部分呢?

为了解决这两个问题,Hadoop通过下面的代码来做到:LineRecordReader的nextKeyValue方法。

[java] view plaincopy

  1. public boolean nextKeyValue() throws IOException {    

  2.   if (key == null) {    

  3.     key = new LongWritable();    

  4.   }    

  5.   key.set(pos);    

  6.   if (value == null) {    

  7.     value = new Text();    

  8.   }    

  9.   int newSize = 0;    

  10.   // We always read one extra line, which lies outside the upper    

  11.   // split limit i.e. (end - 1)    

  12.   while (getFilePosition() <= end) {        <span style="color: #ff0000;"//②</span>    

  13.     newSize = in.readLine(value, maxLineLength,    

  14.         Math.max(maxBytesToConsume(pos), maxLineLength));    

  15.     if (newSize == 0) {    

  16.       break;    

  17.     }    

  18.     pos += newSize;    

  19.     inputByteCounter.increment(newSize);    

  20.     if (newSize < maxLineLength) {    

  21.       break;    

  22.     }    

  23.     

  24.     // line too long. try again    

  25.     LOG.info("Skipped line of size " + newSize + " at pos " +     

  26.              (pos - newSize));    

  27.   }    

  28.   if (newSize == 0) {    

  29.     key = null;    

  30.     value = null;    

  31.     return false;    

  32.   } else {    

  33.     return true;    

  34.   }    

  35. }    

    通过代码②处得While条件,就保证了InputSplit读取边界的问题,如果存在跨InputSplit的记录,也只好跨InputSplit读取一次。

     再来看LineRecordReader的initialize方法:

[java] view plaincopy

  1. // If this is not the first split, we always throw away first record    

  2. // because we always (except the last split) read one extra line in    

  3. // next() method.    

  4. if (start != 0) {    

  5.   start += in.readLine(new Text(), 0, maxBytesToConsume(start));    

  6. }    

  7. this.pos = start;    

    如果不是第一InputSplit,则在读取的时候,LineRecordReader会自动忽略掉第一个换行符之前的所有内容,这样就不存在重读读取的问题。


本文转载自:http://blog.csdn.net/wanghai__/article/details/6583364

强子大叔的码田

强子大叔的码田

粉丝 917
博文 1444
码字总数 1227067
作品 9
南京
架构师
私信 提问
一文详解大规模数据计算处理原理及操作重点

作者介绍 李智慧,《大型网站技术架构:核心原理与案例分析》作者。曾供职于阿里巴巴与英特尔亚太研发中心,从事大型网站与大数据方面的研发工作,目前在做企业级区块链方面的开发工作。 大数...

DBAplus社群
2018/08/07
0
0
大数据(hadoop-Mapreduce原理架构)

课程目标: 1:MapReduce的应用场景 2:MapReduce编程模型 3:MapReduce的架构 4:常见MapReduce应用场景 5:总结 MapReduce的定义 源自于Google的MapReduce论文 发表于2004年12月 Hadoop M...

这很耳东先生
04/30
52
0
Storm与Spark、Hadoop框架对比

Storm与Spark、Hadoop三种框架对比 Storm与Spark、Hadoop这三种框架,各有各的优点,每个框架都有自己的最佳应用场景。所以,在不同的应用场景下,应该选择不同的框架。 1.Storm是最佳的流式...

boonya
04/19
14
0
Hadoop MapReduce计算框架

1、MapReduce理论 1.1、MapReduce是什么? MapReduce用于处理海量数据的分布式计算框架,是Hadoop生态中的核心之一(MapReduce用于计算海量数据,HDFS用于存储海量数据);MapReduce是谷歌公...

巴利奇
2018/10/30
0
0
Hadoop 之初识Hadoop

Hadoop历史 雏形开始于2002年的Apache的Nutch,Nutch是一个开源Java 实现的搜索引擎。它提供了我们运行自己的搜索引擎所需的全部工具。包括全文搜索和Web爬虫。 随后在2003年Google发表了一篇...

李超
2015/04/02
318
0

没有更多内容

加载失败,请刷新页面

加载更多

如何更改iOS应用程序的名称?

我前几天用一个愚蠢的开发代码名称开始了一个iPhone项目,现在我想改变项目的名称,因为它已经接近完成了。 但是我不知道如何使用Xcode来做这件事,尝试在info.plist文件中更改应用程序的名称...

技术盛宴
24分钟前
3
0
关于win10tensorflow的配置(CPU+GPU)

主要内容 CPU篇 GPU篇 【前期准备与注意事项】 环境:window1064位+python36(37)+CUDA9.0+cuDNN7.6+tensorflow_gpu-1.12.0 软件:anaconda+pycharm 硬件:有nvidia显卡的笔记本或台式(非A...

放只虎归个山
33分钟前
2
0
C#中的多行字符串文字

有没有一种简单的方法可以在C#中创建多行字符串文字? 这是我现在所拥有的: string query = "SELECT foo, bar"+ " FROM table"+ " WHERE id = 42"; 我知道PHP有 <<<BLOCKBLOCK; C#是......

javail
40分钟前
4
0
微信支付之小微商户扫盲!支持信用卡,免营业执照!

微信支付商户申请面向线下小微商户开放,符合条件的微信支付服务商可为小微商户发起接入申请。无需营业执照。 小微商户日收款额度为5万元~30万元 。 信用卡支付日限额为1千;月限额1万。 结算...

吴伟祥
今天
4
0
大话SDWebImage(三)-- 图片下载层

四、图片下载层 SDWebImageDownloader是处理图片下载的类 4.1 图片下载步骤 首先介绍下dispatch_barrier,GCD中的dispatch_barrier目的是在并发队列实现串行的效果,创建下载任务SDWebImageD...

aron1992
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部