文档章节

hadoop rpc服务端初始化和调用过程详解

彭苏云
 彭苏云
发布于 2015/01/28 13:26
字数 1731
阅读 160
收藏 0
点赞 0
评论 0

本文主要描述了hadoop rpc服务端的初始化和调用过程,相比客户端的初始化,rpc服务端感觉会简单点,但是调用过程却比客户端复杂一些。本文还是以namenode为例,namenode会在执行main方法的时候,创建一个namenode实例,及完成一系列的初始化过程,其中就包括了rpc的初始化过程。

rpc服务端的初始化

上面已经提到我们这里主要借用了namenode的远程服务,先来看看相关代码:

public class NameNode implements NameNodeStatusMXBean {
public static void main(String argv[]) throws Exception {
		NameNode namenode = createNameNode(argv, null);
}	

protected NameNode(Configuration conf, NamenodeRole role)throws IOException { 
		initialize(conf);
}

protected void initialize(Configuration conf) throws IOException {
		rpcServer = createRpcServer(conf);

		startCommonServices(conf); //相当重要
}

protected NameNodeRpcServer createRpcServer(Configuration conf)throws IOException {
		return new NameNodeRpcServer(conf, this);
    }
}

我们的linux的终端执行hadoop的启动命令的时候,最终的命令是调用NameNode的main方法,所以我们追踪代码的切入点是NameNode的main方法,方法比较简单,就是调用NameNode的构造函数创建一个NameNode,然后执行初始化方法initialize,这个方法相对来说,是我们关注的重点,包括rpc服务在内的初始化操作都放在这个方法里面。特定于rpc,他执行了两个相关的方法createRpcServer和startCommonServices,第一个方法见名思意,不多说,先简单介绍下后面的方法,该方法的作用就是启动namenode的rpc服务,稍后我给出代码。好的,从上面的代码可以看到,我们的rpcServer功能都放在了类NameNodeRpcServer里面,现在让我们来看看这个类里面相关的代码:

class NameNodeRpcServer implements NamenodeProtocols {
public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {    
    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
        ProtobufRpcEngine.class);

    ClientNamenodeProtocolServerSideTranslatorPB 
       clientProtocolServerTranslator = 
         new ClientNamenodeProtocolServerSideTranslatorPB(this);
     BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);

    InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf); // fs.defaultFS
    String bindHost = nn.getRpcServerBindHost(conf);
    if (bindHost == null) {
      bindHost = rpcAddr.getHostName();
    }
    LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort());

    this.clientRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService).setBindAddress(bindHost)
        .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();

    // Add all the RPC protocols that the namenode implements
    DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
        clientRpcServer);
 }
}

在NameNodeRpcServer的构造函数里面最重要的一件事情是实例化clientRpcServer,这里面我最想说明的是,NameNode宣称自己实现了三个协议:ClientProtocol、DatanodeProtocol和NamenodeProtocol,在服务端的实现基本上就靠ClientNamenodeProtocolServerSideTranslatorPB之类的类型了,特别在实例化ClientNamenodeProtocolServerSideTranslatorPB的时候有传入一个形参,这个形参就是NameNodeRpcServer实例,看代码:

public ClientNamenodeProtocolServerSideTranslatorPB(ClientProtocol server)
      throws IOException {
    this.server = server;
  }

  @Override
  public GetBlockLocationsResponseProto getBlockLocations(
      RpcController controller, GetBlockLocationsRequestProto req)
      throws ServiceException {
    try {
      LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),
          req.getLength());
      Builder builder = GetBlockLocationsResponseProto
          .newBuilder();
      if (b != null) {
        builder.setLocations(PBHelper.convert(b)).build();
      }
      return builder.build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

上面代码中的getBlockLocations也一定程度上说明了刚才的观点。

现在让我们回过头看看NameNode中initialize方法中执行的startCommonServices方法,这个方法用来启动clientRpcServer下面的线程,包括listener,handler、response,具体看代码: 

public class NameNode implements NameNodeStatusMXBean {
private void startCommonServices(Configuration conf) throws IOException {
	rpcServer.start();
}
}

class NameNodeRpcServer implements NamenodeProtocols {
 void start() {
    clientRpcServer.start();
    if (serviceRpcServer != null) {
      serviceRpcServer.start();      
    }
  }
}

public abstract class Server {
  public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }
}

代码看到这里,启动过程中rpc相关的代码就结束了。

rpc服务端的调用过程

现在让我们来看看rpc被调用的过程,先来认识下Server的关键结构:

public abstract class Server {
  private Listener listener = null;
  private Responder responder = null;
  private Handler[] handlers = null;

  private class Responder extends Thread {

  }

  private class Listener extends Thread {

  }

  private class Handler extends Thread {

  }
}

在初始化的时候,就启动listener、responder和handlers下面的所有线程。

其中listener线程里面启动了一个socker服务,专门用来接受客户端的请求,handler下面的线程用来处理具体的请求,responder写请求结果,具体过程可以看下下面的代码:

public abstract class Server {
  private Listener listener = null;
  private Responder responder = null;
  private Handler[] handlers = null;

