文档章节

hadoop 远程调度(三)

政委007
 政委007
发布于 2017/05/04 11:40
字数 1022
阅读 33
收藏 2

简介

前几篇博客介绍了NIO和hadoop rpc的主要流程。本文主要介绍hadoop rpc server端接受到请求怎么处理,怎样返回。

rpc 服务端提供服务的大致流程

  1. rpc server端在接受到客户端请求后,会解析接受到的参数,获取需要执行的类,接口,方法,参数等信息
  2. 根据接口获取启动server时注入的实现类的实例
  3. 拿到实例后通过反射执行实例的对应方法。然后返回执行的结果到客户端

查看本文前需要先了解NIO的相关知识,这部分用到了大量的NIO知识。

rpc server 服务

上一篇博客说道server.start()方法。启动了两个线程,分别是:listener 和 responder

  • listener 监听rpc端口,处理rpc事件
  • responder 处理执行结果的返回

下边先看看listener 调用start 后具体做了什么

Server.Listener的构造方法

作用: 构建一个listener。 内部主要打来一个ServerSocketChannel通道,同时创建读线程,注册accept事件,并且设置本线程为守护线程。

 public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
     //创建一个服务通道。并且设置成非阻塞状态。
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
        //根据参数创建一个用来读取请求数据的多个线程
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }
      // 注册accept事件到通道中。
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

Server.Listener.run()方法

    @Override
    public void run() {
      LOG.info(Thread.currentThread().getName() + ": starting");
      SERVER.set(Server.this);
      connectionManager.startIdleScan();
      while (running) {
        SelectionKey key = null;
        try {
           //获取构造函数中的selector ,并且开始等待事件,处于阻塞状态,当有事件到达时才会向下处理。
          getSelector().select();
          Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                //处理accept事件。
                  doAccept(key);
              }
        //省略代码
    }

Server.Listener.doAccept()

这个方法中主要构建了一个channel和一个connection实例,同时把connection实例添加到正在运行的reader线程,让reader线程处理请求。

   void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        //设置channel为非阻塞,立刻返回为null
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        channel.socket().setKeepAlive(true);
        //获取一个reader线程。初始化在Listener的构造函数中。
        Reader reader = getReader();
        //register内部主要是创建一个connection对象。同时把这个对象添加到集合中,返回conn
        Connection c = connectionManager.register(channel);
        key.attach(c);  // so closeCurrentConnection can get the object
        //把conn添加到reader线程padding队列中,reader线程会自动处理后续事件
        reader.addConnection(c);
      }
    }

到这里listener的accept请求事件结束,后面会接受read事件。下面我们看看Server.Listener.Reader线程中到底怎么处理这些read事件的。

Server.Listener.Reader

    //保存等待处理的链接
      final private BlockingQueue<Connection> pendingConnections;
      private final Selector readSelector; //本线程的一个通道管理器
    //简单的构造函数
      Reader(String name) throws IOException {
        super(name);
        this.pendingConnections =
            new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);
        this.readSelector = Selector.open();
      }
      @Override
      public void run() {
           //主要调度这个函数   
          doRunLoop();
      }
      private synchronized void doRunLoop() {
        while (running) {
          SelectionKey key = null;
          try {
            //获取当前正在等待处理的链接,并且把read事件注册到当前线程的readerSelecter事件中
            int size = pendingConnections.size();
            for (int i=size; i>0; i--) {
              Connection conn = pendingConnections.take();
              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
            }
            readSelector.select(); //开始监听事件
            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
              key = iter.next();
              iter.remove();
              if (key.isValid()) {
                if (key.isReadable()) {
                  doRead(key); //通过doRead方法处理读事件。
                }
              }
            }
        }
      }

Server.Listener.doRead()方法主要作用

  1. 根据key 获取 对应的connection
  2. 调用connection的 readAndProcess()方法

