文档章节

TextInputFormat之getSplits函数分析

强子哥哥
 强子哥哥
发布于 2014/12/31 10:54
字数 1151
阅读 39
收藏 0

Client提交任务后,JobInProgress类会调用设置的InputFormat类的getSplits函数。

下面分析下TextInputFormat格式的getSplits函数。

先调用了这个函数

/** List input directories.

   * Subclasses may override to, e.g., select only files matching a regular

   * expression.

   * Property mapred.input.subdir, if set, names a subdirectory that

   * is appended to all input dirs specified by job, and if the given fs

   * lists those too, each is added to the returned array of File.

   * @param fs

   * @param job

   * @return array of File objects, never zero length.

   * @throws IOException if zero items.

   */

  protected File[] listFiles(FileSystem fs, JobConf job)

    throws IOException {

    File[] dirs = job.getInputDirs();//获取多个输入路径

    String workDir = job.getWorkingDirectory();//"mapred.working.dir"

    String subdir = job.get("mapred.input.subdir");//获取子目录

    ArrayList result = new ArrayList();//构造一个空集合

    for (int i = 0; i < dirs.length; i++) {//对于每一个目录来说

      // if it is relative, make it absolute using the directory from the 

      // JobConf

      if (workDir != null && !fs.isAbsolute(dirs[i])) {//先加工成绝对路径

        dirs[i] = new File(workDir, dirs[i].toString());

      }

      File[] dir = fs.listFiles(dirs[i]);//获取下面的所有文件

      if (dir != null) {

        for (int j = 0; j < dir.length; j++) {//对于每一个文件来说

          File file = dir[j];//获取当前文件

          if (subdir != null) {//根据需要决定是否需要遍历子目录

            File[] subFiles = fs.listFiles(new File(file, subdir));

            if (subFiles != null) {

              for (int k = 0; k < subFiles.length; k++) {

                result.add(subFiles[k]);

              }

            }

          } else {

            result.add(file);//添加到集合

          }

        }

      }

    }

    if (result.size() == 0) {

      throw new IOException("No input directories specified in: "+job);

    }

    return (File[])result.toArray(new File[result.size()]);//返回所有文件列表

  }

 --------------

 

/** Splits files returned by {#listFiles(FileSystem,JobConf) when

   * they're too big.*/ 

  public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits)

    throws IOException {

    File[] files = listFiles(fs, job);//调用上面的函数获取所有文件

    for (int i = 0; i < files.length; i++) {      // check we have valid files

      File file = files[i];//拿到当前文件

      if (fs.isDirectory(file) || !fs.exists(file)) {//如果是一个目录或者是不存在的文件

        throw new IOException("Not a file: "+files[i]);//抛出异常

      }

    }

    long totalSize = 0; //开始计算,总大小初始化为0             // compute total size

    for (int i = 0; i < files.length; i++) {

      totalSize += fs.getLength(files[i]);//统计所有文件的大小

    }

    long bytesPerSplit = totalSize / numSplits;   //总的大小/map的个数

//限制1:不能超过块长度

    long fsBlockSize = fs.getBlockSize();//获取块大小

    if (bytesPerSplit > fsBlockSize) {            // no larger than fs blocks

      bytesPerSplit = fsBlockSize;//保证不超过块大小

    }

    

    //限制2:不能小于最低长度

    long configuredMinSplitSize = job.getLong("mapred.min.split.size", 0);//获取配置的最小分片大小

    if( configuredMinSplitSize < minSplitSize )

     configuredMinSplitSize = minSplitSize;//校正

    if (bytesPerSplit < configuredMinSplitSize) { // no smaller than min size

      bytesPerSplit = configuredMinSplitSize;//不能小于最低大小

    }

    long maxPerSplit = bytesPerSplit + (long)(bytesPerSplit*SPLIT_SLOP);//计算溢出10%的长度

    //LOG.info("bytesPerSplit = " + bytesPerSplit);

    //LOG.info("maxPerSplit = " + maxPerSplit);

    ArrayList splits = new ArrayList(numSplits);  //产生分片存储结构,注意ArrayList会自动扩容

    for (int i = 0; i < files.length; i++) {//开始遍历文件

      File file = files[i];//获取当前文件

      long length = fs.getLength(file);//获取长度[这里完全可以做缓存好吧,不需要增加网络IO开销]

      long bytesRemaining = length;//初始化为当前文件大小

      while (bytesRemaining >= maxPerSplit) {//还有剩余大小

        splits.add(new FileSplit(file, length-bytesRemaining, bytesPerSplit));//添加1Block长度

        bytesRemaining -= bytesPerSplit;//调整剩余长度

      }

      

      if (bytesRemaining != 0) {//将最后一个Block加入

        splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining));

      }

      //LOG.info( "Generating splits for " + i + "th file: " + file.getName() );

    }

    //LOG.info( "Total # of splits: " + splits.size() );

    return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);//返回结果

  }

那么具体JobTrackr又做了什么事情呢?

 

