文档章节

Reactor模型详解

爱宝贝丶
 爱宝贝丶
发布于 03/11 11:22
字数 2602
阅读 1034
收藏 29

       对于Java IO模型的变化,描述最为清楚的莫属于Doug Lea对Reactor模型的讲解《Scalable IO in Java》。本文则主要围绕该文档,对Java IO模型的演变过程进行讲解,并且会讲解各个模型所解决的问题以及其存在的问题。最后,本文也会以一个实际的例子来实现Reactor模型。

1. 传统IO模型

       对于传统IO模型,其主要是一个Server对接N个客户端,在客户端连接之后,为每个客户端都分配一个执行线程。如下图是该模型的一个演示:

image.png

       从图中可以看出,传统IO的特点在于:

  • 每个客户端连接到达之后,服务端会分配一个线程给该客户端,该线程会处理包括读取数据,解码,业务计算,编码,以及发送数据整个过程;
  • 同一时刻,服务端的吞吐量与服务器所提供的线程数量是呈线性关系的。

       这种设计模式在客户端连接不多,并发量不大的情况下是可以运行得很好的,但是在海量并发的情况下,这种模式就显得力不从心了。这种模式主要存在的问题有如下几点:

  • 服务器的并发量对服务端能够创建的线程数有很大的依赖关系,但是服务器线程却是不能无限增长的;
  • 服务端每个线程不仅要进行IO读写操作,而且还需要进行业务计算;
  • 服务端在获取客户端连接,读取数据,以及写入数据的过程都是阻塞类型的,在网络状况不好的情况下,这将极大的降低服务器每个线程的利用率,从而降低服务器吞吐量。

2. Reactor事件驱动模型

       在传统IO模型中,由于线程在等待连接以及进行IO操作时都会阻塞当前线程,这部分损耗是非常大的。因而jdk 1.4中就提供了一套非阻塞IO的API。该API本质上是以事件驱动来处理网络事件的,而Reactor是基于该API提出的一套IO模型。如下是Reactor事件驱动模型的示意图:

image.png

       从图中可以看出,在Reactor模型中,主要有四个角色:客户端连接,Reactor,Acceptor和Handler。这里Acceptor会不断地接收客户端的连接,然后将接收到的连接交由Reactor进行分发,最后有具体的Handler进行处理。改进后的Reactor模型相对于传统的IO模型主要有如下优点:

  • 从模型上来讲,如果仅仅还是只使用一个线程池来处理客户端连接的网络读写,以及业务计算,那么Reactor模型与传统IO模型在效率上并没有什么提升。但是Reactor模型是以事件进行驱动的,其能够将接收客户端连接,网络读和网络写,以及业务计算进行拆分,从而极大的提升处理效率;
  • Reactor模型是异步非阻塞模型,工作线程在没有网络事件时可以处理其他的任务,而不用像传统IO那样必须阻塞等待。

3. Reactor模型----业务处理与IO分离

       在上面的Reactor模型中,由于网络读写和业务操作都在同一个线程中,在高并发情况下,这里的系统瓶颈主要在两方面:

  • 高频率的网络读写事件处理;
  • 大量的业务操作处理;

       基于上述两个问题,这里在单线程Reactor模型的基础上提出了使用线程池的方式处理业务操作的模型。如下是该模型的示意图:

image.png

       从图中可以看出,在多线程进行业务操作的模型下,该模式主要具有如下特点:

  • 使用一个线程进行客户端连接的接收以及网络读写事件的处理;
  • 在接收到客户端连接之后,将该连接交由线程池进行数据的编解码以及业务计算。

       这种模式相较于前面的模式性能有了很大的提升,主要在于在进行网络读写的同时,也进行了业务计算,从而大大提升了系统的吞吐量。但是这种模式也有其不足,主要在于:

  • 网络读写是一个比较消耗CPU的操作,在高并发的情况下,将会有大量的客户端数据需要进行网络读写,此时一个线程将不足以处理这么多请求。

4. Reactor模型----并发读写

       对于使用线程池处理业务操作的模型,由于网络读写在高并发情况下会成为系统的一个瓶颈,因而针对该模型这里提出了一种改进后的模型,即使用线程池进行网络读写,而仅仅只使用一个线程专门接收客户端连接。如下是该模型的示意图:

