文档章节

TextInputFormat之getSplits函数分析

强子大叔的码田
 强子大叔的码田
发布于 2014/12/31 10:54
字数 1151
阅读 117
收藏 0

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

Client提交任务后,JobInProgress类会调用设置的InputFormat类的getSplits函数。

下面分析下TextInputFormat格式的getSplits函数。

先调用了这个函数

/** List input directories.

   * Subclasses may override to, e.g., select only files matching a regular

   * expression.

   * Property mapred.input.subdir, if set, names a subdirectory that

   * is appended to all input dirs specified by job, and if the given fs

   * lists those too, each is added to the returned array of File.

   * @param fs

   * @param job

   * @return array of File objects, never zero length.

   * @throws IOException if zero items.

   */

  protected File[] listFiles(FileSystem fs, JobConf job)

    throws IOException {

    File[] dirs = job.getInputDirs();//获取多个输入路径

    String workDir = job.getWorkingDirectory();//"mapred.working.dir"

    String subdir = job.get("mapred.input.subdir");//获取子目录

    ArrayList result = new ArrayList();//构造一个空集合

    for (int i = 0; i < dirs.length; i++) {//对于每一个目录来说

      // if it is relative, make it absolute using the directory from the 

      // JobConf

      if (workDir != null && !fs.isAbsolute(dirs[i])) {//先加工成绝对路径

        dirs[i] = new File(workDir, dirs[i].toString());

      }

      File[] dir = fs.listFiles(dirs[i]);//获取下面的所有文件

      if (dir != null) {

        for (int j = 0; j < dir.length; j++) {//对于每一个文件来说

          File file = dir[j];//获取当前文件

          if (subdir != null) {//根据需要决定是否需要遍历子目录

            File[] subFiles = fs.listFiles(new File(file, subdir));

            if (subFiles != null) {

              for (int k = 0; k < subFiles.length; k++) {

                result.add(subFiles[k]);

              }

            }

          } else {

            result.add(file);//添加到集合

          }

        }

      }

    }

    if (result.size() == 0) {

      throw new IOException("No input directories specified in: "+job);

    }

    return (File[])result.toArray(new File[result.size()]);//返回所有文件列表

  }

 --------------

 

/** Splits files returned by {#listFiles(FileSystem,JobConf) when

   * they're too big.*/ 

  public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits)

    throws IOException {

    File[] files = listFiles(fs, job);//调用上面的函数获取所有文件

    for (int i = 0; i < files.length; i++) {      // check we have valid files

      File file = files[i];//拿到当前文件

      if (fs.isDirectory(file) || !fs.exists(file)) {//如果是一个目录或者是不存在的文件

        throw new IOException("Not a file: "+files[i]);//抛出异常

      }

    }

    long totalSize = 0; //开始计算,总大小初始化为0             // compute total size

    for (int i = 0; i < files.length; i++) {

      totalSize += fs.getLength(files[i]);//统计所有文件的大小

    }

    long bytesPerSplit = totalSize / numSplits;   //总的大小/map的个数

//限制1:不能超过块长度

    long fsBlockSize = fs.getBlockSize();//获取块大小

    if (bytesPerSplit > fsBlockSize) {            // no larger than fs blocks

      bytesPerSplit = fsBlockSize;//保证不超过块大小

    }

    

    //限制2:不能小于最低长度

    long configuredMinSplitSize = job.getLong("mapred.min.split.size", 0);//获取配置的最小分片大小

    if( configuredMinSplitSize < minSplitSize )

     configuredMinSplitSize = minSplitSize;//校正

    if (bytesPerSplit < configuredMinSplitSize) { // no smaller than min size

      bytesPerSplit = configuredMinSplitSize;//不能小于最低大小

    }

    long maxPerSplit = bytesPerSplit + (long)(bytesPerSplit*SPLIT_SLOP);//计算溢出10%的长度

    //LOG.info("bytesPerSplit = " + bytesPerSplit);

    //LOG.info("maxPerSplit = " + maxPerSplit);

    ArrayList splits = new ArrayList(numSplits);  //产生分片存储结构,注意ArrayList会自动扩容

    for (int i = 0; i < files.length; i++) {//开始遍历文件

      File file = files[i];//获取当前文件

      long length = fs.getLength(file);//获取长度[这里完全可以做缓存好吧,不需要增加网络IO开销]

      long bytesRemaining = length;//初始化为当前文件大小

      while (bytesRemaining >= maxPerSplit) {//还有剩余大小

        splits.add(new FileSplit(file, length-bytesRemaining, bytesPerSplit));//添加1Block长度

        bytesRemaining -= bytesPerSplit;//调整剩余长度

      }

      

      if (bytesRemaining != 0) {//将最后一个Block加入

        splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining));

      }

      //LOG.info( "Generating splits for " + i + "th file: " + file.getName() );

    }

    //LOG.info( "Total # of splits: " + splits.size() );

    return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);//返回结果

  }

