文档章节

体会 Hadoop 数据节点升级机制中的设计之美

blacklovebear
 blacklovebear
发布于 2017/02/06 11:17
字数 2462
阅读 110
收藏 7

一、前言

Hadoop数据节点的升级机制,深入了解下发现设计非常的优美,在此分享给大家。升级机制最重要的部分就是升级过程中的故障恢复。我们来看下它是怎么被解决的。

关键点:

  1. 升级过程生成临时目录,标识中间状态
  2. 启动时分析出当前数据节点的存储空间状态
  3. 根据存储空间状态执行相应的操作使数据节点恢复正常

接下来我们按关键点分析,看看它到底是怎么实现的。其中涉及到不少 Hadoop 源代码的分析,关键代码我会贴出来,如果大家想看完整的可以自行下载浏览。

 

二、数据节点的升级

数据节点的升级包含三个非常重要的步骤:​​​​​​升级、升级提交和升级回滚。我们先看下他们之间的联系。

  • 升级

下面代码在 Hadoop 1.0.0 版本下的 DataStorage.java 文件中。文件路径为:hadoop\src\hdfs\org\apache\hadoop\hdfs\server\datanode。

void doUpgrade(StorageDirectory sd,
                 NamespaceInfo nsInfo
                 ) throws IOException {
    ...
    // enable hardlink stats via hardLink object instance
    HardLink hardLink = new HardLink();
    
    File curDir = sd.getCurrentDir();
    File prevDir = sd.getPreviousDir();
    assert curDir.exists() : "Current directory must exist.";
    // delete previous dir before upgrading
    if (prevDir.exists())
      deleteDir(prevDir);
    File tmpDir = sd.getPreviousTmp();
    assert !tmpDir.exists() : "previous.tmp directory must not exist.";
    // rename current to tmp
    rename(curDir, tmpDir);
    // hardlink blocks
    linkBlocks(tmpDir, curDir, this.getLayoutVersion(), hardLink);
    // write version file
    this.layoutVersion = FSConstants.LAYOUT_VERSION;
    assert this.namespaceID == nsInfo.getNamespaceID() :
      "Data-node and name-node layout versions must be the same.";
    this.cTime = nsInfo.getCTime();
    sd.write();
    // rename tmp to previous
    rename(tmpDir, prevDir);
    ...
  }

上面的代码并不是特别难。我们看最关键的几个步骤:

  1. 将数据节点当前数据的存储目录(current) 改名为临时目录(previous.tmp)
  2. 生成新的 current 目录,并将当前目录下所有文件使用硬链接的模式链接到 previous.tmp 目录中的文件
  3. 将 VERSION 文件写入到 current 目录
  4. 将临时目录(previous.tmp)改名为 previous 目录

通过以上几个步骤我们发现在升级过程中做的操作并不复杂,我们关键要注意临时目录(previous.tmp),因为它是后续判断升级是否异常的关键依据。

  • 升级提交

我们再来看下升级提交的代码,代码仍然在 DataStorage.java 文件中。

void doFinalize(StorageDirectory sd) throws IOException {
    File prevDir = sd.getPreviousDir();
    if (!prevDir.exists())
      return; // already discarded
    final String dataDirPath = sd.getRoot().getCanonicalPath();
    ...
    assert sd.getCurrentDir().exists() : "Current directory must exist.";
    final File tmpDir = sd.getFinalizedTmp();
    // rename previous to tmp
    rename(prevDir, tmpDir);

    // delete tmp dir in a separate thread
    new Daemon(new Runnable() {
        public void run() {
          try {
            deleteDir(tmpDir);
          } catch(IOException ex) {
            LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
          }
          LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
        }
        public String toString() { return "Finalize " + dataDirPath; }
      }).start();
  }

通过代码发现,升级提交流程非常的简单,只有关键的两步:

  1. 将 previous 目录重命名为 临时目录(finalized.tmp)
  2. 将临时目录(finalized.tmp)利用单独线程删除

