Hive Metastore客户端自动重连机制源码解析

原创
10/29 20:12
阅读数 1.5K

[toc]

前言

​ 本文基于Hive2.1.0的Apache社区版,目的是为了探究Metastore和底层RDBMS和底层服务变更(例如版本升级、服务迁移等运维操作)对客户端和用户的影响。Hive提供了在客户端对Metastore连接超时自动重连的容错机制,允许我们通过调整参数配置调整停服时间限制,在规定时间内重启服务对用户无显著影响。由于Metastore底层RDBMS我们采用的是业内通用的Mysql,因此后面以Mysql来替代RDBMS进行描述和验证

相关参数

参数 默认值 说明 配置范围
hive.metastore.connect.retries 3 客户端建立与metastore连接时的重试次数 Metastore客户端,如CLI、Hiveserver2等
hive.metastore.failure.retries 1 客户端访问metastore的失败重试次数 Metastore客户端,如CLI、Hiveserver2等
hive.metastore.client.connect.retry.delay 1s Metastore客户端重连/重试等待的时间 Metastore客户端,如CLI、Hiveserver2等
hive.metastore.client.socket.timeout 600s Metastore客户端socket超时时间,传递给底层Socket,超时之后底层Socket会自动断开 Metastore客户端,如CLI、Hiveserver2等
hive.metastore.client.socket.lifetime 0 socket存活时间,超时之后客户端在下一次访问Metastore时会主动断开现有连接并重新建立连接,0表示不主动断开 Metastore客户端,如CLI、Hiveserver2等
hive.hmshandler.retry.attempts 10 在JDO数据存储出现错误后尝试连接的次数 Metastore
hive.hmshandler.retry.interval 2000ms JDO连接尝试间隔,单位:ms Metastore
hive.server2.thrift.client.connect.retry.limit 1 客户端建立与Hiveserver2连接的重试次数 Hiveserver2的客户端,如Beeline等
hive.server2.thrift.client.retry.limit 1 客户端访问Hiveserver2的失败重试次数 Hiveserver2的客户端,如Beeline等
hive.server2.thrift.client.retry.delay.seconds 1s Hiveserver2客户端重连/重试等待的时间 Hiveserver2的客户端,如Beeline等

hive.metastore.connect.retries 和 hive.metastore.failure.retries的区别

​ 为了弄清这两个参数的区别,让我们通过源码来确认一下,ps:为了方便阅读后面会用......省略掉无关的代码逻辑

1. Hive与Metastore交互

​ CLI和Hiveserver2都是通过org.apache.hadoop.hive.ql.metadata.Hive类与Metastore的交互的。首先让我们以createDatabase(Database, boolean)方法为例来看看具体的交互过程

   /**
   * Create a database
   * @param db
   * @param ifNotExist if true, will ignore AlreadyExistsException exception
   * @throws AlreadyExistsException
   * @throws HiveException
   */
  public void createDatabase(Database db, boolean ifNotExist)
      throws AlreadyExistsException, HiveException {
    try {
      getMSC().createDatabase(db);
    } catch (AlreadyExistsException e) {
      if (!ifNotExist) {
        throw e;
      }
    } catch (Exception e) {
      throw new HiveException(e);
    }
  }
  /**
   * @return the metastore client for the current thread
   * @throws MetaException
   */
  @LimitedPrivate(value = {"Hive"})
  @Unstable
  public synchronized IMetaStoreClient getMSC(
      boolean allowEmbedded, boolean forceCreate) throws MetaException {
    if (metaStoreClient == null || forceCreate) {
      ......
      try {
        metaStoreClient = createMetaStoreClient(allowEmbedded);
      } catch (RuntimeException ex) {
        ......
      }
      ......
    }
    return metaStoreClient;
  }

​ Hive类维护了一个IMetaStoreClient对象,通过getMSC()方法获取,getMSC()方法在这里采用了懒汉模式去创建,接下来看下Hive是如何创建一个IMetaStoreClient对象的

2. 创建一个IMetaStoreClient对象