Server.Connection.readAndProcess()方法

  1. 获取请求header ,判断请求类型
  2. 从channel中读取请求数据到缓冲区
  3. 调用processOneRpc方法处理数据

Server.Contection.processOneRpc()

  1. 从缓冲区中获取数据构建数据流
  2. 获取请求的header
  3. 调用processRpcRequest(header,dis)方法

Server.Contection.processRpcRequest()

  1. 获取RPC请求类型,获取对应的处理类
  2. 构造一个Call类来处理请求
  3. 把Call放入callQueue中,方便后续handle线程处理请求。

上边这部分内容各种判读,我也没看太懂,暂时忽略吧,各种判断有点头疼

© 著作权归作者所有

政委007
粉丝 10
博文 15
码字总数 15843
作品 0
洛阳
程序员
私信 提问
大数据Hadoop需要了解哪些内容?

一、Hadoop环境搭建 1. Hadoop生态环境介绍 2. Hadoop云计算中的位置和关系 3. 国内外Hadoop应用案例介绍 4. Hadoop概念、版本、历史 5. Hadoop核心组成介绍及hdfs、mapreduce体系结构 6. H...

mo默瑶
2018/05/05
0
0
Hadoop MapReduce优化和资源调度器

Hadoop Shuffle过程 1.Hadoop MapReduce Shuffle过程 Hadoop Shuffle过程 Map Shuffle过程图2 2.Shuffle过程要点记录 每个Map Task把输出结果写到内存中的环形缓冲区。 当内存环形缓冲区写入...

溯水心生
2018/01/14
0
0
学习笔记TF065: TensorFlowOnSpark

Hadoop生态大数据系统分为Yam、 HDFS、MapReduce计算框架。TensorFlow分布式相当于MapReduce计算框架,Kubernetes相当于Yam调度系统。TensorFlowOnSpark,利用远程直接内存访问(Remote Direc...

利炳根
2017/11/13
0
0
Eclipse连接Hadoop分析的三种方式

Hadoop一般都部署在linux平台上,想让Hadoop执行我们写好的程序,首先需要在本地写好程序打包,然后上传到liunx,最后通过指定命令执行打包好的程序;一次两次还可以,如果进行频繁的调试是很...

ksfzhaohui
2016/10/27
2.4K
0
好程序员大数据划重点 hadoop常用四大模块文件

1.core-site.xml(工具模块)。包括Hadoop常用的工具类,由原来的Hadoopcore部分更名而来。主要包括系统配置工具Configuration、远程过程调用RPC、序列化机制和Hadoop抽象文件系统FileSystem等...

好程序员IT
05/16
1
0

没有更多内容

加载失败,请刷新页面

加载更多

maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
今天
8
0
Linux创建yum仓库

第一步、搞定自己的光盘 #创建文件夹 mkdir -p /media/cdrom #挂载光盘 mount /dev/cdrom /media/cdrom #编辑配置文件使其永久生效 vim /etc/fstab 第二步,编辑yun源 vim /ect yum.repos.d...

究极小怪兽zzz
今天
6
0
jar 更新部分文件

C:\Program Files (x86)\Java\jdk1.8.0_102\bin>jar -hIllegal option: hUsage: jar {ctxui}[vfmn0PMe] [jar-file] [manifest-file] [entry-point] [-C dir] files ...Options: -c c......

圣洁之子
今天
9
0
OSChina 周六乱弹 —— 感谢女装红薯开办了这个网站

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @胖达panda:分享歌词: 我有一只小毛驴我从来也不骑,有一天我心血来潮骑着去赶集,我手里拿着小皮鞭我心里正得意,不知怎么哗啦啦,我摔了一...

小小编辑
今天
2.6K
13
DDD(四)

1,引言 软件开发者大多趋向于将关注点放在数据上,而不是领域上。这对于刚入门的DDD的新手而言也是如此。以我目前的思考方式,数据库依然占据主要的地位。开发一个功能,首先我就会考虑我会...

MrYuZixian
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部