文档章节

hadoop 远程调度(四)

政委007
 政委007
发布于 2017/05/05 11:01
字数 1003
阅读 33
收藏 2

简介

继续上一篇博客远程调度(三)来介绍hadoop远程调度过程。上一篇主要写rpc 服务端启动,和接受到客户端请求,处理请求,及把请求封装成一个call对象。接下来介绍怎么执行客户端请求,和结果返回客户端。

Hander 中处理Call对象

hander对象数组初始化在Server.start()方法中,这个方法中启动了一系列hander线程,这些线程在run方法中从Server.callQueue队列中获取call对象,并且处理请求,获取返回结果,并且把结果返回个对应的客户端。

   @Override
    public void run() {
      while (running) {
        TraceScope traceScope = null;
        try {
          final Call call = callQueue.take(); // 排队获取需要处理的call
          try {
            // Make the call as the user via Subject.doAs, thus associating
            // the call with the Subject 调用call方法,处理请求,底层调用  
            //Object value = method.invoke(protocolImpl.protocolImpl, call.getParameters());方法,返回处理结果。
            if (call.connection.user == null) {
              value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                           call.timestamp);
            } else {
              value = 
                call.connection.user.doAs
                  (new PrivilegedExceptionAction<Writable>() {
                     @Override
                     public Writable run() throws Exception {
                       // make the call
                       return call(call.rpcKind, call.connection.protocolName, 
                                   call.rpcRequest, call.timestamp);

                     }
                   }
                  );
            }
          } catch (Throwable e) {
          
          }
          CurCall.set(null);
          synchronized (call.connection.responseQueue) {
            setupResponse(buf, call, returnStatus, detailedErr,
                value, errorClass, error);
            call.sendResponse();  //处理结果
          }
        } catch (InterruptedException e) {
        } finally {
          IOUtils.cleanup(LOG, traceScope);
        }
      }
      LOG.debug(Thread.currentThread().getName() + ": exiting");
    }

  }

#setupResponse() 方法

  1. 整理返回结果header
  2. 根据不同请求方式序列化返回结果数据到缓冲区中

Server.Call.sendResponse()方法

  1. 调用connection的sendResponse()方法
  2. connection.sendResponse()调用了responder.doRespond(call)方法,这时候请求被推送到responder线程中,这个线程在server.start()时被启动。用来等待处理返回结果

Server.Responder.doRespond() 方法

    void doRespond(Call call) throws IOException {  
    //给responseQueue添加同步锁,确保同时只有一个响应被处理
      synchronized (call.connection.responseQueue) {  
        call.connection.responseQueue.addLast(call);
        if (call.connection.responseQueue.size() == 1) {
        //调用processResponse方法处理返回结果
          processResponse(call.connection.responseQueue, true);
        }
      }
    }

Server.Responder.processResponse()

作用: 处理一个响应,如果channel没有数据需要处理则返回true

 private boolean processResponse(LinkedList<Call> responseQueue,
                                    boolean inHandler) throws IOException {
      boolean error = true;
      boolean done = false;       // there is more data for this channel.
      int numElements = 0;
      Call call = null;
      try {
        synchronized (responseQueue) {
          numElements = responseQueue.size();
          if (numElements == 0) {
            error = false;
            return true;              // no more data for this channel.
          }
          //提取需要处理的call
          call = responseQueue.removeFirst();
          SocketChannel channel = call.connection.channel;
          // Send as much data as we can in the non-blocking fashion
           ////尽可能多的写数据到channel中 这个是最后想客户端发送数据的的地方,是不是很奇怪,因为在Responder线程中run方法,最终会循环调用这个方法,持续写数据到通道中,如果说没有更多数据,那么到这里其实整个请求已经完成,但是为了处理不能一次把数据都发送过去的情况,才有后续的处理。
          int numBytes = channelWrite(channel, call.rpcResponse);
          if (numBytes < 0) {
            return true;
          }
           //如果没有更多数据需要处理,清理rpcResponse
          if (!call.rpcResponse.hasRemaining()) {
            call.rpcResponse = null;
            call.connection.decRpcCount();
            if (numElements == 1) {    // last call fully processes.
              done = true;             // no more data for this channel.
            } else {
              done = false;            // more calls pending to be sent.
            }
          } else {
            //如果还有数据需要处理,把call重新加入responseQueue中
            call.connection.responseQueue.addFirst(call);
            
            if (inHandler) {
              // set the serve time when the response has to be sent later
              call.timestamp = Time.now();
              incPending();
              try {
                // Wakeup the thread blocked on select, only then can the call 
                // to channel.register() complete.
                writeSelector.wakeup();
                channel.register(writeSelector, SelectionKey.OP_WRITE, call); //注册写时间到writeSelector中,在run方法中会后续处理
              } catch (ClosedChannelException e) {
                //Its ok. channel might be closed else where.
                done = true;
              } finally {
                decPending();
              }
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug(Thread.currentThread().getName() + ": responding to " + call
                  + " Wrote partial " + numBytes + " bytes.");
            }
          }
          error = false;              // everything went off well
        }
      } finally {
        if (error && call != null) {
          LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");
          done = true;               // error. no more data for this channel.
          closeConnection(call.connection);
        }
      }
      return done;
    }

