YARN任务运行中的Token

原创
2022/05/24 06:41
阅读数 205

上一篇文章中,主要讲解了token的一些通用知识,以及hadoop中,token的实现和通用数据结构及流程。

本文主要讲述yarn任务提交运行过程中涉及的几个重要token:AMRMToken,NMToken,ContainerToken。


【AMRMToken】


用于保证ApplicationMaster(下面均简称AM)与RM之间的安全通信,即AM向RM注册,以及后续向RM申请资源的rpc请求,都会带上该token。


AMRMToken在客户端向RM提交任务后,由RM创建生成然后通过rpc请求传递给NM;NM通过将token持久化到本地文件,让AM启动后从对应文件中加载到token,这样AM就可以使用正确的token向RM注册并完成rpc请求交互了。接下来就展开说明下。


1)token的生成

客户端提交任务请求后,RM在内部的处理中,为AM构造对应的container启动上下文时,创建了AMRMToken,相关代码如下所示:

// AMLauncher.java
private void launch() throws IOException, YarnException {
    ...
    // 构造 container 启动上下文
    ContainerLaunchContext launchContext =
        createAMContainerLaunchContext(applicationContext, masterContainerID);
    ...
}

private ContainerLaunchContext createAMContainerLaunchContext(
    ApplicationSubmissionContext applicationMasterContext,
    ContainerId containerID)
 throws IOException
{
    ...
    setupTokens(container, containerID);
    ...
}

protected void setupTokens(ContainerLaunchContext container, ContainerId, containerID) 
    throws IOException
{
    ...
    // 构造 AMRMToken
    Token<AMRMTokenIdentifier> amrmToken = createAndSetAMRMToken();
    if (amrmToken != null) {
        credentials.addToken(amrmToken.getService(), amrmToken);
    }
    ...
}

protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
    Token<AMRMTokenIdentifier> amrmToken =
        this.rmContext.getAMRMTokenSecretManager()
            .createAndGetAMRMToken(application.getAppAttemptId());
    ((RMAppAttemptImpl)application).setAMRMToken(amrmToken);
    return amrmToken;
}


2)AMRMToken的传递

a. RM --> NM

在构造完container启动上下文后,将启动上下文随container启动请求(StartContainerRequest)发送给NM。


b. NM --> AM

NM收到请求后,内部构造Container实例对象,并从请求中取出credential保存在实例对象中,在真正需要启动AM时,将token信息写到本地文件中

// ContainerLauncher.java
public Integer call() {
    // token存储在 nmPrivate 中的路径
    Path nmPrivateTokensPath =
        dirsHandler.getLocalPathForWrite(
            getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR +
            String.format(TOKEN_FILE_NAME_FMT, containerIdStr));
    // Set th token location too.
    // 为AM设置环境变量
    addToEnvMap(
        environment, nmEnvVars,
        ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
        new Path(
            containerWorkDir,
            FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
    // 将token写入文件中
    try (DataOutputStream tokensOutStream =
        lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE))) {
        Credentials creds = container.getCredentials();
        creds.writeTokenStorageToStream(tokensOutStream);
    }
    ...
}

//DefaultContainerExecutor.java
public int launchContainer(ContainerStartContext ctx) {
    // copy container tokens to work dir
    Path tokenDst =
        new Path(containerWorkDir, Containerlaunch.FINAL_CONTAINER_TOKENS_FILE);
    copyFile(nmPrivateTokensPath, tokenDst, user);
}


从上面的代码可以看到,实际上先将token写入nmPrivate目录中,以container的ID作为文件名,".tokens"作为文件后缀,然后将token文件拷贝到container的工作目录中,并重命名为container.tokens


例如,存储在nmPrivate目录下的token:

[root@dn-nm-0 container_e301_1652243949356_2011_01_000003]# pwd
/home/hncscwc/hadoop/yarn/nodemanager/local/nmPrivate/application_1652243949356_2011/container_e301_1652243949356_2011_01_000003
[root@dn-nm-0 container_e301_1652243949356_2011_01_000003]# ll
total 52
-rw-r--r-- 1 hadoop hadoop 8 May 13 16:27 container_e301_1652243949356_2011_01_000003.pid
-rw-r--r-- 1 hadoop hadoop 387 May 13 16:27 container_e301_1652243949356_2011_01_000003.tokens
-rw-r--r-- 1 hadoop hadoop 43441 May 13 16:27 launch_container.sh