// org.apache.hadoop.hive.ql.metadata.Hive.java
  private IMetaStoreClient createMetaStoreClient(boolean allowEmbedded) throws MetaException {
    ......
    if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) {
      return new SessionHiveMetaStoreClient(conf, hookLoader, allowEmbedded);
    } else {
      return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap,
          SessionHiveMetaStoreClient.class.getName(), allowEmbedded);
    }
  }

​ if后面的分支用于创建客户端内置的本地Metastore,这主要用于开发调试阶段,因此我们只关注else后面的逻辑,即通过RetryingMetaStoreClient.getProxy方法创建一个IMetaStoreClient对象。RetryingMetaStoreClient.getProxy方法通过几次简单地调用重载函数,最终来到下面的方法

// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java
  public static IMetaStoreClient getProxy(HiveConf hiveConf, Class<?>[] constructorArgTypes,
      Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap,
      String mscClassName) throws MetaException {

    @SuppressWarnings("unchecked")
    Class<? extends IMetaStoreClient> baseClass =
        (Class<? extends IMetaStoreClient>)MetaStoreUtils.getClass(mscClassName);

    RetryingMetaStoreClient handler =
        new RetryingMetaStoreClient(hiveConf, constructorArgTypes, constructorArgs,
            metaCallTimeMap, baseClass);
    return (IMetaStoreClient) Proxy.newProxyInstance(
        RetryingMetaStoreClient.class.getClassLoader(), baseClass.getInterfaces(), handler);
  }

​ 可以看到,这里利用Java代理机制创建并返回了一个IMetaStoreClient的代理——RetryingMetaStoreClient,此后对IMetaStoreClient对象的调用都委托给RetryingMetaStoreClient.invoke 处理,接下来让我们看下RetryingMetaStoreClient.invoke方法是如何处理用户对IMetastoreClient对象的操作的

3. 每次调用IMetaStoreClient对象访问Metastore时的底层实现逻辑

// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Object ret = null;
    int retriesMade = 0;
    TException caughtException = null;
    while (true) {
      try {
        reloginExpiringKeytabUser();
        // 1. 检查是否重连,重连的场景包括:
        // a) 上一次循环访问Metastore异常,且异常类型支持自动重试访问
        // b) 底层socket超时,超时参数:hive.metastore.client.socket.lifetime
        if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) {
          base.reconnect();
          lastConnectionTime = System.currentTimeMillis();
        }
        if (metaCallTimeMap == null) {
          ret = method.invoke(base, args);
        } else {
          // need to capture the timing
          long startTime = System.currentTimeMillis();
          ret = method.invoke(base, args);
          long timeTaken = System.currentTimeMillis() - startTime;
          addMethodTime(method, timeTaken);
        }
        // 2. 访问Metastore正常,返回结果给上层调用并结束循环,用户不主动结束的情况下底层与Metastore的连接持续保持着
        break;
        
        // 3. 处理访问Metastore过程中出现的异常,异常主要分三类:
        // a) 用户操作异常或元数据异常,将异常抛给用户处理并结束循环
        // b) 底层连接异常,例如网络问题、Metastore服务异常(停服、连接超限等)等支持自动重连,进入步骤4
        // c) 其他未知异常,抛给用户处理并结束循环
      } catch (UndeclaredThrowableException e) {
        throw e.getCause();
      } catch (InvocationTargetException e) {
        Throwable t = e.getCause();
        if (t instanceof TApplicationException) {
          TApplicationException tae = (TApplicationException)t;
          switch (tae.getType()) {
          case TApplicationException.UNSUPPORTED_CLIENT_TYPE:
          case TApplicationException.UNKNOWN_METHOD:
          case TApplicationException.WRONG_METHOD_NAME:
          case TApplicationException.INVALID_PROTOCOL:
            throw t;
          default:
            caughtException = tae;
          }
        } else if ((t instanceof TProtocolException) || (t instanceof TTransportException)) {
          caughtException = (TException)t;
        } else if ((t instanceof MetaException) && t.getMessage().matches(
            "(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*") &&
            !t.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) {
          caughtException = (MetaException)t;
        } else {
          throw t;
        }
      } catch (MetaException e) {
        if (e.getMessage().matches("(?s).*(IO|TTransport)Exception.*") &&
            !e.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) {
          caughtException = e;
        } else {
          throw e;
        }
      }

      // 4. 对于支持自动重试的异常,会记录重试次数并验证次数是否超限,是则返回异常并结束循环,否则将以warn形式输出异常日志提醒并等等一段时间后开始下一次循环自动重试访问Metastore。这里用到的重试次数参数和等待时间参数分别是 hive.metastore.failure.retries,hive.metastore.client.connect.retry.delay
      if (retriesMade >= retryLimit) {
        throw caughtException;
      }
      retriesMade++;
      Thread.sleep(retryDelaySeconds * 1000);
    }
    return ret;
  }

  protected RetryingMetaStoreClient(HiveConf hiveConf, Class<?>[] constructorArgTypes,
      Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap,
      Class<? extends IMetaStoreClient> msClientClass) throws MetaException {

    this.retryLimit = hiveConf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES);
    this.retryDelaySeconds = hiveConf.getTimeVar(
        HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
    this.metaCallTimeMap = metaCallTimeMap;
    this.connectionLifeTimeInMillis = hiveConf.getTimeVar(
        HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME, TimeUnit.MILLISECONDS);
    ......
    this.base = (IMetaStoreClient) MetaStoreUtils.newInstance(
        msClientClass, constructorArgTypes, constructorArgs);
  }