/**

     * Construct the splits, etc.  This is invoked from an async

     * thread so that split-computation doesn't block anyone.

     */

    public void initTasks() throws IOException {

        if (tasksInited) {//如果已经初始化过了,则直接返回

            return;

        }

        //

        // construct input splits

        //

        String jobid = profile.getJobId();//获取当前的job的随机ID

        String jobFile = profile.getJobFile();//获取配置文件在HDFS中的路径

        JobConf jd = new JobConf(localJobFile);//根据本地文件重新生成jobconf对象

        FileSystem fs = FileSystem.get(conf);//获取分布式文件系统

        String ifClassName = jd.get("mapred.input.format.class");//获取输入格式所属类名

        InputFormat inputFormat;

        if (ifClassName != null && localJarFile != null) {

          try {

            ClassLoader loader =

              new URLClassLoader(new URL[]{ localJarFile.toURL() });

            Class inputFormatClass = loader.loadClass(ifClassName);

            inputFormat = (InputFormat)inputFormatClass.newInstance();

          } catch (Exception e) {

            throw new IOException(e.toString());

          }

        } else {

          inputFormat = jd.getInputFormat();//默认为TextInputFormat.class

        }

        FileSplit[] splits = inputFormat.getSplits(fs, jd, numMapTasks);//设置分片结束

        //

        // sort splits by decreasing length, to reduce job's tail

        //

        Arrays.sort(splits, new Comparator() {//对 splits进行排序,长度逐一降低

            public int compare(Object a, Object b) {

                long diff =

                    ((FileSplit)b).getLength() - ((FileSplit)a).getLength();

                return diff==0 ? 0 : (diff > 0 ? 1 : -1);

            }

        });

        //

        // adjust number of map tasks to actual number of splits

        //

        this.numMapTasks = splits.length;//重新设置numMapTasks为真正的分片个数

        // create a map task for each split

        this.maps = new TaskInProgress[numMapTasks];//构造容器

        for (int i = 0; i < numMapTasks; i++) {//初始化

            maps[i] = new TaskInProgress(jobFile, splits[i], jobtrackerconfthis);

        }

        //

        // Create reduce tasks

        //

        this.reduces = new TaskInProgress[numReduceTasks];//构造容器

        for (int i = 0; i < numReduceTasks; i++) {//初始化

            reduces[i] = new TaskInProgress(jobFile, maps, i, jobtrackerconfthis);

        }

        // 

        // Obtain some tasktracker-cache information for the map task splits.

        //

        for (int i = 0; i < maps.length; i++) {//对于每一个map任务来说

            String hints[][] = fs.getFileCacheHints(splits[i].getFile(), splits[i].getStart(), splits[i].getLength());

            cachedHints.put(maps[i].getTIPId(), hints);//存储blockDataNode的对应关系

        }

        this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);//设置成Running状态

        tasksInited = true;//标识为已经初始化状态

    }

到这里为止,JobInProgress对象才真正初始化完毕!

 拓展阅读:

http://blog.csdn.net/strongerbit/article/details/7440111

© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 859
博文 900
码字总数 615641
作品 8
南京
架构师
MapReducer之Mapper中的Split切片原理(即影响MapTask数目的原因)

今天看到有朋友问到了MapTask的相关问题,我觉得有必要发个博客结合源码整个解析一下. 一、首先,我们看到Map运行的时候不同文件启动了不同数量的map任务,但是JOB中又没有设置map数量的配置,...

ProgrammerJiang
2016/03/18
379
2
自定义 hadoop MapReduce InputFormat 切分输入文件

在上一篇中,我们实现了按 cookieId 和 time 进行二次排序,现在又有新问题:假如我需要按 cookieId 和 cookieId&time 的组合进行分析呢?此时最好的办法是自定义 InputFormat,让 mapreduc...

大数据之路
2013/05/26
0
16
InputFormat加载数据

InputFormat是一个抽象类,其定义如下: InputFormat会对数据进行两方面的处理: 对输入数据进行逻辑切分,形成一个个split 针对每个split,新建一个RecorReader读取split里面的数据,形成一...

Jason_typ
06/13
0
0
Hadoop源代码分析(包mapreduce.lib.input)

接下来我们按照MapReduce过程中数据流动的顺序,来分解org.apache.hadoop.mapreduce.lib.*的相关内容,并介绍对应的基类的功能。首先是input部分,它实现了MapReduce的数据输入部分。类图如下...

超人学院
2015/05/22
0
0
Hadoop输入和输出的处理类(7)

hadoop输入的处理类 InputFormat InputFormat负责处理MR的输入部分。 作用: 1、验证作业的输入是否规范。 2、把输入文件切分成InputSplit。 3、提供RecordReader的实现类,把InputSplit读到...

肖鋭
2014/03/01
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

windbg调试C源码级驱动

联机方式不多说了。我博客里有,英文的。 windbg联机文档 https://docs.microsoft.com/zh-cn/windows-hardware/drivers/debugger/debug-universal-drivers---step-by-step-lab--echo-kernel......

simpower
35分钟前
0
0
redis快照和AOF简介

数据持久化到硬盘:一是快照(snapshotting),二是只追加文件(append-only file AOF) 快照 核心原理:redis某个时间内存内的所有数据写入硬盘 场景:redis快照内存里面的数据 1. 用户发送bgsav...

拐美人
35分钟前
0
0
这个七夕,送你一份程序员教科书级别的告白指南

给广大爱码士们的高能预警: 今天,就是七夕了…… (单身非作战人群请速速退场!) 时常有技术GG向个推君抱怨 经过网民多年的教育 以及技术人持之以恒的自黑 冲锋衣狂热分子·格子衫骨灰级粉...

个推
40分钟前
0
0
python爬虫日志(15)cookie详解

转载:原文地址 早期Web开发面临的最大问题之一是如何管理状态。服务器端没有办法知道两个请求是否来自于同一个浏览器。那时的办法是在请求的页面中插入一个token,并且在下一次请求中将这个...

茫羽行
41分钟前
0
0
qlv视频格式转换器

  腾讯视频中的视频影视资源有很多,小编经常在里面下载视频观看,应该也有很多朋友和小编一样吧,最近热播的电视剧也不少,如《香蜜沉沉烬如霜》、《夜天子》还有已经完结的《扶摇》,这么...

萤火的萤火
44分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部