文档章节

Java NIO之Reactor模式

秋风醉了
 秋风醉了
发布于 2014/10/30 20:17
字数 1161
阅读 768
收藏 1

Java NIO之Reactor模式

如下图所示,

Single Threaded Versioin指的是 Reactor 只有一个线程在处理 IO 事件,分发所有的IO事件,而具体的处理过程则是由Handler 去做。

那么一个Reactor系统只有一个Reactor,如果有100 个连接,那么就有100 个Handler 在处理。(看下面代码)

我就按我的理解说一下一次网络请求的过程:

1.如下面Reactor的构造方法,启动一个Reactor系统。

public Reactor(int port) throws IOException {
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    serverSocket.socket().bind(
            new InetSocketAddress(port));
    serverSocket.configureBlocking(false);
    SelectionKey sk =
            serverSocket.register(selector,
                    SelectionKey.OP_ACCEPT);
    //利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
    sk.attach(new Acceptor());
    log.info("->attach(new Acceptor())");
}

启动的时候把当前的 serverSocket 注册到给定的selector,并且指明感兴趣的事件,SelectionKey.OP_ACCEPT,然后返回一个SelectionKey,这个key表示当前的channel 和 selector的映射关系。

2.如果现在有一个网络连接,如果网络的OP_ACCEPT事件发生,则调用selector.selectedKeys();会得到一个关于OP_ACCEPT事件的key,然后dispatch(sk);分发这个事件。通过key的attachment()方法得到附加的对象,这个对象是一个线程对象,也是Acceptor对象。在这里处理网络连接,得到客户端的socketchannel。

3.得到了客户端的socketchannel,就可以准备读写客户端的socketchannel了。先注册一个SelectionKey.OP_READ读事件。并且当前的Handler对象附加到key对象上sk.attach(this);。

MulitiHandler(Selector selector, SocketChannel c) throws IOException {
    socket = c;
    c.configureBlocking(false);
    // Optionally try first read now
    sk = socket.register(selector, 0);
    // 注意在Handler里面又执行了一次attach,这样,覆盖前面的Acceptor,
    // 下次该Handler又有READ事件发生时,
    // 将直接触发Handler.从而开始了数据的读->处理->写->发出等流程处理。
    sk.attach(this);
    sk.interestOps(SelectionKey.OP_READ);
    selector.wakeup();
}

4.当READ事件发生后,则会通过dispatch(sk);分发。通过Handler的run方法进行具体的IO的读操作。

5.读完了数据之后,注册OP_WRITE事件sk.interestOps(SelectionKey.OP_WRITE)。然后当该事件发生后,则分发该事件,调用Handler的run事件处理IO写操作。

如下代码示例,