​ 从 RetryingMetaStoreClient 的构造函数中可以发现,RetryingMetaStoreClient 维护了一个 HiveMetaStoreClient 对象,用户在上层调用一次 RetryingMetaStoreClient 对象操作,例如第一步的 createDatabase 方法,会经过 RetryingMetaStoreClient.invoke 的封装最终调用HiveMetaStoreClient类中的同名方法进行操作。在 RetryingMetaStoreClient.invoke 中封装了自动重试的逻辑,在底层与Metastore的连接过程中出现异常的情况下会自动重试而不影响上层用户的操作。

​ 这里我们在注释中标注了 invoke 方法中主要的操作步骤,可以看到,重试次数由参数hive.metastore.failure.retries控制,两次重试之间的等待时间由hive.metastore.client.connect.retry.delay控制。

​ 注意,这里我们说的是“重试”,而不是“重连”,一次重试中与Metastore的交互有两步:1. 建立与Metastore的会话 2. 执行用户请求。我们继续看下客户端是怎么建立与Metastore的会话的

4. Metastore重连

// org.apache.hadoop.hive.metastore.HiveMetaStoreClient.java
  @Override
  public void reconnect() throws MetaException {
      ......
      close();
      // 当配置了多个Metastore时,会随机调整Metastore顺序
      promoteRandomMetaStoreURI();
      open();
  }


  private void open() throws MetaException {
    isConnected = false;
    ......
    // hive.metastore.client.socket.timeout
    int clientSocketTimeout = (int) conf.getTimeVar(
        ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);

    for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
      for (URI store : metastoreUris) {
        try {
          transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
          ......
          try {
            transport.open();
            isConnected = true;
          } catch (TTransportException e) {
            ......
          }
          ......
        } catch (MetaException e) {
          ......
        }
        if (isConnected) {
          break;
        }
      }
      // Wait before launching the next round of connection retries.
      if (!isConnected && retryDelaySeconds > 0) {
        try {
          Thread.sleep(retryDelaySeconds * 1000);
        } catch (InterruptedException ignore) {}
      }
    }

    if (!isConnected) {
      throw new MetaException("Could not connect to meta store using any of the URIs provided." +
        " Most recent failure: " + StringUtils.stringifyException(tte));
    }
    ......
  }

  public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded)
    throws MetaException {
    ......
    // hive.metastore.connect.retries
    retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
    // hive.metastore.client.connect.retry.delay
    retryDelaySeconds = conf.getTimeVar(
        ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
    ......
    // 初始化一个HiveMetaStoreClient对象时会尝试建立与Metastore的长会话
    open();
  }

​ 同上一步的重试逻辑类似,与Metastore的连接支持自动重连,由 hive.metastore.connect.retries 控制重连次数,hive.metastore.client.connect.retry.delay 控制重连等待时间,底层利用Thrift提供的RPC通信服务。

​ 如果配置了多个Metastore地址,每一次重连的时候会按顺序遍历所有的Metastore并尝试与之建立会话,直到有一个会话建立成功为止。

​ 此外,初始化一个HiveMetaStoreClient对象时会调用open()方法尝试建立一个与Metastore的长会话,供后面的用户请求使用

总结

  1. HiveMetaStoreClient.open() 方法建立一个与Metastore的会话,该方法中会在连接失败的情况下自动重连,重连次数、重连等待时间分别由参数 hive.metastore.connect.retrieshive.metastore.client.connect.retry.delay 控制。且每次重连时会遍历用户配置的所有的Metastore直到成功建立一个会话
  2. 用户新建一个Metastore客户端(例如启动一个CLI、Hiveserver2进程)时,会初始化并维护一个IMetaStoreClient对象,在初始化时调用 *HiveMetaStoreClient.open()*方法建立一个与Metastore的长会话
  3. 用户每次调用IMetaStoreClient中的方法进行业务操作,实际上委托给 RetryingMetaStoreClient.invoke 方法操作,在遇到与Metastore连接等异常时会进行自动重试,重试次数、重试等待时间分别由参数 hive.metastore.failure.retrieshive.metastore.client.connect.retry.delay 控制
  4. RetryingMetaStoreClient.invoke 中每次重试会尝试调用 HiveMetaStoreClient.reconnect() 方法重连Metastore,HiveMetaStoreClient.reconnect() 方法内会调用 HiveMetaStoreClient.open() 去连接Metastore。因此,invoke方法实际上在重试循环中嵌套了循环重连Metastore的操作
  5. 所以 hive.metastore.failure.retries 参数实际上仅用于在已经建立了Metastore的会话的基础上进行正常的业务访问过程中遇到连接异常等问题时的重试次数限制,而 hive.metastore.connect.retries 则是更底层自动重连Metastore的次数限制
  6. 此外,hive.server2.thrift.client.connect.retry.limit 同 hive.server2.thrift.client.retry.limit 的区别也与hive.metastore.connect.retries 和 hive.metastore.failure.retries的区别类似,这里就不再赘述,有兴趣的同学可以参照本篇文档去研究下源码

结论

  • 仅停止Mysql服务
    • Metastore重试总时间 = hive.hmshandler.retry.attempts * hive.hmshandler.retry.interval
    • CLI、Hiveserver2会报JDO相关的异常,并断开与Metastore的连接
  • 当CLI、Hiveserver2与Metastore的连接无响应时间超过hive.metastore.client.socket.timeout值会自动断开连接
  • 仅停止metastore服务,CLI、Hiveserver2会打印“Failed to connect to the MetaStore Server”及重连失败的异常,但会在多次重连仍然失败后才退出
    • 已经与Metastore建立会话的情况下,客户端的每一次业务请求的重试总时间 = hive.metastore.connect.retries * hive.metastore.failure.retries * hive.metastore.client.connect.retry.delay
    • 停服期间客户端新建一个Metastore连接过程中重试总时间间隔 = hive.metastore.connect.retries * hive.metastore.client.connect.retry.delay
    • 关联Beeline会卡住,直到Hiveserver2走完一个完整的重连Metastore周期后放弃连接Metastore为止,此时Hiveserver2会返回异常
  • 仅关闭Hiveserver2服务,Beeline直接报错,不会重试,需要手动重连
  • 以上各组件除Beeline外均可在上游服务恢复后自动恢复,通过自动重连机制实现
  • 综上,建议进行底层服务变更(Metastore或MySQL)时停止Metastore服务,在停服之前需提前配置好Metastore客户端超时重连的相关参数,预留适当的变更时间
展开阅读全文
打赏
1
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
1
分享
OSCHINA
登录后可查看更多优质内容
返回顶部
顶部