通过上面我们发现即使是简单的将一个目录删除 Hadoop 也分了两步,并且还需要一个临时目录作为中间状态

  • 升级回滚

最后我们再来看下升级回滚的代码,同样在 DataStorage.java 文件中。

void doRollback( StorageDirectory sd,
                   NamespaceInfo nsInfo
                   ) throws IOException {
    File prevDir = sd.getPreviousDir();
    // regular startup if previous dir does not exist
    if (!prevDir.exists())
      return;
    DataStorage prevInfo = new DataStorage();
    StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.getRoot());
    prevSD.read(prevSD.getPreviousVersionFile());

    // We allow rollback to a state, which is either consistent with
    // the namespace state or can be further upgraded to it.
    if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
          && prevInfo.getCTime() <= nsInfo.getCTime()))  // cannot rollback
      throw new InconsistentFSStateException(prevSD.getRoot(),
                                             "Cannot rollback to a newer state.\nDatanode previous state: LV = " 
                                             + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime() 
                                             + " is newer than the namespace state: LV = "
                                             + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
    ...
    File tmpDir = sd.getRemovedTmp();
    assert !tmpDir.exists() : "removed.tmp directory must not exist.";
    // rename current to tmp
    File curDir = sd.getCurrentDir();
    assert curDir.exists() : "Current directory must exist.";
    rename(curDir, tmpDir);
    // rename previous to current
    rename(prevDir, curDir);
    // delete tmp dir
    deleteDir(tmpDir);
    LOG.info("Rollback of " + sd.getRoot() + " is complete.");
  }

同样我们来分析下上面的代码,找出其中的关键步骤:

  1. 根据版本信息判断是否可以进行回滚操作
  2. 将当前目录(current)重命名为临时目录(removed.tmp)
  3. 将之前一个版本的目录(previous)重命名为当前目录(current)
  4. 删除临时目录(removed.tmp)

我们发现回滚操作也利用了临时文件夹(removed.tmp)。

通过上面的分析我们已经清晰的知道数据节点升级的三个关键操作。接下来我们继续了解当在升级过程中发生异常时,数据节点又是如何处理的。

 

三、分析存储空间状态

通过上面的分析我们发现每个步骤都会生成像 "previous.tmp、removed.tmp"这样的临时目录,它们的作用是什么呢?

我们发现在执行升级、回滚等操作时都需要进行一定的操作,如果在做这些操作的时候设备出现故障(如断电)那么存储空间就会处于一个中间状态。引入上述的这些临时目录就能判断异常发生在什么操作的什么状态,这样就会方便后续的故障恢复。

数据节点在启动的时候会对当前节点的存储空间进行分析,得出存储空间的状态,然后根据不同的状态执行不同的操作。如果发现分析出的状态不是正常状态,存在中间状态或异常状态(例如:发现升级过程中的临时目录),则启动 recovery 进行恢复。

我们看下这部分的关键代码,这部分代码仍然在 DataStorage.java 文件中。

