文档章节

Hadoop 1.x的Shuffle源码分析之3

brian_2017
 brian_2017
发布于 2017/01/17 09:44
字数 449
阅读 1
收藏 0

shuffle有两种,一种是在内存存储数据,另一种是在本地文件存储数据,两者几乎一致。


以本地文件进行shuffle的过程为例:

mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
              compressedLength)

shuffleToDisk函数如下:

private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
                                      InputStream input,
                                      Path filename,
                                      long mapOutputLength) 
      throws IOException {
// Find out a suitable location for the output on local-filesystem
//在本地文件系统做输出,输出文件的path
        Path localFilename = 
          lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), 
                                         mapOutputLength, conf);
//创建Map输出
        MapOutput mapOutput = 
          new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), 
                        conf, localFileSys.makeQualified(localFilename), 
                        mapOutputLength);


        // Copy data to local-disk
//从input读取数据,写入到本地文件,这个input是http连接创建的流式输入
        OutputStream output = null;
        long bytesRead = 0;
        try {
          output = rfs.create(localFilename);
          
          byte[] buf = new byte[64 * 1024];
          int n = -1;
          try {
            n = input.read(buf, 0, buf.length);
          } catch (IOException ioe) {
            readError = true;
            throw ioe;
          }
          while (n > 0) {
            bytesRead += n;
            shuffleClientMetrics.inputBytes(n);
            output.write(buf, 0, n);

            // indicate we're making progress
            reporter.progress();
            try {
              n = input.read(buf, 0, buf.length);
            } catch (IOException ioe) {
              readError = true;
              throw ioe;
            }
          }

          LOG.info("Read " + bytesRead + " bytes from map-output for " +
              mapOutputLoc.getTaskAttemptId());
//正常取完数据,关闭。
          output.close();
          input.close();
        } catch (IOException ioe) {
          LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), 
                   ioe);

          // Discard the map-output
try {
            mapOutput.discard();
          } catch (IOException ignored) {
            LOG.info("Failed to discard map-output from " + 
                mapOutputLoc.getTaskAttemptId(), ignored);
          }
          mapOutput = null;

          // Close the streams
          IOUtils.cleanup(LOG, input, output);

          // Re-throw
          throw ioe;
        }

        // Sanity check
//检查读取是否正常
        if (bytesRead != mapOutputLength) {
          try {
            mapOutput.discard();
          } catch (Exception ioe) {
            // IGNORED because we are cleaning up
            LOG.info("Failed to discard map-output from " + 
                mapOutputLoc.getTaskAttemptId(), ioe);
          } catch (Throwable t) {
            String msg = getTaskID() + " : Failed in shuffle to disk :" 
                         + StringUtils.stringifyException(t);
            reportFatalError(getTaskID(), t, msg);
          }
          mapOutput = null;

          throw new IOException("Incomplete map output received for " +
                                mapOutputLoc.getTaskAttemptId() + " from " +
                                mapOutputLoc.getOutputLocation() + " (" + 
                                bytesRead + " instead of " + 
                                mapOutputLength + ")"
          );
        }

        return mapOutput;

      }

所以说,这一段shuffle的本质就是,从http的输入流读取数据,然后存放在本地文件系统的磁盘文件,写完之后,把taskId, jobid,本地文件名等等诸多参数放在MapOutput对象记录下来,然后返回一个MapOutput对象。


java的代码很直接,没有花花绕的东东,除了略有一点冗长,实在没什么缺点  :)

© 著作权归作者所有

brian_2017
粉丝 3
博文 61
码字总数 145216
作品 0
私信 提问
面试问题

358、你们的集群规模? 开发集群:10台(8台可用)8核cpu 359、你们的数据是用什么导入到数据库的?导入到什么数据库? 处理之前的导入:通过hadoop命令导入到hdfs文件系统 处理完成之后的导...

HIVE
2016/07/05
1K
2
Apache Hadoop 2.2.0 稳定版发布

Apache Hadoop 2.2.0 稳定版发布了,建议用户升级。该版本更加稳定,同时在 API 和协议上兼容老的版本。 与 Hadoop 1.x 比较,该版本显著的改进包括: YARN - A general purpose resource ma...

潞邊壹仦貓
2013/10/16
13K
22
Apache Kylin 2.5.0 发布,开源分布式分析引擎

Apache Kylin 2.5.0 已发布,这是 2.4 之后的又一个主要版本,包含 96 项修复以及各种改进,像是支持 Hadoop 3.0,支持将 MySQL 作为 Kylin 元数据存储,支持 HBase 2.0 等等,具体内容请查看...

王练
2018/09/20
1K
1
Yarn在Shuffle阶段内存不足问题(error in shuffle in fetcher)

最近在使用MR跑一个任务的时候shuffle阶段出现OOM,这个问题之前从来没有遇到过,上网找了一下,发现网友也遇到过想似的问题,以下是转载的该问题的解决方法: 原文地址:http://blog.csdn....

纳兰清风
2016/12/15
859
0
零基础学习hadoop到上手工作线路指导(中级篇)

此篇是在零基础学习hadoop到上手工作线路指导(初级篇)的基础,一个继续总结。 五一假期:在写点内容,也算是总结。上面我们会了基本的编程,我们需要对hadoop有一个更深的理解: hadoop分为...

一枚Sir
2014/08/07
138
0

没有更多内容

加载失败,请刷新页面

加载更多

好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
今天
6
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
今天
6
0
【技术分享】TestFlight测试的流程文档

上架基本需求资料 1、苹果开发者账号(如还没账号先申请-苹果开发者账号申请教程) 2、开发好的APP 通过本篇教程,可以学习到ios证书申请和打包ipa上传到appstoreconnect.apple.com进行TestF...

qtb999
今天
10
0
再见 Spring Boot 1.X,Spring Boot 2.X 走向舞台中心

2019年8月6日,Spring 官方在其博客宣布,Spring Boot 1.x 停止维护,Spring Boot 1.x 生命周期正式结束。 其实早在2018年7月30号,Spring 官方就已经在博客进行过预告,Spring Boot 1.X 将维...

Java技术剑
今天
17
0
浅谈java过滤器Filter

一、简介 Servlet中的过滤器Filter是实现了javax.servlet.Filter接口的服务器端程序,主要的用途是过滤字符编码、做一些业务逻辑判断如是否有权限访问页面等。其工作原理是,只要你在web.xml...

青衣霓裳
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部