image.png

       可以看到,改进后的Reactor模型将Reactor拆分为了mainReactor和subReactor。这里mainReactor主要进行客户端连接的处理,处理完成之后将该连接交由subReactor以处理客户端的网络读写。这里的subReactor则是使用一个线程池来支撑的,其读写能力将会随着线程数的增多而大大增加。对于业务操作,这里也是使用一个线程池,而每个业务请求都只需要进行编解码和业务计算。通过这种方式,服务器的性能将会大大提升,在可见情况下,其基本上可以支持百万连接。

5. Reactor模型示例

       对于上述Reactor模型,服务端主要有三个角色:Reactor,Acceptor和Handler。这里基于Doug Lea的文档对其进行了实现,如下是Reactor的实现代码:

public class Reactor implements Runnable {
  private final Selector selector;
  private final ServerSocketChannel serverSocket;

  public Reactor(int port) throws IOException {
    serverSocket = ServerSocketChannel.open();  // 创建服务端的ServerSocketChannel
    serverSocket.configureBlocking(false);  // 设置为非阻塞模式
    selector = Selector.open();  // 创建一个Selector多路复用器
    SelectionKey key = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    serverSocket.bind(new InetSocketAddress(port));  // 绑定服务端端口
    key.attach(new Acceptor(serverSocket));  // 为服务端Channel绑定一个Acceptor
  }

