体会 Hadoop 数据节点升级机制中的设计之美

原创
2017/02/06 11:17
阅读数 226

一、前言

Hadoop数据节点的升级机制,深入了解下发现设计非常的优美,在此分享给大家。升级机制最重要的部分就是升级过程中的故障恢复。我们来看下它是怎么被解决的。

关键点:

  1. 升级过程生成临时目录,标识中间状态
  2. 启动时分析出当前数据节点的存储空间状态
  3. 根据存储空间状态执行相应的操作使数据节点恢复正常

接下来我们按关键点分析,看看它到底是怎么实现的。其中涉及到不少 Hadoop 源代码的分析,关键代码我会贴出来,如果大家想看完整的可以自行下载浏览。

 

二、数据节点的升级

数据节点的升级包含三个非常重要的步骤:​​​​​​升级、升级提交和升级回滚。我们先看下他们之间的联系。

  • 升级

下面代码在 Hadoop 1.0.0 版本下的 DataStorage.java 文件中。文件路径为:hadoop\src\hdfs\org\apache\hadoop\hdfs\server\datanode。

void doUpgrade(StorageDirectory sd,
                 NamespaceInfo nsInfo
                 ) throws IOException {
    ...
    // enable hardlink stats via hardLink object instance
    HardLink hardLink = new HardLink();
    
    File curDir = sd.getCurrentDir();
    File prevDir = sd.getPreviousDir();
    assert curDir.exists() : "Current directory must exist.";
    // delete previous dir before upgrading
    if (prevDir.exists())
      deleteDir(prevDir);
    File tmpDir = sd.getPreviousTmp();
    assert !tmpDir.exists() : "previous.tmp directory must not exist.";
    // rename current to tmp
    rename(curDir, tmpDir);
    // hardlink blocks
    linkBlocks(tmpDir, curDir, this.getLayoutVersion(), hardLink);
    // write version file
    this.layoutVersion = FSConstants.LAYOUT_VERSION;
    assert this.namespaceID == nsInfo.getNamespaceID() :
      "Data-node and name-node layout versions must be the same.";
    this.cTime = nsInfo.getCTime();
    sd.write();
    // rename tmp to previous
    rename(tmpDir, prevDir);
    ...
  }

上面的代码并不是特别难。我们看最关键的几个步骤:

  1. 将数据节点当前数据的存储目录(current) 改名为临时目录(previous.tmp)
  2. 生成新的 current 目录,并将当前目录下所有文件使用硬链接的模式链接到 previous.tmp 目录中的文件
  3. 将 VERSION 文件写入到 current 目录
  4. 将临时目录(previous.tmp)改名为 previous 目录

通过以上几个步骤我们发现在升级过程中做的操作并不复杂,我们关键要注意临时目录(previous.tmp),因为它是后续判断升级是否异常的关键依据。

  • 升级提交

我们再来看下升级提交的代码,代码仍然在 DataStorage.java 文件中。

void doFinalize(StorageDirectory sd) throws IOException {
    File prevDir = sd.getPreviousDir();
    if (!prevDir.exists())
      return; // already discarded
    final String dataDirPath = sd.getRoot().getCanonicalPath();
    ...
    assert sd.getCurrentDir().exists() : "Current directory must exist.";
    final File tmpDir = sd.getFinalizedTmp();
    // rename previous to tmp
    rename(prevDir, tmpDir);

    // delete tmp dir in a separate thread
    new Daemon(new Runnable() {
        public void run() {
          try {
            deleteDir(tmpDir);
          } catch(IOException ex) {
            LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
          }
          LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
        }
        public String toString() { return "Finalize " + dataDirPath; }
      }).start();
  }

通过代码发现,升级提交流程非常的简单,只有关键的两步:

  1. 将 previous 目录重命名为 临时目录(finalized.tmp)
  2. 将临时目录(finalized.tmp)利用单独线程删除

通过上面我们发现即使是简单的将一个目录删除 Hadoop 也分了两步,并且还需要一个临时目录作为中间状态

  • 升级回滚

