文档章节

hadoop rpc客户端初始化和调用过程详解

彭苏云
 彭苏云
发布于 2015/01/28 00:33
字数 1694
阅读 152
收藏 1
点赞 0
评论 0

本文主要记录hadoop rpc的客户端部分的初始化和调用的过程,下面的介绍中主要通过DFSClient来说明,为什么用DFSClient呢?DFSClient作为namenode的客户端,通过rpc来操作hdfs。限于篇幅,本文对下文引用到的类,做了较大的剪裁,只给出了关键的部分,如有疑问,可以一起交流。

DFSClient的初始化

DFSClient的初始化主要看其构造函数,其中rpc部分我们主要关注属性final ClientProtocol namenode,DFSClient的文件系统操作都是由他代理完成,构造函数中的关键代码如下:

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
      Configuration conf, FileSystem.Statistics stats)
    throws IOException {
	proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,ClientProtocol.class);
 	this.dtService = proxyInfo.getDelegationTokenService();
 	this.namenode = proxyInfo.getProxy();
}

显然,DFSClient中的namenode是一个代理类。

接着NameNodeProxies类的createProxy方法,下面给出了NameNodeProxies中需要用到的一些方法:

public class NameNodeProxies {
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
      	URI nameNodeUri, Class<T> xface) throws IOException {
		return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
          		UserGroupInformation.getCurrentUser(), true);
}

public static <T> ProxyAndInfo<T> createNonHAProxy(
      	Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
      		UserGroupInformation ugi, boolean withRetries) throws IOException {
		proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries);
		return new ProxyAndInfo<T>(proxy, dtService);
}

/**
	这部分是重点
*/
private static ClientProtocol createNNProxyWithClientProtocol(
      	InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
      		boolean withRetries) throws IOException {
		ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
       		ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        			NetUtils.getDefaultSocketFactory(conf),
        				org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy).getProxy();

		proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
          		ClientNamenodeProtocolPB.class,
          			new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
              			ClientNamenodeProtocolPB.class, proxy),
          		methodNameToPolicyMap,
          		defaultPolicy);

		return new ClientNamenodeProtocolTranslatorPB(proxy);
}
}

该类中前面两个方法做跳转用,直接看createNNProxyWithClientProtocol方法,这里两行很关键的代码,proxy实例的初始化,这里先提示注意前一行中的getProxy() 对于这个方法是需要注意的,这样也保证了类型的一致。

这时候就不得不调出RPC这个类来看看他是怎么生成proxy的实例的了,看代码:ProtobufRpcEngineProtobufRpcEngineProtobufRpcEngineProtobufRpcEngine

public class RPC {
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                                long clientVersion,
                                InetSocketAddress addr,
                                UserGroupInformation ticket,
                                Configuration conf,
                                SocketFactory factory,
                                int rpcTimeout,
                                RetryPolicy connectionRetryPolicy) throws IOException {    
    	if (UserGroupInformation.isSecurityEnabled()) {
      	SaslRpcServer.init(conf);
    	}
   	return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion,
        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
  }
}


RPC中还是需要进一步的跳转,但是这里需要注意,getProtocolEngine这个方法,这里做一个说明,查看
RpcEngine的依赖,看图:  在我的2.4.1的hadoop的版本中,hadoop的序列化框架已经用了Protobuf,所以getProtocolEngine方法得到的是ProtobufRpcEngine类的一个实例,那好,我们进一步跟踪ProtobufRpcEngine类的getProxy方法,看代码:
public class ProtobufRpcEngine implements RpcEngine {
	public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
      	InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
      	SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
      	) throws IOException {
    	final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
        	rpcTimeout, connectionRetryPolicy);
    	return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
        	protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
  	}
}