  private class Listener extends Thread {
public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

public void run() {
		while (running) {
			doAccept(key);
		}
}

void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
        Reader reader = getReader();
        Connection c = connectionManager.register(channel);
        key.attach(c);  // so closeCurrentConnection can get the object
        reader.addConnection(c);
    }

private class Reader extends Thread {
	public void run() {
		doRunLoop();
	}

	private synchronized void doRunLoop() {
		while (running) {
			Connection conn = pendingConnections.take();
              	conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
		}
		readSelector.select();
		doRead(key);
	}

	void doRead(SelectionKey key) throws InterruptedException {
		Connection c = (Connection)key.attachment();
		count = c.readAndProcess();
	}
}
  }

  public class Connection {
public int readAndProcess(){
	processOneRpc(data.array());
}

private void processOneRpc(byte[] buf){
	processRpcRequest(header, dis);
}

private void processRpcRequest(RpcRequestHeaderProto header,
        DataInputStream dis) throws WrappedRpcServerException,
        InterruptedException {
	 Call call = new Call(header.getCallId(), header.getRetryCount(),
          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
              .getClientId().toByteArray());
      callQueue.put(call);
}
  }

  private class Handler extends Thread {
public void run() {
	final Call call = callQueue.take();
	value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                           call.timestamp);

	setupResponse(buf, call, returnStatus, detailedErr, 
                value, errorClass, error);

	responder.doRespond(call);
}
  }

  private class Responder extends Thread {
void doRespond(Call call) throws IOException {
	processResponse(call.connection.responseQueue, true);
}

private boolean processResponse(LinkedList<Call> responseQueue,
                                    boolean inHandler) throws IOException {
	int numBytes = channelWrite(channel, call.rpcResponse);

	done = true;
}
  }
}

这里给出了一个比较完整版Server的rpc调用过程,从listener都构造函数开始,在他的构造函数中起了几个reader线程,当监听器收到访问请求的时候,由reader请请求中读取数据,reader中实际上调用的是connection的readAndProcess方法,在这个方法中,会往RPC server中的callQueue添加call对象,之后,handler这个家伙从队列中取出当前call,具体的处理过程,用到了Server类的call方法,这地方有些玄机,仔细跟过代码的人才知道,因为server的实例类不再是org.apache.hadoop.ipc.Server,而是Protobuf的一个实现类,org.apache.hadoop.ipc.RPC.Server,而且call方法是被重写过的,代码如下:

@Override
    public Writable call(RPC.RpcKind rpcKind, String protocol,
        Writable rpcRequest, long receiveTime) throws Exception {
      return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
          receiveTime);
    }

继续追踪下,差不多就可以到底了:

public class ProtobufRpcEngine implements RpcEngine {
public static class Server extends RPC.Server {
	static class ProtoBufRpcInvoker implements RpcInvoker {
		public Writable call(RPC.Server server, String protocol,
          		Writable writableRequest, long receiveTime) throws Exception {
        	ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,clientVersion);
        	BlockingService service = (BlockingService) protocolImpl.protocolImpl;

          	result = service.callBlockingMethod(methodDescriptor, null, param);
          
        	return new RpcResponseWrapper(result);
	}
}
}

这部分的代码也正是hadoop rpc与protobuf结合的地方,这地方在补充一点,protbufImpl就是NameNodeRpcServer初始化的时候,已经准备了,而且看懂ProtoBufRpcInvoker下的call方法,确实也是需要结合NameNodeRpcServer初始化过程来理解的。我朦朦胧胧的懂了。而且这地方的深入会让你看到一些本质的东西,举例的话,你会跟踪到ClientNamenodeProtocolServerSideTranslatorPB,然后是NameNodeRpcServer,再然后是FSNamesystem,最后你发现,服务端对文件系统的操作出自FSNamesystem。

继续回到handler中的run方法,call方法调用完了,就轮到Responder处理返回结果了。

整个过程就是这样了,需要说明点,上面写都东西有些可以确认没问题了,有些是个人结合书的一些总结,不一定对,仅供参考。

© 著作权归作者所有

共有 人打赏支持
彭苏云
粉丝 41
博文 204
码字总数 54255
作品 0
广州
高级程序员
Hadoop中RPC机制详解之Server端

Hadoop 中 RPC 机制详解之 Client 端 1. Server.Listener RPC Client 端的 RPC 请求发送到 Server 端后, 首先由 Server.Listener 接收 Server.Listener 类继承自 Thread 类, 监听了 OPREAD 和......

wall--e
2016/05/05
113
0
Hadoop实战-中高级部分 之 Hadoop RPC

Hadoop RestFul Hadoop HDFS原理1 Hadoop HDFS原理2 Hadoop作业调优参数调整及原理 Hadoop HA Hadoop MapReduce高级编程 Hadoop IO Hadoop MapReduce工作原理 Hadoop 管理 Hadoop 集群安装 ...

2k10
2015/03/23
0
0
Hadoop中RPC机制简介

RPC(Remote Procedure Call)—远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之...

wall--e
2016/04/16
385
0
Hadoop中RPC机制详解之Client端