存储在container工作目录下的token:

[root@dn-nm-0 container_e301_1652243949356_2011_01_000003]# pwd
/home/hncscwc/hadoop/yarn/nodemanager/local/usercache/dcp/appcache/application_1652243949356_2011/container_e301_1652243949356_2011_01_000003
[root@dn-nm-0 container_e301_1652243949356_2011_01_000003]# ll
total 72
lrwxrwxrwx 1 hadoop hadoop 100 May 13 16:27 __app__.jar -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3906/spark-examples_2.11-2.4.4.jar
-rw-r--r-- 1 hadoop hadoop 387 May 13 16:27 container_tokens
-rwx------ 1 hadoop hadoop 750 May 13 16:27 default_container_executor_session.sh
-rwx------ 1 hadoop hadoop 805 May 13 16:27 default_container_executor.sh
-rwx------ 1 hadoop hadoop 43441 May 13 16:27 launch_container.sh
lrwxrwxrwx 1 hadoop hadoop 91 May 13 16:27 metrics-influxdb.jar -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3910/metrics-influxdb.jar
lrwxrwxrwx 1 hadoop hadoop 89 May 13 16:27 metrics.properties -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3909/metrics.properties
lrwxrwxrwx 1 hadoop hadoop 89 May 13 16:27 __spark_conf__ -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3908/__spark_conf__.zip
lrwxrwxrwx 1 hadoop hadoop 94 May 13 16:27 spark-influxdb-sink.jar -> /home/hncscwc/hadoop/yarn/nodemanager/local/usercache/hncscwc/filecache/3907/spark-influxdb-sink.jar
drwxr-xr-x 2 hadoop hadoop 12288 May 13 16:27 __spark_libs__
drwx--x--- 2 hadoop hadoop 55 May 13 16:27 tmp


3)AM启动后的注册校验

AM进程启动后,构造UGI(UserGroupInformation)对象时,会根据环境变量HADOOP_TOKEN_FILE_LOCATION的值,从指定文件中加载token信息,然后附在rpc请求中向RM进行注册。RM收到请求后由对应的SecretManager(这里对应于AMRMTokenSecretManager)完成认证逻辑。认证的逻辑在上一篇文章有详细介绍。


需要注意的是:CONTAINER_TOKEN_FLIE_ENV_NAME的值与HADOOP_TOKEN_FILE_LOCATION的值是相同的,这样就可以保证正确读取到对应的token。


【NMToken】


NMToken则是用于与NM的安全通信。

从任务提交运行的流程中可以知道,RM和AM都会和NM通信请求启动container,其中RM向NM请求启动AM;而AM则是向NM请求启动任务container。因此,在RM与NM的通信、AM与NM的通信中都会用到NMToken。


1) NM向RM注册获取NMToken的MasterKey

由于NMToken是由RM生成的,但最终在NM中进行校验,因此NM需要和RM使用一样的密钥,这个密钥是在NM向RM注册时获取的,并在心跳请求中更新密钥信息。

// ResourceTrackerService.java
public RegisterNodeManagerResponse registerNodeManager(
    egisterNodeManagerRequest request)
 throws YarnException, IOException
{
    ...
    RegisterNodeManagerResponse response =
    recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
    ...
    // 返回 containerToken 和 NMToken 的密钥信息
    response.setContainerTokenMasterKey(
        containerTokenSecretManager.getCurrentKey());
    response.setNMTokenMasterKey(
        nmTokenSecretManager.getCurrentKey());
}