对java的动态代理有点了解的人看到Proxy.newProxyInstance这个方法应该都很清楚这就是生成一个远程代理类实例(特别注意:在NameNodeProxies类的createNNProxyWithClientProtocol方法中getProxy方法拿到的对象也就是这个对象),其中的invoker参数,确实我们不能忽略的,因为他暗藏玄机,java的动态代理中,invoker的类需要实现InvocationHandler接口,该接口只听过一个方法invoke,共代理类使用,及通过Proxy.newProxyInstance生成的代理类,在使用的时候是通过InvocationHandler的invoke方法来起作用的。好吧,现在我们可以顺便看看在ProtobufRpcEngine类的getProxy方法中invoker局部变量的类依赖图:,显然有刚才提到的实现关系,现在再让我们看看Invoker的内部,包括构造函数和invoke方法:

private Invoker(Class<?> protocol, Client.ConnectionId connId,
        Configuration conf, SocketFactory factory) {
      this.remoteId = connId;
      this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
      this.protocolName = RPC.getProtocolName(protocol);
      this.clientProtocolVersion = RPC
          .getProtocolVersion(protocol);
    }

public Object invoke(Object proxy, Method method, Object[] args)
        throws ServiceException {
	   val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);
}

在构造函数请注意一个属性client,他的类型正式 org.apache.hadoop.ipc.Client,而且在invoke方法中发起远程调用的正是这个client属性,能够读到这里的同学,相信应该比较清楚了,在DFSClient中发起远程访问的就是这个Client类的实例。

关于DFSClient的初始化阶段中关于rpc的部分,总结一句,就是创建一个namenode的代理对象,供后续的文件系统操作调用。

DFSClient的getFileLinkInfo方法

DFSClient提供了相当丰富的API供客户端操作hadoop的文件系统,这里以 getFileLinkInfo为例,讲解rpc客户端的调用过程。注意:如果是FileSystem类的话,请使用方法getFileLinkStatus,他对DFSClient提供的getFileLinkInfo做了一层包装,仅此而已。

直接看DFSClient中的代码:

public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
    checkOpen();
    try {
      return namenode.getFileLinkInfo(src);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     UnresolvedPathException.class);
     }
   }


很简答的一行代码,通过namenode属性的调用操作完成,看了DFSClient的初始化过程,我们很容易知道namenode的实例化类是ClientNamenodeProtocolTranslatorPB,继续看调用过程,代码转到了ClientNamenodeProtocolTranslatorPB中:

@Override
  public HdfsFileStatus getFileLinkInfo(String src)
      throws AccessControlException, UnresolvedLinkException, IOException {
    GetFileLinkInfoRequestProto req = GetFileLinkInfoRequestProto.newBuilder()
        .setSrc(src).build();
    try {
      GetFileLinkInfoResponseProto result = rpcProxy.getFileLinkInfo(null, req);
      return result.hasFs() ?  
          PBHelper.convert(rpcProxy.getFileLinkInfo(null, req).getFs()) : null;
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }


这时候我们会发现一个属性rpcProxy,再回过头看看NameNodeProxies类的createProxy方法,我们就可以很清楚的知道,rpcProxy就是那个能发起远程调用的代理类,它封装了Invoker对象,当然就也有了使用Client类的能力,很好,这里我们稍微总结下,在DFSClient类中,调用getFileLinkInfo方法,最终就是通过Client的call方法,发起远程访问,获取数据。

这时候,我们可以进一步来探讨下Hadoop中RPC的Client类了,下面我把Client类主要的部分抽取出来了,看下面的代码:

public class Client {
Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
    		return new Call(rpcKind, rpcRequest);
    }

public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
      	ConnectionId remoteId, int serviceClass) throws IOException {
		final Call call = createCall(rpcKind, rpcRequest);

    		Connection connection = getConnection(remoteId, call, serviceClass);

		connection.sendRpcRequest(call);                 // send the rpc request

		return call.getRpcResponse();
}

private class Connection extends Thread {
		private void receiveRpcResponse() {
			
		}