先看看这个吧, Hadoop 中 RPC 机制简介, Hadoop 中 RPC 机制的实现都在 org.apache.hadoop.ipc 这个包里, 下面都将围绕这个包解读 Hadoop RPC 机制 1. RPC.getServer(Object instance, Stri...

wall--e
2016/05/05
236
0
hadoop中RPC通信文件上传原理

//APP2中调用的代码public static final String HDFS_PATH = "hdfs://hadoop:9000/hello";public static final String DIR_PATH = "/d1000";public static final String FILE_PATH = "/d1000......

simpler
2014/04/15
0
0
Hadoop 2.0 Yarn代码:心跳驱动服务分析

当RM(ResourcesManager)和NM(NodeManager)陆续将所有模块服务启动,最后启动是NodeStatusUpdater,NodeStatusUpdater将用Hadoop RPC远程调用ResourcesTrackerService中的函数,进行资源是初始...

超人学院
2015/05/28
0
0
simpleRpc解析-服务端

本文主要是对勇哥的simpleRpc进行了简单的剖析,用来学习rpc,加深对rpc的理解! 源码地址:http://git.oschina.net/huangyong/rpc 勇哥博客:https://my.oschina.net/huangyong/blog/36175...

涩谷直子
06/22
0
0
原hadoop中RPC通信文件上传原理

01 //APP2中调用的代码 02 public static final String HDFSPATH = "hdfs://hadoop:9000/hello"; 03 public static final String DIRPATH = "/d1000"; 04 public static final String FILEP......

蓝狐乐队
2014/04/29
0
0
RPC的简单实现

RPC(Remote Procedure Call)—远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之...

wall--e
2016/05/14
309
3
RPC原理及RPC实例分析

在学校期间大家都写过不少程序,比如写个hello world服务类,然后本地调用下,如下所示。这些程序的特点是服务消费方和服务提供方是本地调用关系。 而一旦踏入公司尤其是大型互联网公司就会发...

Hosee
2016/07/14
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

命令

sudo nginx -s reload 重启nginx sudo lsof -i -P | grep -i "listen" 查看端口占用

那个猴子
2分钟前
0
0
用scrapy-redis爬去新浪-以及把数据存储到

需求:爬取新浪网导航页(http://news.sina.com.cn/guide/)所有下所有大类、小类、小类里的子链接,以及子链接页面的新闻内容。 准备工作: a.安装redis(windows或者linux) b.安装Redis Des...

丁典
2分钟前
0
0
PHP常用函数篇

1.为什么要使用函数? 除了内建的PHP函数,我们可以创建我们自己的函数。 函数是可以在程序中重复使用的语句块。 使代码逻辑更清晰 避免过多的全局变量 封装后避免相同逻辑重复代码,只需调用...

天地有涯风有信_大海无量不见人
3分钟前
0
0
对List分组

在日常工作中会遇到这样的情景,我们需要对List按照List中对象的一个值进行分组。比如一个Human的List,我们要根据性别分组,传统的方法是做双层循环,逐个对比,今天我要介绍一种详单简单的...

珂jack
5分钟前
0
0
分析jquery ajax jsonpCallback回调函数名包含点号报错问题

现象 项目中涉及到跨域请求,采用jquery ajax jsonp来实现,但是遇到一个奇怪问题,在设置回调函数名称时,若包含点号,如“Callback.Success”,那么执行完成后,其error回调函数始终会被触...

iwaller
8分钟前
0
0
【Graphql实践】使用 Apollo(iOS) 访问 Github 的 Graphql API

最近在协助调研 Apollo 生成的代码是否有可能跨 Query 共享模型的问题,虽然初步结论是不能,并不是预期的结果,但是在调研过程中积累的一些经验,有必要记录下。如果你也对 Graphql 感兴趣,...

ios122
8分钟前
1
0
聊聊spring cloud的AsyncLoadBalancerAutoConfiguration

序 本文主要研究一下AsyncLoadBalancerAutoConfiguration AsyncLoadBalancerAutoConfiguration spring-cloud-commons-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/client/loadba......

go4it
17分钟前
0
0
10.19 iptables规则备份和恢复 ,firewalld的9个zone,service的操作

保存和备份iptables规则 内容: 保存iptables规则 service iptables save 把iptables规则备份到my.ipt文件中: iptables-save > my.ipt 恢复刚才备份的规则: iptables-restore < my.ipt 1.......

Linux_老吴
20分钟前
0
0
Vue 自动化表单相关资料

1.使用vue自动化表单 2.Vue可视化,Vue代码生成,Vue动态表单 3.前端表单进阶之路:通过 Vue.js 实现表单可配置化 4.使用Vue动态生成form表单 5.autoform-devtool 6.Vue.js实践:实现多条件筛...

IT追寻者
21分钟前
0
0
动态SQL

一、动态SQL 1、if <select id="findActiveBlogWithTitleLike" resultType="Blog"> SELECT * FROM BLOG WHERE state = ‘ACTIVE’ <if test="title != null"> AND title l......

一个yuanbeth
24分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部