hadoop 远程调度(四)

原创
2017/05/05 11:01
阅读数 74

简介

继续上一篇博客远程调度(三)来介绍hadoop远程调度过程。上一篇主要写rpc 服务端启动,和接受到客户端请求,处理请求,及把请求封装成一个call对象。接下来介绍怎么执行客户端请求,和结果返回客户端。

Hander 中处理Call对象

hander对象数组初始化在Server.start()方法中,这个方法中启动了一系列hander线程,这些线程在run方法中从Server.callQueue队列中获取call对象,并且处理请求,获取返回结果,并且把结果返回个对应的客户端。

   @Override
    public void run() {
      while (running) {
        TraceScope traceScope = null;
        try {
          final Call call = callQueue.take(); // 排队获取需要处理的call
          try {
            // Make the call as the user via Subject.doAs, thus associating
            // the call with the Subject 调用call方法,处理请求,底层调用  
            //Object value = method.invoke(protocolImpl.protocolImpl, call.getParameters());方法,返回处理结果。
            if (call.connection.user == null) {
              value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                           call.timestamp);
            } else {
              value = 
                call.connection.user.doAs
                  (new PrivilegedExceptionAction<Writable>() {
                     @Override
                     public Writable run() throws Exception {
                       // make the call
                       return call(call.rpcKind, call.connection.protocolName, 
                                   call.rpcRequest, call.timestamp);

                     }
                   }
                  );
            }
          } catch (Throwable e) {
          
          }
          CurCall.set(null);
          synchronized (call.connection.responseQueue) {
            setupResponse(buf, call, returnStatus, detailedErr,
                value, errorClass, error);
            call.sendResponse();  //处理结果
          }
        } catch (InterruptedException e) {
        } finally {
          IOUtils.cleanup(LOG, traceScope);
        }
      }
      LOG.debug(Thread.currentThread().getName() + ": exiting");
    }

  }

#setupResponse() 方法

  1. 整理返回结果header
  2. 根据不同请求方式序列化返回结果数据到缓冲区中

Server.Call.sendResponse()方法

  1. 调用connection的sendResponse()方法
  2. connection.sendResponse()调用了responder.doRespond(call)方法,这时候请求被推送到responder线程中,这个线程在server.start()时被启动。用来等待处理返回结果

Server.Responder.doRespond() 方法

    void doRespond(Call call) throws IOException {  
    //给responseQueue添加同步锁,确保同时只有一个响应被处理
      synchronized (call.connection.responseQueue) {  
        call.connection.responseQueue.addLast(call);
        if (call.connection.responseQueue.size() == 1) {
        //调用processResponse方法处理返回结果
          processResponse(call.connection.responseQueue, true);
        }
      }
    }

Server.Responder.processResponse()

作用: 处理一个响应,如果channel没有数据需要处理则返回true

 private boolean processResponse(LinkedList<Call> responseQueue,
                                    boolean inHandler) throws IOException {
      boolean error = true;
      boolean done = false;       // there is more data for this channel.
      int numElements = 0;
      Call call = null;
      try {
        synchronized (responseQueue) {
          numElements = responseQueue.size();
          if (numElements == 0) {
            error = false;
            return true;              // no more data for this channel.
          }
          //提取需要处理的call
          call = responseQueue.removeFirst();
          SocketChannel channel = call.connection.channel;
          // Send as much data as we can in the non-blocking fashion
           ////尽可能多的写数据到channel中 这个是最后想客户端发送数据的的地方,是不是很奇怪,因为在Responder线程中run方法,最终会循环调用这个方法,持续写数据到通道中,如果说没有更多数据,那么到这里其实整个请求已经完成,但是为了处理不能一次把数据都发送过去的情况,才有后续的处理。
          int numBytes = channelWrite(channel, call.rpcResponse);
          if (numBytes < 0) {
            return true;
          }
           //如果没有更多数据需要处理,清理rpcResponse
          if (!call.rpcResponse.hasRemaining()) {
            call.rpcResponse = null;
            call.connection.decRpcCount();
            if (numElements == 1) {    // last call fully processes.
              done = true;             // no more data for this channel.
            } else {
              done = false;            // more calls pending to be sent.
            }
          } else {
            //如果还有数据需要处理,把call重新加入responseQueue中
            call.connection.responseQueue.addFirst(call);
            
            if (inHandler) {
              // set the serve time when the response has to be sent later
              call.timestamp = Time.now();
              incPending();
              try {
                // Wakeup the thread blocked on select, only then can the call 
                // to channel.register() complete.
                writeSelector.wakeup();
                channel.register(writeSelector, SelectionKey.OP_WRITE, call); //注册写时间到writeSelector中,在run方法中会后续处理
              } catch (ClosedChannelException e) {
                //Its ok. channel might be closed else where.
                done = true;
              } finally {
                decPending();
              }
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug(Thread.currentThread().getName() + ": responding to " + call
                  + " Wrote partial " + numBytes + " bytes.");
            }
          }
          error = false;              // everything went off well
        }
      } finally {
        if (error && call != null) {
          LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");
          done = true;               // error. no more data for this channel.
          closeConnection(call.connection);
        }
      }
      return done;
    }

Server.Responder.run()方法

 @Override
    public void run() {
      LOG.info(Thread.currentThread().getName() + ": starting");
      SERVER.set(Server.this);
        doRunLoop(); //调用方法
    }
 
    private void doRunLoop() {
      long lastPurgeTime = 0;   // last check for old calls.
      while (running) {
        try {
          waitPending();     // If a channel is being registered, wait.
          writeSelector.select(PURGE_INTERVAL);
          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            try {
              if (key.isValid() && key.isWritable()) {
                  doAsyncWrite(key); //底层调用Server.Responder.processResponse()方法。对数据进行输出。
              }
            } 
          }
          long now = Time.now();
          if (now < lastPurgeTime + PURGE_INTERVAL) {
            continue;
          }
          lastPurgeTime = now;
          //
          // If there were some calls that have not been sent out for a
          // long time, discard them.
          //
          if(LOG.isDebugEnabled()) {
            LOG.debug("Checking for old call responses.");
          }
          ArrayList<Call> calls;
          
          // get the list of channels from list of keys.
          synchronized (writeSelector.keys()) {
            calls = new ArrayList<Call>(writeSelector.keys().size());
            iter = writeSelector.keys().iterator();
            while (iter.hasNext()) {
              SelectionKey key = iter.next();
              Call call = (Call)key.attachment();
              if (call != null && key.channel() == call.connection.channel) { 
                calls.add(call);
              }
            }
          }
          
          for(Call call : calls) {
            doPurge(call, now);
          }
        } catch (OutOfMemoryError e) {
        }
      }

对结果返回的后续操作参考了 参考链接博客

展开阅读全文
打赏
1
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
1
分享
返回顶部
顶部