		public void sendRpcRequest(final Call call)
        		throws InterruptedException, IOException {

		}
}
}

看了DFSclient的初始化部分,我们就可以知道,DFSClient的远程调用,是通过Client的call方法起作用的。其实Client的call方法已经很能够说明问题了,先封装一个call,然后获取连接,再得到结果。简单的说Client就是这样了。可以在稍微复杂一点,在Client的call方法中,封装了call后,getConnection的方法不仅是获取一个连接,同时会启动连接代表的线程,这个线程的作用就是等待请求的完成,完成后,将结果写到call中(该过程天内各国Connection的receiveRpcRespoce方法完成),在call方法中获取连接后,会发送请求的参数到namenode的服务端,等待namenode处理完毕,Connection的receiveRpcRespoce方法写返回结果,最后call方法中返回结果。大概的过程就是这个样子了。

好像整个过程也不太复杂,只是不熟悉的情况下跟踪代码会比较累点。


© 著作权归作者所有

共有 人打赏支持
彭苏云
粉丝 41
博文 202
码字总数 54255
作品 0
广州
高级程序员
Hadoop实战-中高级部分 之 Hadoop RPC

Hadoop RestFul Hadoop HDFS原理1 Hadoop HDFS原理2 Hadoop作业调优参数调整及原理 Hadoop HA Hadoop MapReduce高级编程 Hadoop IO Hadoop MapReduce工作原理 Hadoop 管理 Hadoop 集群安装 ...

2k10 ⋅ 2015/03/23 ⋅ 0

Hadoop中RPC机制详解之Server端

Hadoop 中 RPC 机制详解之 Client 端 1. Server.Listener RPC Client 端的 RPC 请求发送到 Server 端后, 首先由 Server.Listener 接收 Server.Listener 类继承自 Thread 类, 监听了 OPREAD 和......

wall--e ⋅ 2016/05/05 ⋅ 0

Hadoop中RPC机制简介

RPC(Remote Procedure Call)—远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之...

wall--e ⋅ 2016/04/16 ⋅ 0

hadoop HDFS详解

一、HDFS的基本概念 1.1、数据块(block) HDFS(Hadoop Distributed File System)默认的最基本的存储单位是64M的数据块。 和普通文件系统相同的是,HDFS中的文件是被分成64M一块的数据块存储的...

xrzs ⋅ 2012/10/11 ⋅ 0

hadoop中RPC通信文件上传原理

//APP2中调用的代码public static final String HDFS_PATH = "hdfs://hadoop:9000/hello";public static final String DIR_PATH = "/d1000";public static final String FILE_PATH = "/d1000......

simpler ⋅ 2014/04/15 ⋅ 0

hdfs详解一

hdfs详解一 1、hdfs的三个进程: NameNode:接受客户端请求、管理hdfs、维护文件元信息和操作日志 DataNode:存储数据块和数据块校验和、通过水平复制使文件冗余度满足要求 Secondary NameNode...

pengzonglu7292 ⋅ 2017/12/19 ⋅ 0

原hadoop中RPC通信文件上传原理

01 //APP2中调用的代码 02 public static final String HDFSPATH = "hdfs://hadoop:9000/hello"; 03 public static final String DIRPATH = "/d1000"; 04 public static final String FILEP......

蓝狐乐队 ⋅ 2014/04/29 ⋅ 0

远程过程调用(RPC)详解

