文档章节

java nio模型理解

红发-
 红发-
发布于 2017/03/28 11:50
字数 864
阅读 48
收藏 0

【推荐】2019 Java 开发者跳槽指南.pdf(吐血整理) >>>

 

1、tcp信道,具体参数详情参考api

ServerSocketChannel:创建、接收、关闭、读写、阻塞

SocketChannel:创建、连接、关闭、读写、阻塞(测试连接性)

2、Selector:创建、关闭选择器

案例一:

NIOAccepter服务端线程

package com.warehouse.data.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;

import javax.imageio.IIOException;

/**
 * ${DESCRIPTION}
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 9:55
 **/
public class NIOAcceptor extends Thread {
    private Selector selector;
    private ServerSocketChannel channel;
    private NIOReactorPool reactorPool;


    public NIOAcceptor(String name, String host, int port, NIOReactorPool reactorPool) throws IOException {
        super(name);
        this.reactorPool = reactorPool;
        //获取一个管理通道器
        this.selector = Selector.open();
        //获取一个接受连接socket
        this.channel = ServerSocketChannel.open();
        //设置非阻塞
        this.channel.configureBlocking(false);
        //绑定端口
        this.channel.bind(new InetSocketAddress(host, port));
        //注册事件
        this.channel.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("start NIOAcceptor thread server.");
    }