那么具体JobTrackr又做了什么事情呢?

 

/**

     * Construct the splits, etc.  This is invoked from an async

     * thread so that split-computation doesn't block anyone.

     */

    public void initTasks() throws IOException {

        if (tasksInited) {//如果已经初始化过了,则直接返回

            return;

        }

        //

        // construct input splits

        //

        String jobid = profile.getJobId();//获取当前的job的随机ID

        String jobFile = profile.getJobFile();//获取配置文件在HDFS中的路径

        JobConf jd = new JobConf(localJobFile);//根据本地文件重新生成jobconf对象

        FileSystem fs = FileSystem.get(conf);//获取分布式文件系统

        String ifClassName = jd.get("mapred.input.format.class");//获取输入格式所属类名

        InputFormat inputFormat;

        if (ifClassName != null && localJarFile != null) {

          try {

            ClassLoader loader =

              new URLClassLoader(new URL[]{ localJarFile.toURL() });

            Class inputFormatClass = loader.loadClass(ifClassName);

            inputFormat = (InputFormat)inputFormatClass.newInstance();

          } catch (Exception e) {

            throw new IOException(e.toString());

          }

        } else {

          inputFormat = jd.getInputFormat();//默认为TextInputFormat.class

        }

        FileSplit[] splits = inputFormat.getSplits(fs, jd, numMapTasks);//设置分片结束

        //

        // sort splits by decreasing length, to reduce job's tail

        //

        Arrays.sort(splits, new Comparator() {//对 splits进行排序,长度逐一降低

            public int compare(Object a, Object b) {

                long diff =

                    ((FileSplit)b).getLength() - ((FileSplit)a).getLength();

                return diff==0 ? 0 : (diff > 0 ? 1 : -1);

            }

        });

        //

        // adjust number of map tasks to actual number of splits

        //

        this.numMapTasks = splits.length;//重新设置numMapTasks为真正的分片个数

        // create a map task for each split

        this.maps = new TaskInProgress[numMapTasks];//构造容器

        for (int i = 0; i < numMapTasks; i++) {//初始化

            maps[i] = new TaskInProgress(jobFile, splits[i], jobtrackerconfthis);

        }

        //

        // Create reduce tasks

        //

        this.reduces = new TaskInProgress[numReduceTasks];//构造容器

        for (int i = 0; i < numReduceTasks; i++) {//初始化

            reduces[i] = new TaskInProgress(jobFile, maps, i, jobtrackerconfthis);

        }

        // 

        // Obtain some tasktracker-cache information for the map task splits.

        //

        for (int i = 0; i < maps.length; i++) {//对于每一个map任务来说

            String hints[][] = fs.getFileCacheHints(splits[i].getFile(), splits[i].getStart(), splits[i].getLength());

            cachedHints.put(maps[i].getTIPId(), hints);//存储blockDataNode的对应关系

        }

        this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);//设置成Running状态

        tasksInited = true;//标识为已经初始化状态

    }