原文同步至 本文介绍了什么是远程过程调用(RPC),RPC 有哪些常用的方法,RPC 经历了哪些发展阶段,以及比较了各种 RPC 技术的优劣。 什么是 RPC RPC 是远程过程调用(Remote Procedure Call...

waylau ⋅ 2016/07/11 ⋅ 27

hadoop mapreduce过程分析学习

mapreduce框架学习 第一代mapreduce局限性 扩展性差: JobTracker同时具备了资源管理和作业控制两个功能,制约了hadoop集群扩展性 资源利用率低,mr1采用了基于槽位的资源分配模型,槽位slo...

writeademo ⋅ 2016/11/26 ⋅ 0

glusterfs通信之rpc

在glusterfs中,gluster与glusterd通信请求对卷的操作、集群的操作、状态的查看等;glusterd与glusterfsd通信完成对卷的操作,集群的操作,状态的查看;glusterfs与glusterfsd通信完成文件的...

hncscwc ⋅ 2014/03/28 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Android JNI 读写Bitmap的方法

Java层创建Bitmap,通过JNI将Bitmap传到C/C++进行处理 Java部分 public static native boolean greenBitmap(Bitmap bitmap); C/C++部分 JNIEXPORT jboolean JNICALL Java_com_test_Test_gree......

国仔饼 ⋅ 14分钟前 ⋅ 0

一次性让你懂async/await,解决回调地狱

什么是async? 欢迎留言讨论 async 函数是 Generator 函数的语法糖。使用 关键字 async 来表示,在函数内部使用 await 来表示异步。相较于 Generator,async 函数的改进在于下面四点: 内置执...

阿K1225 ⋅ 14分钟前 ⋅ 0

angular常用命令

.下载更新操作 1.利用npm下载angular的命令行工具AngularCLI: npm install -g @angular/cli 2.下载jquery: npm install --save jquery 3.更新npm: npm i -g npm 4.更新angular: ng update ......

消散了的诗意 ⋅ 16分钟前 ⋅ 0

window.print 页面打印

定义和用法 print() 方法用于打印当前窗口的内容。 语法 window.print(); window.print() 实际上,是浏览器打印功能菜单的一种程序调用。与点击打印功能菜单一样,不能精确分页,不能设置纸型...

初学者的优化 ⋅ 17分钟前 ⋅ 0

魔兽世界 7.0版本上 PVE装备全攻略

  T套 因为大家应该都会打穿副本的所以具体是哪个boss我就不说了。   T1: 所有套装都在【熔火之心】出   T2: 头原来是在【奥妮克希亚的巢穴】改到黑翼之巢的奈法利安了,腿是在【熔火之...

wangchen1999 ⋅ 17分钟前 ⋅ 0

java.math.BigDecimal使用小结

原文地址 java.math.BigDecimal使用小结 divide方法 使用BigDecimal.divide方法时一定要考虑: 除数是否为0 商是否是无限小数 正确的使用方式 判断除数是否为0,是0做另外的处理逻辑 调用除法...

666B ⋅ 20分钟前 ⋅ 0

关于qstring转char乱码问题。

if (OpenClipboard(NULL)) { HGLOBAL hgClip; EmptyClipboard(); QByteArray byay = FValue.toLocal8Bit(); //转latin编码 char *bochsrc_line = byay.data(); hgClip = GlobalAlloc(GMEM_DD......

backtrackx ⋅ 20分钟前 ⋅ 0

了解SSH加密和连接过程

介绍 SSH或安全shell是安全协议,也是安全管理远程服务器的最常用方式。通过使用多种加密技术,SSH提供了一种机制,用于在双方之间建立加密安全连接,对彼此进行身份验证,以及来回传递命令和...

吴伟祥 ⋅ 27分钟前 ⋅ 0

微信小程序

小程序的全局配置app.json 微信小程序的全局配置保存在app.json文件中。开发者通过使用app.json来配置页面文件(pages)的路径、窗口(window)表现、设定网络超时时间值(networkTimeout)以...

上官清偌 ⋅ 30分钟前 ⋅ 0

【转】百度坐标坐标系之间的转换(JS版代码)

/** * Created by Wandergis on 2015/7/8. * 提供了百度坐标(BD09)、国测局坐标(火星坐标,GCJ02)、和WGS84坐标系之间的转换 *///定义一些常量var x_PI = 3.1415926535897932...

HAVENT ⋅ 32分钟前 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部