    @Override
    public void run() {
        final Selector selector = this.selector;
        //轮询
        for (; ; ) {
            try {
                //阻塞,直到select事件到达
                selector.select(1000L);
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                try {
                    for (SelectionKey key : selectionKeySet) {
                        if (key.isValid() && key.isAcceptable()) {
                            accept(selector);
                        }/* else if (key.isValid() && key.isReadable()) {
                            Processor processor = (Processor) key.attachment();
                            try{
                                processor.process(key);
                            }catch (IOException e){
                                processor.close();
                            }
                        } */else {
                            key.cancel();
                        }
                    }
                } finally {
                    selectionKeySet.clear();
                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }


    public void accept(Selector selector) {
        SocketChannel socketChannel = null;
        try {
            System.out.println("accept client success.");
            socketChannel = this.channel.accept();
            socketChannel.configureBlocking(false);

            //单个Reactor线程处理
            //SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
            //selectionKey.attach(new Processor(socketChannel));
            //多个Reactor线程处理
            NIOReactor reactor = this.reactorPool.getNextReactor();
            reactor.postRegister(new Processor(socketChannel));

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 

NIOReactorPool线程池

package com.warehouse.data.nio;

import java.io.IOException;

/**
 * ${DESCRIPTION}
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 11:50
 **/
public class NIOReactorPool {

    private final NIOReactor[] nioReactors;
    private volatile int nextReactor;

    public NIOReactorPool(String name, int poolSize) throws IOException {
        nioReactors = new NIOReactor[poolSize];
        for (int i = 0; i < poolSize; i++) {
            NIOReactor nioReactor = new NIOReactor(name + "-" + i);
            nioReactors[i] = nioReactor;
            nioReactor.startup();
        }
    }


    public NIOReactor getNextReactor() {
        int i = ++nextReactor;
        if (i > nioReactors.length) {
            i = nextReactor = 0;
        }
        return nioReactors[i];
    }
}

 

NIOReactor线程

package com.warehouse.data.nio;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * ${DESCRIPTION}
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 11:51
 **/
public final class NIOReactor {
    private final String name;
    private final RWThread reactorR;

    public NIOReactor(String name) throws IOException {
        this.name = name;
        this.reactorR = new RWThread();
    }

    public void startup() {
        new Thread(reactorR, this.name + "-RW").start();
    }

    public void postRegister(Processor processor) {
        this.reactorR.registerQueue.offer(processor);
        this.reactorR.selector.wakeup();
    }

    private final class RWThread extends Thread {
        private final Selector selector;
        private final ConcurrentLinkedQueue<Processor> registerQueue;

        public RWThread() throws IOException {
            this.selector = Selector.open();
            this.registerQueue = new ConcurrentLinkedQueue<Processor>();
        }

        @Override
        public void run() {
            final Selector selector = this.selector;
            Set<SelectionKey> selectionKeySet = null;

            for (; ; ) {
                try {
                    selector.select(1000L);
                    register(selector);
                    selectionKeySet = selector.selectedKeys();
                    for (SelectionKey key : selectionKeySet) {
                        Object att = key.attachment();
                        Processor processor = null;
                        try {
                            if (att != null && key.isValid()) {
                                processor = (Processor) att;
                                if (key.isReadable()) {
                                    processor.process(key);
                                }
                                if (key.isWritable()) {

                                }
                            } else {
                                key.channel();
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                            if(processor != null){
                                processor.close();
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    if (selectionKeySet != null) {
                        selectionKeySet.clear();
                    }
                }
            }
        }


        private void register(Selector selector) {
            if (this.registerQueue.isEmpty()) {
                return;
            }
            Processor processor = null;
            while ((processor = this.registerQueue.poll()) != null) {
                try {
                    processor.register(selector);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        }
    }


}

 

Processor事件处理

 

package com.warehouse.data.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * ${DESCRIPTION}
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 10:49
 **/
public class Processor {
    private SocketChannel channel;
    private SelectionKey selectionKey;

    public Processor(SocketChannel channel) {
        this.channel = channel;
    }

    public void register(Selector selector) throws ClosedChannelException {
        selectionKey = this.channel.register(selector, SelectionKey.OP_READ, this);
    }

    public void process(SelectionKey key) throws IOException {
        //可以采用线程池处理
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int count = socketChannel.read(byteBuffer);
        System.out.println(new String(byteBuffer.array()));
    }



    public void close(){
        if(this.channel != null){
            try {
                this.channel.close();
                this.selectionKey.cancel();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Server服务端

 

package com.warehouse.data.nio;

import java.io.IOException;

/**
 * ${DESCRIPTION}
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 11:03
 **/
public class Server {

    public static void main(String[] args) throws IOException {
        //5个reactor线程
        NIOReactorPool nioReactorPool = new NIOReactorPool("NIOReactor-IO", 5);
        NIOAcceptor nioAcceptor = new NIOAcceptor("NIOAcceptor-IO", "127.0.0.1", 8888, nioReactorPool);
        nioAcceptor.start();


    }
}

Client客户端

 

package com.warehouse.data.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * ${DESCRIPTION}
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 11:00
 **/
public class Client {


    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);


        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        socketChannel.connect(new InetSocketAddress("127.0.0.1",8888));


        if(socketChannel.isConnectionPending()){
            //要finishConnect,否则会出现NotYetConnectedException异常
            socketChannel.finishConnect();
            socketChannel.write(ByteBuffer.wrap(new String("hello world").getBytes()));
        }

    }
}

 

资料:具体可以参考netty权威指南

http://www.cnblogs.com/good-temper/p/5003892.html

 

 

© 著作权归作者所有

红发-
粉丝 3
博文 33
码字总数 39368
作品 0
长沙
私信 提问
Java NIO原理 图文分析及代码实现

Java NIO原理图文分析及代码实现 前言: 最近在分析hadoop的RPC(Remote Procedure Call Protocol ,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术...

囚兔
2015/04/29
318
1
Java NIO原理图文分析及代码实现

前言: 最近在分析hadoop的RPC(Remote Procedure Call Protocol ,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。可以参考:http://baik...

SunnyWu
2014/11/05
632
1
Java NIO原理图文分析及代码实现

Java IO 在Client/Server模型中,Server往往需要同时处理大量来自Client的访问请求,因此Server端需采用支持高并发访问的架构。一种简单而又直接的解决方案是“one-thread-per-connection”。...

只想一个人静一静
2014/02/22
287
2
java NIO原理及通信模型

Java NIO是在jdk1.4开始使用的,它既可以说成“新IO”,也可以说成非阻塞式I/O。下面是java NIO的工作原理: 由一个专门的线程来处理所有的IO事件,并负责分发。 事件驱动机制:事件到的时候...

柳哥
2015/02/15
5.7K
6
Java NIO原理图文分析及代码实现

前言: 最近在分析hadoop的RPC(Remote Procedure Call Protocol ,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。可以参考:http://baik...

phacks
2015/08/19
137
0

没有更多内容

加载失败,请刷新页面

加载更多

用于电话号码验证的综合正则表达式

我正在尝试综合使用正则表达式来验证电话号码。 理想情况下,它将处理国际格式,但必须处理美国格式,包括以下内容: 1-234-567-8901 1-234-567-8901 x1234 1-234-567-8901 ext1234 1(234)...

javail
40分钟前
6
0
你为什么要使用表达 >而不是Func ?

我了解lambda和Func和Action代表。 但是表情让我难过。 在什么情况下,您将使用Expression<Func<T>>而不是普通的旧Func<T> ? #1楼 我想添加一些关于Func<T>和Expression<Func<T>>之间的区别......

技术盛宴
55分钟前
5
0
用最简单的方法实现原生JS放大镜特效

<html lang="en"><head> <meta charset="UTF-8"> <title>Document</title> <style> *{margin:0px;padding:0px;} #big{width:200p......

汤清丽
59分钟前
4
0
NIO 编程

1. NIO 比 传统 IO 有什么优点? 答:NIO 一个 线程 可以监听多个客户端,传统 客户端 -- 服务端模型中,一个线程监听一个客户端,导致线程消耗过多,一个线程要分配0.5M~1M内存。 2. NIO服务...

杨凯123
59分钟前
4
0
SpringBoot 系列教程自动配置选择生效

191214-SpringBoot 系列教程自动配置选择生效 写了这么久的 Spring 系列博文,发现了一个问题,之前所有的文章都是围绕的让一个东西生效;那么有没有反其道而行之的呢? 我们知道可以通过@Co...

小灰灰Blog
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部