package com.usoft;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {

    private static Logger log = LoggerFactory.getLogger(Reactor.class);

    final Selector selector;
    final ServerSocketChannel serverSocket;

    public Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(
                new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk =
                serverSocket.register(selector,
                        SelectionKey.OP_ACCEPT);
        //利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
        sk.attach(new Acceptor());
        log.info("->attach(new Acceptor())");
    }


    // Alternatively,use explicit SPI provider :
    // SelectorProvider p = SelectorProvider.provider();
    // selector=p.openSelector();
    // serverSocket=p.openServerSocketChannel();

    // class Reactor continued
    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
                while (it.hasNext()) {
                    //来一个事件 第一次触发一个accepter线程
                    //以后触发Handler
                    SelectionKey sk = (SelectionKey) it.next();
                    log.info(">>>>>>acceptable=" + sk.isAcceptable() +
                            ",readable=" + sk.isReadable() +
                            ",writable=" + sk.isWritable());
                    dispatch(sk);
                }
                selected.clear();
            }
        } catch (IOException ex) {
            log.info("reactor stop!" + ex);
        }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null) {
            r.run();
        }
    }

    // class Reactor continued
    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                log.debug("-->ready for accept!");
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            } catch (IOException ex) { /* . . . */ }
        }
    }
}
package com.usoft;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class Handler implements Runnable {
    private static Logger log = LoggerFactory.getLogger(Reactor.class);

    static final int MAXIN = 1024;
    static final int MAXOUT = 1024;
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector selector, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(selector, 0);
        // 注意在Handler里面又执行了一次attach,这样,覆盖前面的Acceptor,
        // 下次该Handler又有READ事件发生时,
        // 将直接触发Handler.从而开始了数据的读->处理->写->发出等流程处理。
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    boolean inputIsComplete() {
        return true; //只是返回true,具体的判断没有实现
    }

    boolean outputIsComplete() {
        return true;//只是返回true,具体的判断没有实现
    }

    void process() { //没有具体实现
        output.put("helloworld".getBytes());
    }

    // class Handler continued
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* . . . */ }
    }

    void read() throws IOException {
        log.info("->read into bytebuffer from socketchannel inputs");
        socket.read(input);
        if (inputIsComplete()) {
            log.info("->read complete");
            process();
            state = SENDING;
            // Normally also do first write now
            // 读完了数据之后,注册OP_WRITE事件
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException {
        log.info("->write into socketchannel from bytebuffer outputs");
        socket.write(output);
        if (outputIsComplete()) {
            /**
             * The key will be removed fromall of the selector's key sets
             * during the next selection operation.
             */
            sk.cancel();
            socket.close(); //关闭通过,也就关闭了连接
            log.info("->close socketchannel after write complete");
        }
    }
}

ReactorTest.java

package com.usoft;

import java.io.IOException;

/**
 * Created by liyanxin on 2015/3/23.
 */
public class ReactorTest {

    public static void main(String args[]) throws IOException {
        Reactor reactor = new Reactor(9098);
        reactor.run();
    }
}

参考:http://www.jdon.com/concurrent/reactor.htm

=============END=============

本文转载自:http://ifeve.com/netty-reactor-4/

上一篇: MySQL连接查询
下一篇: Java哈希表
秋风醉了
粉丝 250
博文 536
码字总数 408466
作品 0
朝阳
程序员
私信 提问
Reactor和Proactor模式

在高性能的I/O设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操作。 同步和异步 同步和异步是针对应用程序和内核的交互而言的,...

ksfzhaohui
2012/12/14
0
0
Netty那点事(四)Netty与Reactor模式

![Reactors][1] 一:Netty、NIO、多线程? 时隔很久终于又更新了!之前一直迟迟未动也是因为积累不够,后面比较难下手。过年期间@李林锋hw发布了一个Netty5.0架构剖析和源码解读 http://vdi...

黄亿华
2014/02/08
0
10
深入了解 Java-Netty高性能高并发理解

一丶 Netty基础入门 Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,...

架构师springboot
2018/10/31
0
0
Netty系列之Netty高性能之道

背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用。相比于传统基于J...

宇智波带土
2014/06/17
0
1
Java异步NIO框架Netty实现高性能高并发

背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用。相比于传统基于J...

java知识分子
2018/09/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

10分钟详解Spring全家桶7大知识点

点关注,不迷路;持续更新Java架构相关技术及资讯热文!!! Spring框架自诞生以来一直备受开发者青睐,有人亲切的称之为:Spring 全家桶。它包括SpringMVC、SpringBoot、Spring Cloud、Spr...

我最喜欢三大框架
22分钟前
4
0
注册服务

列出所有服务[root@localhost ~]# systemctl list-unit-files[root@localhost ~]# systemctl status mysqld[root@localhost ~]# systemctl stop mysqld[root@localhost ~]# ......

jxlgzwh
25分钟前
0
0
解决jdk8 stream tomap方法报错:java.lang.IllegalStateException: Duplicate key异常解决(key重复)

List<User> userList = User.ME.loadList(users); if (CollectionUtils.isNotEmpty(userList)) { Map<Long, User> userMap = userList.stream().filter(Objects::nonN......

冰峰雪座
34分钟前
0
0
jdk中的一些命令

jdk中的一些命令 jps jstack jmap jstat jhat jinfo javap http://www.importnew.com/18398.html

晨猫
35分钟前
1
0
Bystack的高TPS共识算法

共识算法是分布式系统保证节点数据状态一致性的方法,在区块链的共识算法分POW(工作量证明)和POS(权益证明)两大类。第一类POW模式是在公链项目中运用的最广泛应用的共识算法,比特币长达10年...

比原链Bytom
35分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部