// NodeStatusUpdaterImpl.java
protected void registerWithRM() 
    throws YarnException, IOException
{
    ...
    regNMResponse =
        resourceTracker.registerNodeManager(request);
    MasterKey masterKey =
        regNMResponse.getContainerTokenMasterKey();
    // do this now so that its set before we start heartbeating to RM
    // It is expected that status updater is started by this point and
    // RM gives the shared secret in registration during
    // StatusUpdater#start().
    if (masterKey != null) {
       this.context.getContainerTokenSecretManager()
           .setMasterKey(masterKey);
    }

    masterKey = regNMResponse.getNMTokenMasterKey();
    if (masterKey != null) {
        this.context.getNMTokenSecretManager()
            .setMasterKey(masterKey);
    }
}


2)RM向NM请求启动AM

在请求中会携带NMToken:

// AMLauncher.java 
private void launch() throws IOException, YarnException {
    connect();
    ...
}

private void connect() throws IOException {
    ContainerId masterContainerID = masterContainer.getId();
    
    containerMgrProxy = getContainerMgrProxy(masterContainerID);
}

protected ContainerManagementProtocol getContainerMgrProxy(
    final ContainerId containerId)
 
{
    final NodeId node = masterContainer.getNodeId();
    final InetSocketAddress containerManagerConnectAddress =
        NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());

    final YarnRPC rpc = getYarnRPC();

    UserGroupInformation currentUser =
        UserGroupInformation.createRemoteUser(containerId
            .getApplicationAttemptId().toString());

    String user =
        rmContext.getRMApps()
            .get(containerId.getApplicationAttemptId().getApplicationId())
            .getUser();
    org.apache.hadoop.yarn.api.records.Token token =
        rmContext.getNMTokenSecretManager().createNMToken(
            containerId.getApplicationAttemptId(), node, user);
    currentUser.addToken(ConverterUtils.convertFromYarn(token,
        containerManagerConnectAddress));

    return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
        currentUser, rpc, containerManagerConnectAddress);
}


NM在请求处理中校验:

// ContainerManagerImpl.java
public StartContainersResponse startContainers(
    StartContainersRequest requests)
 throws YarnException, IOException
{
    UserGroupInformation remoteUgi = getRemoteUgi();
    NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
    authorizeUser(remoteUgi, nmTokenIdentifier);
    ...
}

protected NMTokenIdentifier selectNMTokenIdentifier(
    UserGroupInformation remoteUgi)
 
{
    Set<TokenIdentifier> tokenIdentifiers = remoteUgi.getTokenIdentifiers();
    NMTokenIdentifier resultId = null;
    for (TokenIdentifier id : tokenIdentifiers) {
        if (id instanceof NMTokenIdentifier) {
            resultId = (NMTokenIdentifier) id;
            break;
        }
    }
    return resultId;
}

protected void authorizeUser(UserGroupInformation remoteUgi,
    NMTokenIdentifier nmTokenIdentifier)
 throws YarnException
{
    if (nmTokenIdentifier == null) {
      throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG);
    }
    if (!remoteUgi.getUserName().equals(
      nmTokenIdentifier.getApplicationAttemptId().toString())) {
      throw RPCUtil.getRemoteException("Expected applicationAttemptId: "
          + remoteUgi.getUserName() + "Found: "
          + nmTokenIdentifier.getApplicationAttemptId());
    }
}


3)AM启动向RM注册后,从注册的响应中获取NMToken

// AMRMClientImpl.java
private RegisterApplicationMasterResponse registerApplicationMaster()
      throws YarnException, IOException
{
    RegisterApplicationMasterRequest request =
        RegisterApplicationMasterRequest.newInstance(this.appHostName,
            this.appHostPort, this.appTrackingUrl);
    RegisterApplicationMasterResponse response =
        rmClient.registerApplicationMaster(request);
    synchronized (this) {
      lastResponseId = 0;
      if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
        populateNMTokens(response.getNMTokensFromPreviousAttempts());
      }
    }
    return response;
}

// 将Token放到缓存中
protected void populateNMTokens(List<NMToken> nmTokens) {
    for (NMToken token : nmTokens) {
      String nodeId = token.getNodeId().toString();
      if (LOG.isDebugEnabled()) {
        if (getNMTokenCache().containsToken(nodeId)) {
          LOG.debug("Replacing token for : " + nodeId);
        } else {
          LOG.debug("Received new token for : " + nodeId);
        }
      }
      getNMTokenCache().setToken(nodeId, token.getToken());
    }
}