Server.Responder.run()方法

 @Override
    public void run() {
      LOG.info(Thread.currentThread().getName() + ": starting");
      SERVER.set(Server.this);
        doRunLoop(); //调用方法
    }
 
    private void doRunLoop() {
      long lastPurgeTime = 0;   // last check for old calls.
      while (running) {
        try {
          waitPending();     // If a channel is being registered, wait.
          writeSelector.select(PURGE_INTERVAL);
          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            try {
              if (key.isValid() && key.isWritable()) {
                  doAsyncWrite(key); //底层调用Server.Responder.processResponse()方法。对数据进行输出。
              }
            } 
          }
          long now = Time.now();
          if (now < lastPurgeTime + PURGE_INTERVAL) {
            continue;
          }
          lastPurgeTime = now;
          //
          // If there were some calls that have not been sent out for a
          // long time, discard them.
          //
          if(LOG.isDebugEnabled()) {
            LOG.debug("Checking for old call responses.");
          }
          ArrayList<Call> calls;
          
          // get the list of channels from list of keys.
          synchronized (writeSelector.keys()) {
            calls = new ArrayList<Call>(writeSelector.keys().size());
            iter = writeSelector.keys().iterator();
            while (iter.hasNext()) {
              SelectionKey key = iter.next();
              Call call = (Call)key.attachment();
              if (call != null && key.channel() == call.connection.channel) { 
                calls.add(call);
              }
            }
          }
          
          for(Call call : calls) {
            doPurge(call, now);
          }
        } catch (OutOfMemoryError e) {
        }
      }

对结果返回的后续操作参考了 参考链接博客

© 著作权归作者所有

政委007
粉丝 10
博文 15
码字总数 15843
作品 0
洛阳
程序员
私信 提问
大数据Hadoop需要了解哪些内容?

一、Hadoop环境搭建 1. Hadoop生态环境介绍 2. Hadoop云计算中的位置和关系 3. 国内外Hadoop应用案例介绍 4. Hadoop概念、版本、历史 5. Hadoop核心组成介绍及hdfs、mapreduce体系结构 6. H...

mo默瑶
2018/05/05
0
0
Hadoop MapReduce优化和资源调度器

Hadoop Shuffle过程 1.Hadoop MapReduce Shuffle过程 Hadoop Shuffle过程 Map Shuffle过程图2 2.Shuffle过程要点记录 每个Map Task把输出结果写到内存中的环形缓冲区。 当内存环形缓冲区写入...

溯水心生
2018/01/14
0
0
学习笔记TF065: TensorFlowOnSpark

Hadoop生态大数据系统分为Yam、 HDFS、MapReduce计算框架。TensorFlow分布式相当于MapReduce计算框架,Kubernetes相当于Yam调度系统。TensorFlowOnSpark,利用远程直接内存访问(Remote Direc...

利炳根
2017/11/13
0
0
好程序员大数据划重点 hadoop常用四大模块文件

1.core-site.xml(工具模块)。包括Hadoop常用的工具类,由原来的Hadoopcore部分更名而来。主要包括系统配置工具Configuration、远程过程调用RPC、序列化机制和Hadoop抽象文件系统FileSystem等...

好程序员IT
05/16
1
0
Hadoop 2.0 Yarn代码:心跳驱动服务分析

当RM(ResourcesManager)和NM(NodeManager)陆续将所有模块服务启动,最后启动是NodeStatusUpdater,NodeStatusUpdater将用Hadoop RPC远程调用ResourcesTrackerService中的函数,进行资源是初始...

超人学院
2015/05/28
76
0

没有更多内容

加载失败,请刷新页面

加载更多

Giraph源码分析(八)—— 统计每个SuperStep中参与计算的顶点数目

作者|白松 目的:科研中,需要分析在每次迭代过程中参与计算的顶点数目,来进一步优化系统。比如,在SSSP的compute()方法最后一行,都会把当前顶点voteToHalt,即变为InActive状态。所以每次...

数澜科技
今天
4
0
Xss过滤器(Java)

问题 最近旧的系统,遇到Xss安全问题。这个系统采用用的是spring mvc的maven工程。 解决 maven依赖配置 <properties><easapi.version>2.2.0.0</easapi.version></properties><dependenci......

亚林瓜子
今天
10
0
Navicat 快捷键

操作 结果 ctrl+q 打开查询窗口 ctrl+/ 注释sql语句 ctrl+shift +/ 解除注释 ctrl+r 运行查询窗口的sql语句 ctrl+shift+r 只运行选中的sql语句 F6 打开一个mysql命令行窗口 ctrl+l 删除一行 ...

低至一折起
今天
9
0
Set 和 Map

Set 1:基本概念 类数组对象, 内部元素唯一 let set = new Set([1, 2, 3, 2, 1]); console.log(set); // Set(3){ 1, 2, 3 } [...set]; // [1, 2, 3] 接收数组或迭代器对象 ...

凌兮洛
今天
4
0
PyTorch入门笔记一

张量 引入pytorch,生成一个随机的5x3张量 >>> from __future__ import print_function>>> import torch>>> x = torch.rand(5, 3)>>> print(x)tensor([[0.5555, 0.7301, 0.5655],......

仪山湖
今天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部