最后我们再来看下升级回滚的代码,同样在 DataStorage.java 文件中。

void doRollback( StorageDirectory sd,
                   NamespaceInfo nsInfo
                   ) throws IOException {
    File prevDir = sd.getPreviousDir();
    // regular startup if previous dir does not exist
    if (!prevDir.exists())
      return;
    DataStorage prevInfo = new DataStorage();
    StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.getRoot());
    prevSD.read(prevSD.getPreviousVersionFile());

    // We allow rollback to a state, which is either consistent with
    // the namespace state or can be further upgraded to it.
    if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
          && prevInfo.getCTime() <= nsInfo.getCTime()))  // cannot rollback
      throw new InconsistentFSStateException(prevSD.getRoot(),
                                             "Cannot rollback to a newer state.\nDatanode previous state: LV = " 
                                             + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime() 
                                             + " is newer than the namespace state: LV = "
                                             + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
    ...
    File tmpDir = sd.getRemovedTmp();
    assert !tmpDir.exists() : "removed.tmp directory must not exist.";
    // rename current to tmp
    File curDir = sd.getCurrentDir();
    assert curDir.exists() : "Current directory must exist.";
    rename(curDir, tmpDir);
    // rename previous to current
    rename(prevDir, curDir);
    // delete tmp dir
    deleteDir(tmpDir);
    LOG.info("Rollback of " + sd.getRoot() + " is complete.");
  }

同样我们来分析下上面的代码,找出其中的关键步骤:

  1. 根据版本信息判断是否可以进行回滚操作
  2. 将当前目录(current)重命名为临时目录(removed.tmp)
  3. 将之前一个版本的目录(previous)重命名为当前目录(current)
  4. 删除临时目录(removed.tmp)

我们发现回滚操作也利用了临时文件夹(removed.tmp)。

通过上面的分析我们已经清晰的知道数据节点升级的三个关键操作。接下来我们继续了解当在升级过程中发生异常时,数据节点又是如何处理的。

 

三、分析存储空间状态

通过上面的分析我们发现每个步骤都会生成像 "previous.tmp、removed.tmp"这样的临时目录,它们的作用是什么呢?

我们发现在执行升级、回滚等操作时都需要进行一定的操作,如果在做这些操作的时候设备出现故障(如断电)那么存储空间就会处于一个中间状态。引入上述的这些临时目录就能判断异常发生在什么操作的什么状态,这样就会方便后续的故障恢复。

数据节点在启动的时候会对当前节点的存储空间进行分析,得出存储空间的状态,然后根据不同的状态执行不同的操作。如果发现分析出的状态不是正常状态,存在中间状态或异常状态(例如:发现升级过程中的临时目录),则启动 recovery 进行恢复。

我们看下这部分的关键代码,这部分代码仍然在 DataStorage.java 文件中。

