hadoop 远程调度(三)

原创
2017/05/04 11:40
阅读数 106

简介

前几篇博客介绍了NIO和hadoop rpc的主要流程。本文主要介绍hadoop rpc server端接受到请求怎么处理,怎样返回。

rpc 服务端提供服务的大致流程

  1. rpc server端在接受到客户端请求后,会解析接受到的参数,获取需要执行的类,接口,方法,参数等信息
  2. 根据接口获取启动server时注入的实现类的实例
  3. 拿到实例后通过反射执行实例的对应方法。然后返回执行的结果到客户端

查看本文前需要先了解NIO的相关知识,这部分用到了大量的NIO知识。

rpc server 服务

上一篇博客说道server.start()方法。启动了两个线程,分别是:listener 和 responder

  • listener 监听rpc端口,处理rpc事件
  • responder 处理执行结果的返回

下边先看看listener 调用start 后具体做了什么

Server.Listener的构造方法

作用: 构建一个listener。 内部主要打来一个ServerSocketChannel通道,同时创建读线程,注册accept事件,并且设置本线程为守护线程。

 public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
     //创建一个服务通道。并且设置成非阻塞状态。
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
        //根据参数创建一个用来读取请求数据的多个线程
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }
      // 注册accept事件到通道中。
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

Server.Listener.run()方法

    @Override
    public void run() {
      LOG.info(Thread.currentThread().getName() + ": starting");
      SERVER.set(Server.this);
      connectionManager.startIdleScan();
      while (running) {
        SelectionKey key = null;
        try {
           //获取构造函数中的selector ,并且开始等待事件,处于阻塞状态,当有事件到达时才会向下处理。
          getSelector().select();
          Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                //处理accept事件。
                  doAccept(key);
              }
        //省略代码
    }

Server.Listener.doAccept()

这个方法中主要构建了一个channel和一个connection实例,同时把connection实例添加到正在运行的reader线程,让reader线程处理请求。

   void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        //设置channel为非阻塞,立刻返回为null
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        channel.socket().setKeepAlive(true);
        //获取一个reader线程。初始化在Listener的构造函数中。
        Reader reader = getReader();
        //register内部主要是创建一个connection对象。同时把这个对象添加到集合中,返回conn
        Connection c = connectionManager.register(channel);
        key.attach(c);  // so closeCurrentConnection can get the object
        //把conn添加到reader线程padding队列中,reader线程会自动处理后续事件
        reader.addConnection(c);
      }
    }

到这里listener的accept请求事件结束,后面会接受read事件。下面我们看看Server.Listener.Reader线程中到底怎么处理这些read事件的。

Server.Listener.Reader

    //保存等待处理的链接
      final private BlockingQueue<Connection> pendingConnections;
      private final Selector readSelector; //本线程的一个通道管理器
    //简单的构造函数
      Reader(String name) throws IOException {
        super(name);
        this.pendingConnections =
            new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);
        this.readSelector = Selector.open();
      }
      @Override
      public void run() {
           //主要调度这个函数   
          doRunLoop();
      }
      private synchronized void doRunLoop() {
        while (running) {
          SelectionKey key = null;
          try {
            //获取当前正在等待处理的链接,并且把read事件注册到当前线程的readerSelecter事件中
            int size = pendingConnections.size();
            for (int i=size; i>0; i--) {
              Connection conn = pendingConnections.take();
              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
            }
            readSelector.select(); //开始监听事件
            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
              key = iter.next();
              iter.remove();
              if (key.isValid()) {
                if (key.isReadable()) {
                  doRead(key); //通过doRead方法处理读事件。
                }
              }
            }
        }
      }

Server.Listener.doRead()方法主要作用

  1. 根据key 获取 对应的connection
  2. 调用connection的 readAndProcess()方法

Server.Connection.readAndProcess()方法

  1. 获取请求header ,判断请求类型
  2. 从channel中读取请求数据到缓冲区
  3. 调用processOneRpc方法处理数据

Server.Contection.processOneRpc()

  1. 从缓冲区中获取数据构建数据流
  2. 获取请求的header
  3. 调用processRpcRequest(header,dis)方法

Server.Contection.processRpcRequest()

  1. 获取RPC请求类型,获取对应的处理类
  2. 构造一个Call类来处理请求
  3. 把Call放入callQueue中,方便后续handle线程处理请求。

上边这部分内容各种判读,我也没看太懂,暂时忽略吧,各种判断有点头疼

展开阅读全文
打赏
1
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
1
分享
返回顶部
顶部