NIO源码详解

原创
2018/11/13 11:53
阅读数 6.3K

阻塞io和无阻塞io
阻塞io是指jdk1.4之前版本面向流的io,服务端需要对每个请求建立一堆线程等待请求,而客户端发送请求后,先咨询服务端是否有线程相应,如果没有则会一直等待或者遭到拒 绝请求,如果有的话,客户端会线程会等待请求结束后才继续执行。
当并发量大,而后端服务或客户端处理数据慢时就会产生产生大量线程处于等待中,即上述的阻塞。

 

无阻塞io是使用单线程或者只使用少量的多线程,每个连接共用一个线程,当处于等待(没有事件)的时候线程资源可以释放出来处理别的请求,通过事件驱动模型当有accept/read/write等事件发生后通知(唤醒)主线程分配资源来处理相关事件。java.nio.channels.Selector就是在该模型中事件的观察者,可以将多个SocketChannel的事件注册到一个Selector上,当没有事件发生时Selector处于阻塞状态,当SocketChannel有accept/read/write等事件发生时唤醒Selector。
 

 这个Selector是使用了单线程模型,主要用来描述事件驱动模型,要优化性能需要一个好的线程模型来使用,目前比较好的nio框架有Netty,apache的mina等。线程模型这块后面再分享,这里重点研究Selector的阻塞和唤醒原理。
退出阻塞的方式有:register在selector上的socketChannel处于就绪状态(放在pollArray中的socketChannel的FD就绪) 或者 第1节中放在pollArray中的wakeupSourceFd就绪。前者(socketChannel)就绪唤醒应证了文章开始的阻塞->事件驱动->唤醒的过程,后者(wakeupSourceFd)就是下面要看的主动wakeup。

这里创建了一个管道pipe,并对pipe的source端的POLLIN事件感兴趣,addWakeupSocket方法将source的POLLIN事件标识为感兴趣的,当sink端有数据写入时,source对应的文件描述描wakeupSourceFd就会处于就绪状态。(事实上windows就是通过向管道中写数据来唤醒阻塞的选择器的)从以上代码可以看出:通道的打开实际上是构造了一个SelectorImpl对象

subSelector.poll() 是select的核心,由native函数poll0实现,readFds、writeFds 和exceptFds数组用来保存底层select的结果,数组的第一个位置都是存放发生事件的socket的总数,其余位置存放发生事件的socket句柄fd

图像详解

1,Selector.open()

Pipe.open()打开一个管道 拿到wakeupSourceFd和wakeupSinkFd两个文件描述符;把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里; 上图中最下面那部分创建pipe的过程,windows下的实现是创建两个本地的socketChannel,然后连接(链接的过程通过写一个随机long做两个socket的链接校验),两个socketChannel分别实现了管道的source与sink端。
source端由前面提到的WindowsSelectorImpl放到了pollWrapper中(pollWrapper.addWakeupSocket(wakeupSourceFd, 0))

pollWrapper用Unsafe类申请一块物理内存,存放注册时的socket句柄fdVal和event的数据结构pollfd,其中pollfd共8字节,0~3字节保存socket句柄,4~7字节保存event

先了解一下Unsafe的基本操作

//分配var1字节大小的内存,返回起始地址偏移量 
 public native long allocateMemory(long var1);
//重新给var1起始地址的内存分配长度为var3字节大小的内存,返回新的内存起始地址偏移量
 public native long reallocateMemory(long var1, long var3);
 //释放起始地址为var1的内存
 public native void freeMemory(long var1);

pollWrapper申请了一个64个字节的对外内存空间,address为内存空间的开始地址

PollArrayWrapper pollWrapper = new PollArrayWrapper(8);
PollArrayWrapper(int var1) { int var2 = var1 * SIZE_POLLFD; 
this.pollArray = new AllocatedNativeObject(var2, true); 
this.pollArrayAddress = this.pollArray.address(); this.size = var1; }
protected NativeObject(int var1, boolean var2) {
    if(!var2) {
        this.allocationAddress = unsafe.allocateMemory((long)var1);
        this.address = this.allocationAddress;
    } else {
        int var3 = pageSize();
        long var4 = unsafe.allocateMemory((long)(var1 + var3));
        this.allocationAddress = var4;
        this.address = var4 + (long)var3 - (var4 & (long)(var3 - 1));
    }

}

pollWrapper.addWakeupSocket(wakeupSourceFd, 0),把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里,本次操作pollfd会使用6个字节

void addWakeupSocket(int var1, int var2) {
    this.putDescriptor(var2, var1);
    this.putEventOps(var2, Net.POLLIN);
}
void putDescriptor(int var1, int var2) {
    this.pollArray.putInt(SIZE_POLLFD * var1 + 0, var2);
}

void putEventOps(int var1, int var2) {
    this.pollArray.putShort(SIZE_POLLFD * var1 + 4, (short)var2);
}

实际上pollfd的结构

2,Channel.Register()

3,Selector.select()

4,SelectionKey.cancel()

   上图浅蓝色部分

网上找的一些辅助理解图片

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部