文档章节

HDFS下载数据之源码分析-FileSystem.get(conf)_block02

w
 wall--e
发布于 2016/04/21 12:00
字数 728
阅读 110
收藏 0

接block01

来自分割线4调用NameNodeProxies.createNNProxyWithClientProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries)方法

private static ClientProtocol createNNProxyWithClientProtocol(InetSocketAddress address, 
    Configuration conf, UserGroupInformation ugi, boolean withRetries) throws IOException {
    // 设置一个RPC protocol, 使用非默认的RpcEngine
    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
    // 获取配置文件中默认的RetryPolicy
    final RetryPolicy defaultPolicy = 
        RetryUtils.getDefaultRetryPolicy(
            conf, 
            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 
            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 
            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
            SafeModeException.class);
    // 获取ClientNamenodeProtocolPB这个RPC协议接口的versionID
    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
    // 第一次获取ClientNamenodeProtocolPB的代理对象
    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        NetUtils.getDefaultSocketFactory(conf),
        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
            .getProxy();
    
    if (withRetries) { // create the proxy with retries
      ...
      Map<String, RetryPolicy> methodNameToPolicyMap 
                 = new HashMap<String, RetryPolicy>();
      methodNameToPolicyMap.put("create", methodPolicy);
      // 再次创建ClientNamenodeProtocolPB的代理对象
      proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
          ClientNamenodeProtocolPB.class,
          new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
              ClientNamenodeProtocolPB.class, proxy),
          methodNameToPolicyMap,
          defaultPolicy);
    }
    // 返回一个proxy的包装对象
    return new ClientNamenodeProtocolTranslatorPB(proxy);
  }

至此, 你还记得createNNProxyWithClientProtocol()方法返回到哪里了吗?

答案是分割线4: NameNodeProxies.createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface, UserGroupInformation ugi, boolean withRetries)方法!

public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr,
    Class<T> xface, UserGroupInformation ugi, boolean withRetries) throws IOException {
    ...
    // 创建一个namenode代理对象
    proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi, withRetries);
    /* 分割线4, 期待着createNNProxyWithClientProtocol()方法返回 */
    ...
    // 把proxy, dtService封装成一个ProxyAndInfo对象, 并返回
    return new ProxyAndInfo<T>(proxy, dtService);
}

至此, 你还记得createNonHAProxy()方法返回到哪里了吗? 

答案是分割线3: DFSClient类的构造函数里!

DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem$Statistics statistics)

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, 
    FileSystem.Statistics stats) throws IOException {
    ...
    proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
    /* 分割线3, 期待着createProxy()方法返回 */
    this.dtService = proxyInfo.getDelegationTokenService();    // DFSClient的成员变量dtService引用从proxyInfo中取出的dtService
    this.namenode = proxyInfo.getProxy();    // DFSClient的成员变量namenode引用从proxyInfo中取出的namenode
    ...
}

至此! DFSClient实例彻底创建完了! 你还记得这个DFSClient实例返回到哪里了吗? 

答案是分割线2: DistributedFileSystem.initialize(URI uri, Configuration conf)方法

public void initialize(URI uri, Configuration conf) throws IOException {
    ...
    // new一个DFSClient实例,成员变量dfs引用这个DFSClient实例
    this.dfs = new DFSClient(uri, conf, statistics );
    /* 分割线2, 期待着new DFSClient()返回 */
    ...
}

至此! DistributedFileSystem实例彻底创建完了! 你还记得这个DistributedFileSystem实例返回到哪里了吗? 

答案是分割线1:

FileSystem$Cache.getInternal(URI uri, Configuration conf, FileSystem$Cache$Key key)方法

private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
      ...
      fs = createFileSystem(uri, conf);
      /* 分割线1, 期待着createFileSystem()方法返回 */
      synchronized (this) { // refetch the lock again
        /*
         * 在多线程环境下, 可能另一个客户端(另一个线程)创建好了一个DistributedFileSystem实例, 并缓存到了map中
         * 所以, 这时候就把当前客户端新创建的DistributedFileSystem实例注销
         * 其实这是一个特殊的单例模式, 一个key映射一个DistributedFileSystem实例
         */
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }
        /*
         * now insert the new file system into the map
         * 缓存当前新创建的DistributedFileSystem实例到map中 
         */
        fs.key = key;
        map.put(key, fs);
        ...
        return fs;
      }
}

