文档章节

Hadoop 1.x的Shuffle源码分析之2

brian_2017
 brian_2017
发布于 2017/01/17 09:44
字数 437
阅读 7
收藏 0

ReduceTask类的内嵌类ReduceCopier的内嵌类MapOutputCopier的函数copyOutput是Shuffle里最重要的一环,它以http的方式,从远程主机取数据:创建临时文件名,然后用http读数据,再保存到内存文件系统或者本地文件系统。它读取远程文件的函数是getMapOutput。


getMapOutput函数如下:

private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, 
                                     Path filename, int reduce)
      throws IOException, InterruptedException {
        //建立http链接
        URL url = mapOutputLoc.getOutputLocation();
        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
        //创建输入流
        InputStream input = setupSecureConnection(mapOutputLoc, connection);

        //检查连接姿势是否正确
        int rc = connection.getResponseCode();
        if (rc != HttpURLConnection.HTTP_OK) {
          throw new IOException(
              "Got invalid response code " + rc + " from " + url +
              ": " + connection.getResponseMessage());
        }
 
        //从http链接获取mapId
        TaskAttemptID mapId = null;
        try {
          mapId =
            TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
        } catch (IllegalArgumentException ia) {
          LOG.warn("Invalid map id ", ia);
          return null;
        }
</pre><pre code_snippet_id="665348" snippet_file_name="blog_20150513_3_7696491" name="code" class="java">        //检查mapId是否一致
        TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
        if (!mapId.equals(expectedMapId)) {
          LOG.warn("data from wrong map:" + mapId +
              " arrived to reduce task " + reduce +
              ", where as expected map output should be from " + expectedMapId);
          return null;
        }
//如果数据有压缩,要获取压缩长度
        long decompressedLength = 
          Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));  
        long compressedLength = 
          Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));

        if (compressedLength < 0 || decompressedLength < 0) {
          LOG.warn(getName() + " invalid lengths in map output header: id: " +
              mapId + " compressed len: " + compressedLength +
              ", decompressed len: " + decompressedLength);
          return null;
        }
int forReduce =
          (int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));
        
        if (forReduce != reduce) {
          LOG.warn("data for the wrong reduce: " + forReduce +
              " with compressed len: " + compressedLength +
              ", decompressed len: " + decompressedLength +
              " arrived to reduce task " + reduce);
          return null;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("header: " + mapId + ", compressed len: " + compressedLength +
              ", decompressed len: " + decompressedLength);
        }

        //We will put a file in memory if it meets certain criteria:
        //1. The size of the (decompressed) file should be less than 25% of 
        //    the total inmem fs
        //2. There is space available in the inmem fs
        
        // Check if this map-output can be saved in-memory
        boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); 

        // Shuffle
        MapOutput mapOutput = null;
        if (shuffleInMemory) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
                compressedLength + " raw bytes) " + 
                "into RAM from " + mapOutputLoc.getTaskAttemptId());
          }
          //在内存做shuffle处理
          mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
                                      (int)decompressedLength,
                                      (int)compressedLength);
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
                compressedLength + " raw bytes) " + 
                "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
          }
          //在本地做shuffle处理
          mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
              compressedLength);
        }
        mapOutput.decompressedSize = decompressedLength;    
        return mapOutput;
      }


© 著作权归作者所有

brian_2017
粉丝 3
博文 61
码字总数 145216
作品 0
私信 提问
面试问题

358、你们的集群规模? 开发集群:10台(8台可用)8核cpu 359、你们的数据是用什么导入到数据库的?导入到什么数据库? 处理之前的导入:通过hadoop命令导入到hdfs文件系统 处理完成之后的导...

HIVE
2016/07/05
1K
2
Apache Hadoop 2.2.0 稳定版发布

Apache Hadoop 2.2.0 稳定版发布了,建议用户升级。该版本更加稳定,同时在 API 和协议上兼容老的版本。 与 Hadoop 1.x 比较,该版本显著的改进包括: YARN - A general purpose resource ma...

潞邊壹仦貓
2013/10/16
13K
22
Yarn在Shuffle阶段内存不足问题(error in shuffle in fetcher)

最近在使用MR跑一个任务的时候shuffle阶段出现OOM,这个问题之前从来没有遇到过,上网找了一下,发现网友也遇到过想似的问题,以下是转载的该问题的解决方法: 原文地址:http://blog.csdn....

纳兰清风
2016/12/15
849
0
一篇文章了解 Spark Shuffle 内存使用

在使用 Spark 进行计算时,我们经常会碰到作业 (Job) Out Of Memory(OOM) 的情况,而且很大一部分情况是发生在 Shuffle 阶段。那么在 Spark Shuffle 中具体是哪些地方会使用比较多的内存而有...

Spark
03/17
0
0
零基础学习hadoop到上手工作线路指导(中级篇)

此篇是在零基础学习hadoop到上手工作线路指导(初级篇)的基础,一个继续总结。 五一假期:在写点内容,也算是总结。上面我们会了基本的编程,我们需要对hadoop有一个更深的理解: hadoop分为...

一枚Sir
2014/08/07
137
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
446
10
MongoDB系列-- SpringBoot 中对 MongoDB 的 基本操作

SpringBoot 中对 MongoDB 的 基本操作 Database 库的创建 首先 在MongoDB 操作客户端 Robo 3T 中 创建数据库: 增加用户User: 创建 Collections 集合(类似mysql 中的 表): 后面我们大部分都...

TcWong
今天
4
0
spring cloud

一、从面试题入手 1.1、什么事微服务 1.2、微服务之间如何独立通讯的 1.3、springCloud和Dubbo有哪些区别 1.通信机制:DUbbo基于RPC远程过程调用;微服务cloud基于http restFUL API 1.4、spr...

榴莲黑芝麻糊
今天
3
0
Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
7
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部