Hadoop 1.x的Shuffle源码分析之3

原创
2017/01/17 09:44
阅读数 4

shuffle有两种,一种是在内存存储数据,另一种是在本地文件存储数据,两者几乎一致。


以本地文件进行shuffle的过程为例:

mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
              compressedLength)

shuffleToDisk函数如下:

private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
                                      InputStream input,
                                      Path filename,
                                      long mapOutputLength) 
      throws IOException {
// Find out a suitable location for the output on local-filesystem
//在本地文件系统做输出,输出文件的path
        Path localFilename = 
          lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), 
                                         mapOutputLength, conf);
//创建Map输出
        MapOutput mapOutput = 
          new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), 
                        conf, localFileSys.makeQualified(localFilename), 
                        mapOutputLength);


        // Copy data to local-disk
//从input读取数据,写入到本地文件,这个input是http连接创建的流式输入
        OutputStream output = null;
        long bytesRead = 0;
        try {
          output = rfs.create(localFilename);
          
          byte[] buf = new byte[64 * 1024];
          int n = -1;
          try {
            n = input.read(buf, 0, buf.length);
          } catch (IOException ioe) {
            readError = true;
            throw ioe;
          }
          while (n > 0) {
            bytesRead += n;
            shuffleClientMetrics.inputBytes(n);
            output.write(buf, 0, n);

            // indicate we're making progress
            reporter.progress();
            try {
              n = input.read(buf, 0, buf.length);
            } catch (IOException ioe) {
              readError = true;
              throw ioe;
            }
          }

          LOG.info("Read " + bytesRead + " bytes from map-output for " +
              mapOutputLoc.getTaskAttemptId());
//正常取完数据,关闭。
          output.close();
          input.close();
        } catch (IOException ioe) {
          LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), 
                   ioe);

          // Discard the map-output
try {
            mapOutput.discard();
          } catch (IOException ignored) {
            LOG.info("Failed to discard map-output from " + 
                mapOutputLoc.getTaskAttemptId(), ignored);
          }
          mapOutput = null;

          // Close the streams
          IOUtils.cleanup(LOG, input, output);

          // Re-throw
          throw ioe;
        }

        // Sanity check
//检查读取是否正常
        if (bytesRead != mapOutputLength) {
          try {
            mapOutput.discard();
          } catch (Exception ioe) {
            // IGNORED because we are cleaning up
            LOG.info("Failed to discard map-output from " + 
                mapOutputLoc.getTaskAttemptId(), ioe);
          } catch (Throwable t) {
            String msg = getTaskID() + " : Failed in shuffle to disk :" 
                         + StringUtils.stringifyException(t);
            reportFatalError(getTaskID(), t, msg);
          }
          mapOutput = null;

          throw new IOException("Incomplete map output received for " +
                                mapOutputLoc.getTaskAttemptId() + " from " +
                                mapOutputLoc.getOutputLocation() + " (" + 
                                bytesRead + " instead of " + 
                                mapOutputLength + ")"
          );
        }

        return mapOutput;

      }

所以说,这一段shuffle的本质就是,从http的输入流读取数据,然后存放在本地文件系统的磁盘文件,写完之后,把taskId, jobid,本地文件名等等诸多参数放在MapOutput对象记录下来,然后返回一个MapOutput对象。


java的代码很直接,没有花花绕的东东,除了略有一点冗长,实在没什么缺点  :)

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部