4)AM向NM请求启动任务container时,将token放到ugi中

从缓存中取出对应NM节点的的token,然后放到ugi中,随请求一并发送给NM

// NMClientImpl.java
public Map<String, ByteBuffer> startContainer(
    Container container, ContainerLaunchContext containerLaunchContext)

        throws YarnException, IOException
{
    ...
    proxy =
      cmProxy.getProxy(container.getNodeId().toString(),
        container.getId());
    // 注意containerToken
    StartContainerRequest scRequest =
      StartContainerRequest.newInstance(containerLaunchContext,
        container.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(scRequest);
    StartContainersRequest allRequests =
      StartContainersRequest.newInstance(list);
    StartContainersResponse response =
      proxy.getContainerManagementProtocol().startContainers(allRequests);
    ...
}

// ContainerManagementProtocolProxy.java
public synchronized ContainerManagementProtocolProxyData getProxy(
    String containerManagerBindAddr, ContainerId containerId)

    throws InvalidToken
{
    ...
    if (proxy == null) {
      proxy =
          new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
              containerId, nmTokenCache.getToken(containerManagerBindAddr));
      if (maxConnectedNMs > 0) {
        addProxyToCache(containerManagerBindAddr, proxy);
      }
    }
    ...
}

public ContainerManagementProtocolProxyData(YarnRPC rpc,
    String containerManagerBindAddr,
    ContainerId containerId, Token token)
 throws InvalidToken
{
    this.containerManagerBindAddr = containerManagerBindAddr;
    this.activeCallers = 0;
    this.scheduledForClose = false;
    this.token = token;
    this.proxy = newProxy(rpc, containerManagerBindAddr, containerId, token);
}

protected ContainerManagementProtocol newProxy(final YarnRPC rpc,
    String containerManagerBindAddr, ContainerId containerId, Token token)

    throws InvalidToken
{
    UserGroupInformation user =
      UserGroupInformation.createRemoteUser(containerId
        .getApplicationAttemptId().toString());

    org.apache.hadoop.security.token.Token<NMTokenIdentifier> nmToken =
      ConverterUtils.convertFromYarn(token, cmAddr);
    user.addToken(nmToken);

    return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
        user, rpc, cmAddr);
}


【ContainerToken】


在向NM请求启动container时,除了需要NMToken之外,还需要ContainerToken,以验证container的合法性。


ContainerToken和NMToken采用相同的方式,因此密钥的获取方式与流程以及更新,和前面NMToken中讲到的几乎是同一个流程。


首先,同样是在NM的注册与定时心跳请求中,RM向NM同步并更新密钥。RM向NM请求container时,直接创建并带上ContainerToken;而AM则是通过向RM申请资源时,RM创建了ContainerToken,并随请求的应答传递给了AM。此后AM再向NM请求启动container时,则带上了对应的Token信息,有兴趣的朋友可以对照流程走读相关源码。


另外,该token大的类型虽然都是containerToken,但实际上又细分为ApplicaitonMaster和Task两类,分别用于RM与NM通信、AM与NM通信中。


【LocalizerToken】


LocalizerToken主要用于NM的资源本地化服务与NM之间的通信。由于NM资源本地化服务是以一个独立进程的方式运行的,并且会通过rpc协议不断向NM汇报资源下载情况,因此使用Token来保证通信安全。


【总结】


小结一下,本文主要讲解了Yarn运行中涉及的几个token,具体包括token的作用,如何创建,具体使用的流程。


另外,除了上面介绍的几个token之外,各个任务(mr/spark/flink)在运行时,也还存在一些其他的token,例如mr中会用到的ClientToAMToken等,有兴趣的可以自行摸索下~


好了,这就是本文的全部内容,如果觉得本文对您有帮助,请点赞在看转发,也欢迎加我微信交流~

本文分享自微信公众号 - hncscwc(gh_383bc7486c1a)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部