void recoverTransitionRead(NamespaceInfo nsInfo,
                             Collection<File> dataDirs,
                             StartupOption startOpt
                             ) throws IOException {
    ...
    for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
      File dataDir = it.next();
      StorageDirectory sd = new StorageDirectory(dataDir);
      StorageState curState;
      try {
        curState = sd.analyzeStorage(startOpt);      // 分析当前存储状态
        // sd is locked but not opened
        switch(curState) {
        case NORMAL:
          break;
        case NON_EXISTENT:
          // ignore this storage
          LOG.info("Storage directory " + dataDir + " does not exist.");
          it.remove();
          continue;
        case NOT_FORMATTED: // format
          LOG.info("Storage directory " + dataDir + " is not formatted.");
          LOG.info("Formatting ...");
          format(sd, nsInfo);
          break;
        default:  // recovery part is common
          sd.doRecover(curState);                    // 发现中间或异常状态,进行恢复
        }
      } catch (IOException ioe) {
    ...
  }

明白了这个流程,接下来我们就深入了解下 analyzeStorage 的具体内容和总共有多少种状态。

  • 存储空间状态

通过代码我们发现存储空间的状态总共有以下几种:

 public enum StorageState {
    NON_EXISTENT,
    NOT_FORMATTED,
    COMPLETE_UPGRADE,
    RECOVER_UPGRADE,
    COMPLETE_FINALIZE,
    COMPLETE_ROLLBACK,
    RECOVER_ROLLBACK,
    COMPLETE_CHECKPOINT,
    RECOVER_CHECKPOINT,
    NORMAL;
  }
  ...
  // Startup options
  static public enum StartupOption{
    FORMAT  ("-format"),   //格式化系统
    REGULAR ("-regular"),  //正常启动HDFS
    UPGRADE ("-upgrade"),  //升级系统
    ROLLBACK("-rollback"), //从升级中回滚到前一个版本
    FINALIZE("-finalize"), //提交一次升级
    IMPORT  ("-importCheckpoint");// 从名字节点的一个检查点恢复
    ...
  }
  

我们发现上面提到的所有存储空间状态和当前施加在存储空间的动作相关。其中只有部分是和升级相关的。

  • 分析出当前存储空间状态

接下来我们好好研究下这些状态是怎么得到的?

要回答上面我们提出的问题,就必须好看下状态分析函数:analyzeStorage。这函数在 Storage.java 文件中。路径为:hadoop\src\hdfs\org\apache\hadoop\hdfs\server\common。

public StorageState analyzeStorage(StartupOption startOpt) throws IOException {
      ...
      // check whether current directory is valid
      File versionFile = getVersionFile();
      boolean hasCurrent = versionFile.exists();

      // check which directories exist
      boolean hasPrevious = getPreviousDir().exists();
      boolean hasPreviousTmp = getPreviousTmp().exists();
      boolean hasRemovedTmp = getRemovedTmp().exists();
      boolean hasFinalizedTmp = getFinalizedTmp().exists();
      boolean hasCheckpointTmp = getLastCheckpointTmp().exists();

      if (!(hasPreviousTmp || hasRemovedTmp
          || hasFinalizedTmp || hasCheckpointTmp)) {
        // no temp dirs - no recovery
        if (hasCurrent)
          return StorageState.NORMAL;
        if (hasPrevious)
          throw new InconsistentFSStateException(root,
                              "version file in current directory is missing.");
        return StorageState.NOT_FORMATTED;
      }

      if ((hasPreviousTmp?1:0) + (hasRemovedTmp?1:0)
          + (hasFinalizedTmp?1:0) + (hasCheckpointTmp?1:0) > 1)
        // more than one temp dirs
        throw new InconsistentFSStateException(root,
                                               "too many temporary directories.");

      // # of temp dirs == 1 should either recover or complete a transition
      if (hasCheckpointTmp) {
        return hasCurrent ? StorageState.COMPLETE_CHECKPOINT
                          : StorageState.RECOVER_CHECKPOINT;
      }

      if (hasFinalizedTmp) {
        if (hasPrevious)
          throw new InconsistentFSStateException(root,
                                                 STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_FINALIZED
                                                 + "cannot exist together.");
        return StorageState.COMPLETE_FINALIZE;
      }

      if (hasPreviousTmp) {
        if (hasPrevious)
          throw new InconsistentFSStateException(root,
                                                 STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_PREVIOUS
                                                 + " cannot exist together.");
        if (hasCurrent)
          return StorageState.COMPLETE_UPGRADE;
        return StorageState.RECOVER_UPGRADE;
      }
      
      assert hasRemovedTmp : "hasRemovedTmp must be true";
      if (!(hasCurrent ^ hasPrevious))
        throw new InconsistentFSStateException(root,
                                               "one and only one directory " + STORAGE_DIR_CURRENT 
                                               + " or " + STORAGE_DIR_PREVIOUS 
                                               + " must be present when " + STORAGE_TMP_REMOVED
                                               + " exists.");
      if (hasCurrent)
        return StorageState.COMPLETE_ROLLBACK;
      return StorageState.RECOVER_ROLLBACK;
    }

通过上面的代码我们就可以清晰的知道每个存储空间状态是如何得到的。这里我们重点解释下 COMPLETE_UPGRADE 和 RECOVER_UPGRADE 这两个存储空间状态。

  1. COMPLETE_UPGRADE:“previous.tmp”目录存在,同时“current”目录下的“VERSION”文件存在,可完成升级
  2. RECOVER_UPGRADE:“previous.tmp”目录存在,“current/VERSION”文件不存在,存储空间应该从升级过程中恢复

其他的状态就不做一一分析,大家根据代码应该很容易就可以得出各个状态成立的条件。

 

四、根据存储空间状态进行恢复

得到了分析后的存储空间状态,我们就可以根据不同的状态将存储系统恢复正常。我们来看下这部分代码,这部分代码 仍然在 Storage.java 文件中。

public void doRecover(StorageState curState) throws IOException {
      File curDir = getCurrentDir();
      String rootPath = root.getCanonicalPath();
      switch(curState) {
      case COMPLETE_UPGRADE:  // mv previous.tmp -> previous
        LOG.info("Completing previous upgrade for storage directory " 
                 + rootPath + ".");
        rename(getPreviousTmp(), getPreviousDir());
        return;
      case RECOVER_UPGRADE:   // mv previous.tmp -> current
        LOG.info("Recovering storage directory " + rootPath
                 + " from previous upgrade.");
        if (curDir.exists())
          deleteDir(curDir);
        rename(getPreviousTmp(), curDir);
        return;
      case COMPLETE_ROLLBACK: // rm removed.tmp
        LOG.info("Completing previous rollback for storage directory "
                 + rootPath + ".");
        deleteDir(getRemovedTmp());
        return;
      case RECOVER_ROLLBACK:  // mv removed.tmp -> current
        LOG.info("Recovering storage directory " + rootPath
                 + " from previous rollback.");
        rename(getRemovedTmp(), curDir);
        return;
      case COMPLETE_FINALIZE: // rm finalized.tmp
        LOG.info("Completing previous finalize for storage directory "
                 + rootPath + ".");
        deleteDir(getFinalizedTmp());
        return;
      case COMPLETE_CHECKPOINT: // mv lastcheckpoint.tmp -> previous.checkpoint
        LOG.info("Completing previous checkpoint for storage directory " 
                 + rootPath + ".");
        File prevCkptDir = getPreviousCheckpoint();
        if (prevCkptDir.exists())
          deleteDir(prevCkptDir);
        rename(getLastCheckpointTmp(), prevCkptDir);
        return;
      case RECOVER_CHECKPOINT:  // mv lastcheckpoint.tmp -> current
        LOG.info("Recovering storage directory " + rootPath
                 + " from failed checkpoint.");
        if (curDir.exists())
          deleteDir(curDir);
        rename(getLastCheckpointTmp(), curDir);
        return;
      default:
        throw new IOException("Unexpected FS state: " + curState);
      }
    }

这部分代码非常容易理解,就是根据不同的状态执行不同的操作。我们仍然只分析两个关键的状态 COMPLETE_UPGRADE 和 RECOVER_UPGRADE 对应的操作。

  1. COMPLETE_UPGRADE:将临时目录“previous.tmp”改名为“previous”
  2. RECOVER_UPGRADE:将临时目录“previous.tmp”改名为“current”

这两个状态对应的操作也很好理解,分别是完成未完成的升级操作和回退未完成的升级操作。

五、总结

以上升级机制的关键就在于在做各种升级操作的时候很好的利用临时文件夹,方便后续分析当前存储空间状态和对异常情况进行恢复。

同时也很好的利用了存储空间的状态机机制,这样很好的降低了各种状态之间的耦合性。

关键点:

  • 临时文件夹
  • 存储空间状态机

© 著作权归作者所有

blacklovebear

blacklovebear

粉丝 6
博文 29
码字总数 8955
作品 0
深圳
私信 提问
hadoop hdfs文件系统分析,名字节点,数据节点之间的交互

看过的一些文章: hadoop页面访问 8088端口http://192.168.8.71:8088 hdfs页面 http://192.168.8.70:50070/dfshealth.html#tab-overview 看过的文章地址:http://dongxicheng.org/mapreduce/......

writeademo
2016/11/23
84
0
【翻译笔记】Hadoop分布式文件系统

摘要 Hadoop分布式文件系统(HDFS)设计用来可靠的存储超大数据集,同时以高速带宽将数据集传输给用户应用。 在一个超大集群中,数以千计的服务器直接接触存储器和执行用户应用任务。通过许多...

realsa
2014/08/18
229
0
hadoop2.2原理: 序列化浅析

序列化是指将一个对象编码成字节流,之后从字节流中重构对象; 为什么需要序列化? 答:用序列化接口可以将对象实例从存储到本地文件或者传送到网络的另一端的节点上; 序列化过程: 序列化的...

蓝狐乐队
2014/04/22
139
0
第一章《数据之路,始于Hadoop》第一节 Hadoop 基础介绍

第一节 hadoop介绍 大数据时代最重要的论文是Google三篇,分布讲述了GFS、MapReduce和BigTable。而依照前两篇论文设计的开源项目Hadoop,则迅速风靡,成为了大数据行业的事实标准。Google的论...

王二铁
2016/04/21
199
0
大数据Hadoop:一把杀鸡用的牛刀

Hadoop是个庞大的重型解决方案,它的设计目标本来就是大规模甚至超大规模的集群,面对的是上百甚至上千个节点,这样就会带来两个问题: 自动化管理管任务分配机制:这样规模的集群,显然不大...

风火数据
2018/06/05
0
0

没有更多内容

加载失败,请刷新页面

加载更多

如何编写高质量的 JS 函数(1) -- 敲山震虎篇

本文首发于 vivo互联网技术 微信公众号 链接:https://mp.weixin.qq.com/s/7lCK9cHmunvYlbm7Xi7JxQ 作者:杨昆 一千个读者,有一千个哈姆雷特。 此系列文章将会从函数的执行机制、鲁棒性、函...

vivo互联网技术
44分钟前
5
0
学会这5个Excel技巧,让你拒绝加班

在网上,随处都可以看到Excel技巧,估计已看腻了吧?但下面5个Excel技巧会让你相见恨晚。关键的是它们个个还很实用 图一 技巧1:快速删除边框 有时当我们处理数据需要去掉边框,按Ctrl+Shif...

干货趣分享
今天
11
0
JS基础-该如何理解原型、原型链?

JS的原型、原型链一直是比较难理解的内容,不少初学者甚至有一定经验的老鸟都不一定能完全说清楚,更多的"很可能"是一知半解,而这部分内容又是JS的核心内容,想要技术进阶的话肯定不能对这个...

OBKoro1
今天
9
0
高防CDN的出现是为了解决网站的哪些问题?

高防CDN是为了更好的服务网络而出现的,是通过高防DNS来实现的。高防CDN是通过智能化的系统判断来路,再反馈给用户,可以减轻用户使用过程的复杂程度。通过智能DNS解析,能让网站访问者连接到...

云漫网络Ruan
今天
15
0
OSChina 周一乱弹 —— 熟悉的味道,难道这就是恋爱的感觉

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @xiaoshiyue :好久没分享歌了分享张碧晨的单曲《今后我与自己流浪》 《今后我与自己流浪》- 张碧晨 手机党少年们想听歌,请使劲儿戳(这里)...

小小编辑
今天
3.3K
25

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部