...

绕了这么一大圈, 终于返回到客户端代码中

FileSystem fs = FileSystem.get(new Configuration());
// fs = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_542259566_1, ugi=root (auth:SIMPLE)]]


© 著作权归作者所有

w
粉丝 7
博文 31
码字总数 25016
作品 0
东城
程序员
私信 提问
HDFS下载数据之源码分析-FileSystem.get(conf)_block01

首先来看一下, FileSystem(org.apache.hadoop.fs.FileSystem), 这是一个抽象类, 是所有文件系统的父类. 而我们要从HDFS(Hadoop Distributed FileSystem)下载数据, 应该获取一个DistributedFi...

wall--e
2016/04/21
282
0
FileSystem.get从缓存cache中获得连接导致的问题

首先了解FileSyste.get机制,查看源码可知,首先会根据fs.hdfs.impl.disable.cache,是否去缓存cache中找连接,默认是去缓存中找连接的,参考:HDFS下载数据之源码分析-FileSystem.get(conf)...

cjun1990
2016/05/03
1K
0
Spark中分布式使用HanLP(1.7.0)分词示例

HanLP分词,如README中所说,如果没有特殊需求,可以通过maven配置,如果要添加自定义词典,需要下载“依赖jar包和用户字典". 分享某大神的示例经验: 是直接"java xf hanlp-1.6.8-sources.ja...

左手的倒影
05/08
5
0
Hadoop代码运行的时候报错?

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration$DeprecationDelta at org.apache.hadoop.mapreduce.util.ConfigUtil.addDeprecatedKeys......

我爱编程zc
2016/09/17
2.1K
3
hdfs文件操作操作示例,包括上传文件到HDFS上、从HDFS上下载文件和删除HDFS上的文件

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.File; import java.io.IOException; /*hdfs文件操作操作示例,包括上传文件到HDFS上、从HDF......

八戒_o
2015/12/03
159
0

没有更多内容

加载失败,请刷新页面

加载更多

代理模式之JDK动态代理 — “JDK Dynamic Proxy“

动态代理的原理是什么? 所谓的动态代理,他是一个代理机制,代理机制可以看作是对调用目标的一个包装,这样我们对目标代码的调用不是直接发生的,而是通过代理完成,通过代理可以有效的让调...

code-ortaerc
今天
5
0
学习记录(day05-标签操作、属性绑定、语句控制、数据绑定、事件绑定、案例用户登录)

[TOC] 1.1.1标签操作v-text&v-html v-text:会把data中绑定的数据值原样输出。 v-html:会把data中值输出,且会自动解析html代码 <!--可以将指定的内容显示到标签体中--><标签 v-text=""></......

庭前云落
今天
8
0
VMware vSphere的两种RDM磁盘

在VMware vSphere vCenter中创建虚拟机时,可以添加一种叫RDM的磁盘。 RDM - Raw Device Mapping,原始设备映射,那么,RDM磁盘是不是就可以称作为“原始设备映射磁盘”呢?这也是一种可以热...

大别阿郎
今天
12
0
【AngularJS学习笔记】02 小杂烩及学习总结

本文转载于:专业的前端网站☞【AngularJS学习笔记】02 小杂烩及学习总结 表格示例 <div ng-app="myApp" ng-controller="customersCtrl"> <table> <tr ng-repeat="x in names | orderBy ......

前端老手
昨天
16
0
Linux 内核的五大创新

在科技行业,创新这个词几乎和革命一样到处泛滥,所以很难将那些夸张的东西与真正令人振奋的东西区分开来。Linux内核被称为创新,但它又被称为现代计算中最大的奇迹,一个微观世界中的庞然大...

阮鹏
昨天
20
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部