文档章节

Hadoop MapReduce中如何处理跨行Block和InputSplit

Hello陈翔
 Hello陈翔
发布于 2013/01/03 11:51
字数 933
阅读 4867
收藏 15

1. Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?

2. 在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,如果被分成两个InputSplit,这样一个InputSplit里面就有一行不完整的数据,那么处理这个InputSplit的Mapper会不会得出不正确的结果?

对于上面的两个问题,首先要明确两个概念:Block和InputSplit:

1. Block是HDFS存储文件的单位(默认是64M);

2. InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定)。

因此以行记录形式的文本,可能存在一行记录被划分到不同的Block,甚至不同的DataNode上去。通过分析FileInputFormat里面的getSplits方法,可以得出,某一行记录同样也可能被划分到不同的InputSplit。

下面以hadoop-0.22.0源码进行分析

org.apache.hadoop.mapred.FileInputFormat:

public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job);
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      if ((length != 0) && isSplitable(fs, path)) { 
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          String[] splitHosts = getSplitHosts(blkLocations, 
              length-bytesRemaining, splitSize, clusterMap);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
                               splitHosts));
          bytesRemaining -= splitSize;
        }
        
        if (bytesRemaining != 0) {
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, 
                     blkLocations[blkLocations.length-1].getHosts()));
        }
      } else if (length != 0) {
        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
        splits.add(makeSplit(path, 0, length, splitHosts));
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }

从上面的代码可以看出,对文件进行切分其实很简单:获取文件在HDFS上的路径和Block信息,然后根据splitSize对文件进行切分,splitSize = computeSplitSize(goalSize, minSize, blockSize);goalSize,minSize,blockSize都可以配置,默认splitSize 就等于blockSize的默认值(64m)。

FileInputFormat对文件的切分是严格按照偏移量来的,因此一行记录比较长的话,可能被切分到不同的InputSplit。 但这并不会对Map造成影响,尽管一行记录可能被拆分到不同的InputSplit,但是与FileInputFormat关联的RecordReader被设计的足够健壮,当一行记录跨InputSplit时,其能够到读取不同的InputSplit,直到把这一行记录读取完成

org.apache.hadoop.mapred.TextInputFormat:

public RecordReader<LongWritable, Text> getRecordReader(
                                          InputSplit genericSplit, JobConf job,
                                          Reporter reporter)
    throws IOException {
    
    reporter.setStatus(genericSplit.toString());
    return new LineRecordReader(job, (FileSplit) genericSplit);
  }
org.apache.hadoop.mapred.LineRecordReader :

/** Read a line. */
  public synchronized boolean next(LongWritable key, Text value)
    throws IOException {

    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
      key.set(pos);

      int newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        return false;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        return true;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
    }

    return false;
  }
对于跨InputSplit的行,LineRecordReader会自动跨InputSplit去读取


如果一行记录L跨越了A,B两个InputSplit,读A的时候已经读取了跨越A,B的这条记录L,那么对B这个InputSplit读取的时候,如何做到不读取L这条记录在B中的部分呢?

org.apache.hadoop.mapred.LineRecordReader:

// If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
如果不是first split,则会丢弃第一个record,避免了重复读取的问题。




© 著作权归作者所有

Hello陈翔
粉丝 6
博文 18
码字总数 7743
作品 0
海淀
程序员
私信 提问
【Hadoop】- MapReduce 框架详细介绍

MapReduce 简介 说明: 通过由普通机器组成的集群对大量数据集进行并行处理可依靠的容错软件框架。 MapReduce作业可以将数据集分割为Map任务并行处理的数据块,框架对对Map过程产生的数据进行...

ZeroneLove
02/24
0
0
通过inputSplit分片size控制map数目

前言:在具体执行Hadoop程序的时候,我们要根据不同的情况来设置Map的个数。除了设置固定的每个节点上可运行的最大map个数外,我们还需要控制真正执行Map操作的任务个数。 1.如何控制实际运行...

仙剑奇侠
2014/12/05
0
0
MapReduce 实验 (一) 原理

官网 http://hadoop.apache.org/ hadoop三大组件 HDFS:分布式存储系统 https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html MapReduce:分布式计算......

pcdog
2018/04/15
0
0
一文详解大规模数据计算处理原理及操作重点

作者介绍 李智慧,《大型网站技术架构:核心原理与案例分析》作者。曾供职于阿里巴巴与英特尔亚太研发中心,从事大型网站与大数据方面的研发工作,目前在做企业级区块链方面的开发工作。 大数...

DBAplus社群
2018/08/07
0
0
大数据(hadoop-Mapreduce原理架构)

课程目标: 1:MapReduce的应用场景 2:MapReduce编程模型 3:MapReduce的架构 4:常见MapReduce应用场景 5:总结 MapReduce的定义 源自于Google的MapReduce论文 发表于2004年12月 Hadoop M...

这很耳东先生
04/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Linux下的抓包 tcpdump

公司Linux下socket对接时发现接口中总是报错,所以研究了一下Linux下的抓包 tcpdump host #IP地址# //获取主机210.27.48.1接收或发出的telnet包 tcpdump参数的主要选项 -i :指定网卡 默认是...

Aeroever
19分钟前
1
0
服务器端 跨平台

https://stackoverflow.com/questions/41437503/postgresql-9-5-or-9-6-for-s390x ##cloud-config#password: 123#chpasswd: { expire: False }#ssh_pwauth: True#ssh_import_id: kirkl......

MtrS
20分钟前
1
0
Date format日期时间转换标准的前世今生

Why 为什么一个看上去简简单单的时间表示在写程序时候会如此复杂?其实时间记录充满了地域性,时间,重量,长度等度量往往在统计方式与单位上都有许多差异。 比如中国自古以月阴晴为统计周期...

屌丝Lee
29分钟前
2
0
壮丽70年·奋斗新时代|蒸妙熏蒸说:中医药在世界上是独树一帜

世界上任何一门技术,通过大学几年专业的学习,基本上都能够掌握,并应用于实践。唯有中医,大学、硕士、博士连续十几年学习,出来却连最简单的疾病都不会治,更有甚者,很多人还成了中医的掘...

公益传承
30分钟前
2
0
IoT SaaS加速器——助力阿尔茨海默病人护理

场景介绍 阿尔茨海默病,是导致中老年人认知功能障碍的最常见疾病之一,是发生在老年期及老年前期的一种原发性退行性脑病。据估计,全世界痴呆症患者数量为4700万,到2030年将达到7500万人。...

zhaowei121
34分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部