到这里为止,JobInProgress对象才真正初始化完毕!

 拓展阅读:

http://blog.csdn.net/strongerbit/article/details/7440111

© 著作权归作者所有

强子大叔的码田

强子大叔的码田

粉丝 923
博文 1630
码字总数 1283214
作品 9
南京
架构师
私信 提问
加载中

评论(0)

FileInputFormat 的实现之TextInputFormat

##说明TextInputFormat默认是按行切分记录record,本篇在于理解,对于同一条记录record,如果被切分在不同的split时是怎么处理的。首先getSplits是在逻辑上划分,并没有物理切分,也就是只是...

osc_g6d2xdbw
2019/07/30
0
0
MapReducer之Mapper中的Split切片原理(即影响MapTask数目的原因)

今天看到有朋友问到了MapTask的相关问题,我觉得有必要发个博客结合源码整个解析一下. 一、首先,我们看到Map运行的时候不同文件启动了不同数量的map任务,但是JOB中又没有设置map数量的配置,...

ProgrammerJiang
2016/03/18
428
2
自定义 hadoop MapReduce InputFormat 切分输入文件

在上一篇中,我们实现了按 cookieId 和 time 进行二次排序,现在又有新问题:假如我需要按 cookieId 和 cookieId&time 的组合进行分析呢?此时最好的办法是自定义 InputFormat,让 mapreduc...

大数据之路
2013/05/26
1.3W
16
InputFormat加载数据

InputFormat是一个抽象类,其定义如下: InputFormat会对数据进行两方面的处理: 对输入数据进行逻辑切分,形成一个个split 针对每个split,新建一个RecorReader读取split里面的数据,形成一...

Jason_typ
2018/06/13
29
0
Hadoop_MapReduce工作原理

Hadoop_MapReduce工作原理 六个阶段: Input 文件输入 Splitting 分片 Mapping Shuffling Reducing Final result mapper的输入数据为KV对形式,每一个KV对都会调用map()方法,输出数据也是K...

BLUcoding
04/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

SQL Server 并发控制 第三篇:隔离级别和行版本(2)

SQL Server 并发控制 第一篇:并发模式和事务 SQL Server 并发控制 第二篇:隔离级别和锁(1) SQL Server 并发控制 第三篇:隔离级别和行版本(2) 隔离级别定义事务处理数据读取操作的隔离...

osc_x5cptzgg
47分钟前
21
0
[Deprecation] Synchronous XMLHttpRequest on the main thread is deprecated because of its detrimen...

关于AJAX 同步请求(我没分了)_已解决_博问_博客园 https://q.cnblogs.com/q/70126/ jquery - JavaScript console.log causes error: "Synchronous XMLHttpRequest on the main thread is d......

osc_cxi2ewsx
48分钟前
29
0
python pip install指定国内源镜像

  有时候安装一些依赖包,网不好,直接超时,或者这个包就是死都下不下来的时候,可以指定国内源镜像。   pip install -i 国内镜像地址 包名   e.g. pip install -i http://mirrors.al...

osc_ym1l2qni
49分钟前
19
0
uniapp打包发版到linux服务器步骤----H5端

最近在写uni-app项目,项目打包部署到服务器后,搞了好一会一直打开是空白页,原来自己有几个地方疏忽了,现把步骤整理一下: 第1步:编辑配置 mainifest.json 文件 tip:运行的基础路径,我...

osc_2sv5yx4m
49分钟前
23
0
.NET Core微服务之基于Ocelot实现API网关服务(续)

Tip: 此篇已加入.NET Core微服务基础系列文章索引 一、负载均衡与请求缓存 1.1 负载均衡   为了验证负载均衡,这里我们配置了两个Consul Client节点,其中ClientService分别部署于这两个节...

osc_ho8dcqsx
51分钟前
27
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部