  @Override
  public void run() {
    try {
      while (!Thread.interrupted()) {
        selector.select();  // 服务端使用一个线程不断等待客户端的连接到达
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
          dispatch(iterator.next());  // 监听到客户端连接事件后将其分发给Acceptor
          iterator.remove();
        }

        selector.selectNow();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  private void dispatch(SelectionKey key) throws IOException {
    // 这里的attachement也即前面为服务端Channel绑定的Acceptor,调用其run()方法进行
    // 客户端连接的获取,并且进行分发
    Runnable attachment = (Runnable) key.attachment();
    attachment.run();
  }
}

       这里Reactor首先开启了一个ServerSocketChannel,然后将其绑定到指定的端口,并且注册到了一个多路复用器上。接着在一个线程中,其会在多路复用器上等待客户端连接。当有客户端连接到达后,Reactor就会将其派发给一个Acceptor,由该Acceptor专门进行客户端连接的获取。下面我们继续看一下Acceptor的代码:

public class Acceptor implements Runnable {
  private final ExecutorService executor = Executors.newFixedThreadPool(20);

  private final ServerSocketChannel serverSocket;

  public Acceptor(ServerSocketChannel serverSocket) {
    this.serverSocket = serverSocket;
  }

  @Override
  public void run() {
    try {
      SocketChannel channel = serverSocket.accept();  // 获取客户端连接
      if (null != channel) {
        executor.execute(new Handler(channel));  // 将客户端连接交由线程池处理
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

       这里可以看到,在Acceptor获取到客户端连接之后,其就将其交由线程池进行网络读写了,而这里的主线程只是不断监听客户端连接事件。下面我们看看Handler的具体逻辑:

public class Handler implements Runnable {
  private volatile static Selector selector;
  private final SocketChannel channel;
  private SelectionKey key;
  private volatile ByteBuffer input = ByteBuffer.allocate(1024);
  private volatile ByteBuffer output = ByteBuffer.allocate(1024);

  public Handler(SocketChannel channel) throws IOException {
    this.channel = channel;
    channel.configureBlocking(false);  // 设置客户端连接为非阻塞模式
    selector = Selector.open();  // 为客户端创建一个新的多路复用器
    key = channel.register(selector, SelectionKey.OP_READ);  // 注册客户端Channel的读事件
  }

  @Override
  public void run() {
    try {
      while (selector.isOpen() && channel.isOpen()) {
        Set<SelectionKey> keys = select();  // 等待客户端事件发生
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
          SelectionKey key = iterator.next();
          iterator.remove();

          // 如果当前是读事件,则读取数据
          if (key.isReadable()) {
            read(key);
          } else if (key.isWritable()) {
           // 如果当前是写事件,则写入数据
            write(key);
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  // 这里处理的主要目的是处理Jdk的一个bug,该bug会导致Selector被意外触发,但是实际上没有任何事件到达,
  // 此时的处理方式是新建一个Selector,然后重新将当前Channel注册到该Selector上
  private Set<SelectionKey> select() throws IOException {
    selector.select();
    Set<SelectionKey> keys = selector.selectedKeys();
    if (keys.isEmpty()) {
      int interestOps = key.interestOps();
      selector = Selector.open();
      key = channel.register(selector, interestOps);
      return select();
    }

    return keys;
  }

  // 读取客户端发送的数据
  private void read(SelectionKey key) throws IOException {
    channel.read(input);
    if (input.position() == 0) {
      return;
    }

    input.flip();
    process();  // 对读取的数据进行业务处理
    input.clear();
    key.interestOps(SelectionKey.OP_WRITE);  // 读取完成后监听写入事件
  }

  private void write(SelectionKey key) throws IOException {
    output.flip();
    if (channel.isOpen()) {
      channel.write(output);  // 当有写入事件时,将业务处理的结果写入到客户端Channel中
      key.channel();
      channel.close();
      output.clear();
    }
  }
    
  // 进行业务处理,并且获取处理结果。本质上,基于Reactor模型,如果这里成为处理瓶颈,
  // 则直接将其处理过程放入线程池即可,并且使用一个Future获取处理结果,最后写入客户端Channel
  private void process() {
    byte[] bytes = new byte[input.remaining()];
    input.get(bytes);
    String message = new String(bytes, CharsetUtil.UTF_8);
    System.out.println("receive message from client: \n" + message);

    output.put("hello client".getBytes());
  }
}

       在Handler中,主要进行的就是为每一个客户端Channel创建一个Selector,并且监听该Channel的网络读写事件。当有事件到达时,进行数据的读写,而业务操作这交由具体的业务线程池处理。

6. 小结

       本文首先讲解了Java IO的几种网络模型,着重比较了每种不同的模型的优缺点,并且基于Reactor模型,使用一个实例对其实现过程进行了演示。

© 著作权归作者所有

爱宝贝丶

爱宝贝丶

粉丝 340
博文 136
码字总数 456051
作品 0
武汉
程序员
私信 提问
收藏的博客 -- 高性能Linux服务器

http://zhuanlan.51cto.com/columnlist/shenj/ --- 58沈剑 http://blog.csdn.net/analogouslove --- 范蠡&张小方 http://blog.csdn.net/column/details/15700.html --- teamtalk https://gi......

libaineu2004
2017/08/08
0
0
新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析

1、引言 Netty 是一个广受欢迎的异步事件驱动的Java开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。 本文基于 Netty 4.1 展开介绍相关理论模型,使用场景,基本组件、...

JackJiang2011
2018/11/05
0
0
Proactor模式&Reactor模式详解

服务器端编程经常需要构造高性能的IO模型,常见的IO模型有四种: (1)同步阻塞IO(Blocking IO):即传统的IO模型。 (2)同步非阻塞IO(Non-blocking IO):默认创建的socket都是阻塞的,非...

follitude
2016/09/08
264
0
Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(一)

目录 源码之下无秘密 ── 做最好的 Netty 源码分析教程 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 之二 NIO Channel 小结 Java NIO...

永顺
2017/11/29
0
0
Netty与Reactor模式详解

在学习Reactor模式之前,我们需要对“I/O的四种模型”以及“什么是I/O多路复用”进行简单的介绍,因为Reactor是一个使用了同步非阻塞的I/O多路复用机制的模式。 I/O的四种模型 I/0 操作 主要...

hutaishi
2018/07/16
143
0

没有更多内容

加载失败,请刷新页面

加载更多

查看线上日志常用命令

cat 命令(文本输出命令) 通常查找出错误日志 cat error.log | grep 'nick' , 这时候我们要输出当前这个日志的前后几行: 显示file文件里匹配nick那行以及上下5行 cat error.log | grep -C ...

xiaolyuh
17分钟前
3
0
六、Java设计模式之工厂方法

工厂方法定义: 定义一个创建对象的接口,但让实现这个接口的类来决定实例化哪个类,工厂方法让类的实例化推迟到子类中进行 类型:创建型 工厂方法-使用场景: 创建对象需要大量重复的代码 ...

东风破2019
24分钟前
2
0
win服务器管理遇到的一系列问题记录

有些小伙伴在使用iis7远程桌面管理工具的时候总是会遇到一系列的问题,下面就是为大家介绍一下服务器日常管理过程中出现的问题及我的解决办法和心得。希望能帮到大家。   拒绝服务器重新启...

1717197346
31分钟前
2
0
flutter 剪切板 复制粘贴

复制粘贴功能 import 'package:flutter/services.dart'; Clipboard.setData(ClipboardData(text:_text));Clipboard.getData;...

zdglf
33分钟前
3
0
如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题?

面试题 如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题? 面试官心理分析 这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条,不能多,就是前面说的重复消费...

米兜
34分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部