void recoverTransitionRead(NamespaceInfo nsInfo,
                             Collection<File> dataDirs,
                             StartupOption startOpt
                             ) throws IOException {
    ...
    for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
      File dataDir = it.next();
      StorageDirectory sd = new StorageDirectory(dataDir);
      StorageState curState;
      try {
        curState = sd.analyzeStorage(startOpt);      // 分析当前存储状态
        // sd is locked but not opened
        switch(curState) {
        case NORMAL:
          break;
        case NON_EXISTENT:
          // ignore this storage
          LOG.info("Storage directory " + dataDir + " does not exist.");
          it.remove();
          continue;
        case NOT_FORMATTED: // format
          LOG.info("Storage directory " + dataDir + " is not formatted.");
          LOG.info("Formatting ...");
          format(sd, nsInfo);
          break;
        default:  // recovery part is common
          sd.doRecover(curState);                    // 发现中间或异常状态,进行恢复
        }
      } catch (IOException ioe) {
    ...
  }

明白了这个流程,接下来我们就深入了解下 analyzeStorage 的具体内容和总共有多少种状态。

  • 存储空间状态

通过代码我们发现存储空间的状态总共有以下几种:

 public enum StorageState {
    NON_EXISTENT,
    NOT_FORMATTED,
    COMPLETE_UPGRADE,
    RECOVER_UPGRADE,
    COMPLETE_FINALIZE,
    COMPLETE_ROLLBACK,
    RECOVER_ROLLBACK,
    COMPLETE_CHECKPOINT,
    RECOVER_CHECKPOINT,
    NORMAL;
  }
  ...
  // Startup options
  static public enum StartupOption{
    FORMAT  ("-format"),   //格式化系统
    REGULAR ("-regular"),  //正常启动HDFS
    UPGRADE ("-upgrade"),  //升级系统
    ROLLBACK("-rollback"), //从升级中回滚到前一个版本
    FINALIZE("-finalize"), //提交一次升级
    IMPORT  ("-importCheckpoint");// 从名字节点的一个检查点恢复
    ...
  }
  

我们发现上面提到的所有存储空间状态和当前施加在存储空间的动作相关。其中只有部分是和升级相关的。

  • 分析出当前存储空间状态

接下来我们好好研究下这些状态是怎么得到的?

要回答上面我们提出的问题,就必须好看下状态分析函数:analyzeStorage。这函数在 Storage.java 文件中。路径为:hadoop\src\hdfs\org\apache\hadoop\hdfs\server\common。

public StorageState analyzeStorage(StartupOption startOpt) throws IOException {
      ...
      // check whether current directory is valid
      File versionFile = getVersionFile();
      boolean hasCurrent = versionFile.exists();

      // check which directories exist
      boolean hasPrevious = getPreviousDir().exists();
      boolean hasPreviousTmp = getPreviousTmp().exists();
      boolean hasRemovedTmp = getRemovedTmp().exists();
      boolean hasFinalizedTmp = getFinalizedTmp().exists();
      boolean hasCheckpointTmp = getLastCheckpointTmp().exists();

      if (!(hasPreviousTmp || hasRemovedTmp
          || hasFinalizedTmp || hasCheckpointTmp)) {
        // no temp dirs - no recovery
        if (hasCurrent)
          return StorageState.NORMAL;
        if (hasPrevious)
          throw new InconsistentFSStateException(root,
                              "version file in current directory is missing.");
        return StorageState.NOT_FORMATTED;
      }

      if ((hasPreviousTmp?1:0) + (hasRemovedTmp?1:0)
          + (hasFinalizedTmp?1:0) + (hasCheckpointTmp?1:0) > 1)
        // more than one temp dirs
        throw new InconsistentFSStateException(root,
                                               "too many temporary directories.");

      // # of temp dirs == 1 should either recover or complete a transition
      if (hasCheckpointTmp) {
        return hasCurrent ? StorageState.COMPLETE_CHECKPOINT
                          : StorageState.RECOVER_CHECKPOINT;
      }

      if (hasFinalizedTmp) {
        if (hasPrevious)
          throw new InconsistentFSStateException(root,
                                                 STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_FINALIZED
                                                 + "cannot exist together.");
        return StorageState.COMPLETE_FINALIZE;
      }

      if (hasPreviousTmp) {
        if (hasPrevious)
          throw new InconsistentFSStateException(root,
                                                 STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_PREVIOUS
                                                 + " cannot exist together.");
        if (hasCurrent)
          return StorageState.COMPLETE_UPGRADE;
        return StorageState.RECOVER_UPGRADE;
      }
      
      assert hasRemovedTmp : "hasRemovedTmp must be true";
      if (!(hasCurrent ^ hasPrevious))
        throw new InconsistentFSStateException(root,
                                               "one and only one directory " + STORAGE_DIR_CURRENT 
                                               + " or " + STORAGE_DIR_PREVIOUS 
                                               + " must be present when " + STORAGE_TMP_REMOVED
                                               + " exists.");
      if (hasCurrent)
        return StorageState.COMPLETE_ROLLBACK;
      return StorageState.RECOVER_ROLLBACK;
    }

通过上面的代码我们就可以清晰的知道每个存储空间状态是如何得到的。这里我们重点解释下 COMPLETE_UPGRADE 和 RECOVER_UPGRADE 这两个存储空间状态。

  1. COMPLETE_UPGRADE:“previous.tmp”目录存在,同时“current”目录下的“VERSION”文件存在,可完成升级
  2. RECOVER_UPGRADE:“previous.tmp”目录存在,“current/VERSION”文件不存在,存储空间应该从升级过程中恢复

其他的状态就不做一一分析,大家根据代码应该很容易就可以得出各个状态成立的条件。

 

四、根据存储空间状态进行恢复

得到了分析后的存储空间状态,我们就可以根据不同的状态将存储系统恢复正常。我们来看下这部分代码,这部分代码 仍然在 Storage.java 文件中。

public void doRecover(StorageState curState) throws IOException {
      File curDir = getCurrentDir();
      String rootPath = root.getCanonicalPath();
      switch(curState) {
      case COMPLETE_UPGRADE:  // mv previous.tmp -> previous
        LOG.info("Completing previous upgrade for storage directory " 
                 + rootPath + ".");
        rename(getPreviousTmp(), getPreviousDir());
        return;
      case RECOVER_UPGRADE:   // mv previous.tmp -> current
        LOG.info("Recovering storage directory " + rootPath
                 + " from previous upgrade.");
        if (curDir.exists())
          deleteDir(curDir);
        rename(getPreviousTmp(), curDir);
        return;
      case COMPLETE_ROLLBACK: // rm removed.tmp
        LOG.info("Completing previous rollback for storage directory "
                 + rootPath + ".");
        deleteDir(getRemovedTmp());
        return;
      case RECOVER_ROLLBACK:  // mv removed.tmp -> current
        LOG.info("Recovering storage directory " + rootPath
                 + " from previous rollback.");
        rename(getRemovedTmp(), curDir);
        return;
      case COMPLETE_FINALIZE: // rm finalized.tmp
        LOG.info("Completing previous finalize for storage directory "
                 + rootPath + ".");
        deleteDir(getFinalizedTmp());
        return;
      case COMPLETE_CHECKPOINT: // mv lastcheckpoint.tmp -> previous.checkpoint
        LOG.info("Completing previous checkpoint for storage directory " 
                 + rootPath + ".");
        File prevCkptDir = getPreviousCheckpoint();
        if (prevCkptDir.exists())
          deleteDir(prevCkptDir);
        rename(getLastCheckpointTmp(), prevCkptDir);
        return;
      case RECOVER_CHECKPOINT:  // mv lastcheckpoint.tmp -> current
        LOG.info("Recovering storage directory " + rootPath
                 + " from failed checkpoint.");
        if (curDir.exists())
          deleteDir(curDir);
        rename(getLastCheckpointTmp(), curDir);
        return;
      default:
        throw new IOException("Unexpected FS state: " + curState);
      }
    }

这部分代码非常容易理解,就是根据不同的状态执行不同的操作。我们仍然只分析两个关键的状态 COMPLETE_UPGRADE 和 RECOVER_UPGRADE 对应的操作。

  1. COMPLETE_UPGRADE:将临时目录“previous.tmp”改名为“previous”
  2. RECOVER_UPGRADE:将临时目录“previous.tmp”改名为“current”

这两个状态对应的操作也很好理解,分别是完成未完成的升级操作和回退未完成的升级操作。

五、总结

以上升级机制的关键就在于在做各种升级操作的时候很好的利用临时文件夹,方便后续分析当前存储空间状态和对异常情况进行恢复。

同时也很好的利用了存储空间的状态机机制,这样很好的降低了各种状态之间的耦合性。

关键点:

  • 临时文件夹
  • 存储空间状态机
展开阅读全文
打赏
0
7 收藏
分享
加载中
更多评论
打赏
0 评论
7 收藏
0
分享
返回顶部
顶部