文档章节

Java NIO——Selector机制源码分析

草根本色
 草根本色
发布于 2015/10/29 08:27
字数 1384
阅读 82
收藏 1

一直不明白pipe是如何唤醒selector的,所以又去看了jdk的源码(openjdk下载),整理了如下:

以Java nio自带demo : OperationServer.java   OperationClient.java(见附件)

其中server端的核心代码:

public void initSelector() {
        try {
            selector = SelectorProvider.provider().openSelector();
            this.serverChannel1 = ServerSocketChannel.open();
            serverChannel1.configureBlocking(false);
            InetSocketAddress isa = new InetSocketAddress("localhost", this.port1);
            serverChannel1.socket().bind(isa);
            serverChannel1.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
}

从头开始,

先看看SelectorProvider.provider()做了什么:

public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }

其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;

if (provider != nullreturn provider;

保证了整个server程序中只有一个WindowsSelectorProvider对象;

再看看WindowsSelectorProvider. openSelector():

public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
new WindowsSelectorImpl(SelectorProvider)代码:
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        pollWrapper = new PollArrayWrapper(INIT_CAP);
        wakeupPipe = Pipe.open();
        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

        // Disable the Nagle algorithm so that the wakeup is more immediate
        SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
        (sink.sc).socket().setTcpNoDelay(true);
        wakeupSinkFd = ((SelChImpl)sink).getFDVal();

        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
    }

其中Pipe.open()是关键,这个方法的调用过程是:

public static Pipe open() throws IOException {
        return SelectorProvider.provider().openPipe();
}
SelectorProvider 中:
public Pipe openPipe() throws IOException {
        return new PipeImpl(this);
}

再看看怎么new PipeImpl()的:

其中IOUtil.makePipe(true)是个native方法:

/**
     * Returns two file descriptors for a pipe encoded in a long.
     * The read end of the pipe is returned in the high 32 bits,
     * while the write end is returned in the low 32 bits.
     */
staticnativelong makePipe(boolean blocking);
//具体实现
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
{
    int fd[2];

    if (pipe(fd) < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
        return 0;
    }
    if (blocking == JNI_FALSE) {
        if ((configureBlocking(fd[0], JNI_FALSE) < 0)
            || (configureBlocking(fd[1], JNI_FALSE) < 0)) {
            JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
            close(fd[0]);
            close(fd[1]);
            return 0;
        }
    }
    return ((jlong) fd[0] << 32) | (jlong) fd[1];
}
static int
configureBlocking(int fd, jboolean blocking)
{
    int flags = fcntl(fd, F_GETFL);
    int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);

    return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}

正如这段注释:

/**

     * Returns two file descriptors for a pipe encoded in a long.

     * The read end of the pipe is returned in the high 32 bits,

     * while the write end is returned in the low 32 bits.

     */

High32位存放的是通道read端的文件描述符FD(file descriptor),low 32 bits存放的是write端的文件描述符。所以取到makepipe()返回值后要做移位处理。

 

pollWrapper.addWakeupSocket(wakeupSourceFd, 0);

这行代码把返回的pipe的write端的FD放在了pollWrapper中(后面会发现,这么做是为了实现selector的wakeup())

 

ServerSocketChannel.open()的实现:

可见创建的ServerSocketChannelImpl也有WindowsSelectorImpl的引用。

ServerSocketChannelImpl(SelectorProvider sp) throws IOException {        
super(sp);        
this.fd =  Net.serverSocket(true);    //打开一个socket,返回FD
        
 this.fdVal = IOUtil.fdVal(fd);        
 this.state = ST_INUSE;
}

然后通过serverChannel1.register(selector, SelectionKey.OP_ACCEPT);把selector和channel绑定在一起,也就是把new ServerSocketChannel时创建的FD与selector绑定在了一起。

到此,server端已启动完成了,主要创建了以下对象:

WindowsSelectorProvider:单例

WindowsSelectorImpl中包含:

    pollWrapper:保存selector上注册的FD,包括pipe的write端FD和ServerSocketChannel所用的FD

    wakeupPipe:通道(其实就是两个FD,一个read,一个write)

 

再到Server 中的run():

selector.select();主要调用了WindowsSelectorImpl中的这个方法:

protected int doSelect(long timeout) throws IOException {
        if (channelArray == null)
            throw new ClosedSelectorException();
        this.timeout = timeout; // set selector timeout
        processDeregisterQueue();
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        // Calculate number of helper threads needed for poll. If necessary
        // threads are created here and start waiting on startLock
        adjustThreadsCount();
        finishLock.reset(); // reset finishLock
        // Wakeup helper threads, waiting on startLock, so they start polling.
        // Redundant threads will exit here after wakeup.
        startLock.startThreads();
        // do polling in the main thread. Main thread is responsible for
        // first MAX_SELECTABLE_FDS entries in pollArray.
        try {
            begin();
            try {
                subSelector.poll();
            } catch (IOException e) {
                finishLock.setException(e); // Save this exception
            }
            // Main thread is out of poll(). Wakeup others and wait for them
            if (threads.size() > 0)
                finishLock.waitForHelperThreads();
          } finally {
              end();
          }
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        finishLock.checkForException();
        processDeregisterQueue();
        int updated = updateSelectedKeys();
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        resetWakeupSocket();
        return updated;
    }

 其中subSelector.poll()是核心,也就是轮训pollWrapper中保存的FD;具体实现是调用native方法poll0:

这个poll0()会监听pollWrapper中的FD有没有数据进出,这会造成IO阻塞,直到有数据读写事件发生。比如,由于 pollWrapper中保存的也有ServerSocketChannel的FD,所以只要ClientSocket发一份数据到 ServerSocket,那么poll0()就会返回;又由于pollWrapper中保存的也有pipe的write端的FD,所以只要pipe的 write端向FD发一份数据,也会造成poll0()返回;如果这两种情况都没有发生,那么poll0()就一直阻塞,也就是 selector.select()会一直阻塞;如果有任何一种情况发生,那么selector.select()就会返回,所有在 OperationServer的run()里要用while (true) {,这样就可以保证在selector接收到数据并处理完后继续监听poll();

这时再来看看WindowsSelectorImpl. Wakeup():

public Selector wakeup() {
        synchronized (interruptLock) {
            if (!interruptTriggered) {
                setWakeupSocket();
                interruptTriggered = true;
            }
        }
        return this;
    }
// Sets Windows wakeup socket to a signaled state.
    private void setWakeupSocket() {
        setWakeupSocket0(wakeupSinkFd);
    }
private native void setWakeupSocket0(int wakeupSinkFd);
JNIEXPORT void JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
                                                jint scoutFd)
{
    /* Write one byte into the pipe */
    const char byte = 1;
    send(scoutFd, &byte, 1, 0);
}

可见wakeup()是通过pipe的write 端send(scoutFd, &byte, 1, 0),发生一个字节1,来唤醒poll()。所以在需要的时候就可以调用selector.wakeup()来唤醒selector。

原文:http://goon.iteye.com/blog/1775421

 

补充linux操作系统下的DefaultSelectorProvider的实现,可以看到,如果内核版本>=2.6则,具体的 SelectorProvider为EPollSelectorProvider,否则为默认的PollSelectorProvider

//sun.nio.ch.DefaultSelectorProvider

public static SelectorProvider create() {
PrivilegedAction pa = new GetPropertyAction("os.name");
String osname = (String) AccessController.doPrivileged(pa);
    if ("SunOS".equals(osname)) {
        return new sun.nio.ch.DevPollSelectorProvider();
    }
 
    // use EPollSelectorProvider for Linux kernels >= 2.6
    if ("Linux".equals(osname)) {
        pa = new GetPropertyAction("os.version");
        String osversion = (String) AccessController.doPrivileged(pa);
        String[] vers = osversion.split("\\.", 0);
        if (vers.length >= 2) {
            try {
                int major = Integer.parseInt(vers[0]);
                int minor = Integer.parseInt(vers[1]);
                if (major > 2 || (major == 2 && minor >= 6)) {
                    return new sun.nio.ch.EPollSelectorProvider();
                }
            } catch (NumberFormatException x) {
                // format not recognized
            }
        }
    }
 
    return new sun.nio.ch.PollSelectorProvider();
}


1)Windows下,Selector.open()会自己和自己建立两条TCP链接。不但消耗了两个TCP连接和端口,同时也消耗了文件描述符。

2)Linux下,Selector.open()会自己和自己建两条管道。同样消耗了两个系统的文件描述符。


本文转载自:http://www.cnblogs.com/davidwang456/p/3831617.html

草根本色
粉丝 1
博文 24
码字总数 4631
作品 0
海淀
程序员
私信 提问
Qzone 微信 Java高级——dubbo源码分析之远程通信 netty

Java高级——dubbo源码分析之远程通信 netty dubbo 底层通信选择了 netty 这个 nio 框架做为默认的网络通信框架并且通过自定义协议进行通信。dubbo 支持以下网络通信框架: Netty(默认) Min...

Java架构师那些事
2018/08/29
0
0
Java NIO原理 图文分析及代码实现

Java NIO原理图文分析及代码实现 前言: 最近在分析hadoop的RPC(Remote Procedure Call Protocol ,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术...

囚兔
2015/04/29
312
1
Java NIO原理图文分析及代码实现

前言: 最近在分析hadoop的RPC(Remote Procedure Call Protocol ,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。可以参考:http://baik...

SunnyWu
2014/11/05
626
1
Java NIO原理图文分析及代码实现

Java IO 在Client/Server模型中,Server往往需要同时处理大量来自Client的访问请求,因此Server端需采用支持高并发访问的架构。一种简单而又直接的解决方案是“one-thread-per-connection”。...

只想一个人静一静
2014/02/22
285
2
Java NIO原理图文分析及代码实现

前言: 最近在分析hadoop的RPC(Remote Procedure Call Protocol ,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。可以参考:http://baik...

phacks
2015/08/19
137
0

没有更多内容

加载失败,请刷新页面

加载更多

Django设置启动环境

1.命令行指定环境 我们知道正常的django项目都是有多个环境的配置的,在启动时需要指定启动环境,如: python manage.py runserver --settings=myapp.settings.test 其中的myapp.settings....

恒宝乐园
24分钟前
5
0
DRF 获取DefaultRouter 对应的url

命令 python manage.py show_urls urls.py from user.router import core_routerurlpatterns = [ path('user/login/', views.LoginView.as_view(), name='login'), path('user/log......

hyhlinux
57分钟前
4
0
uniapp登录流程详解uni.login

uni.login(OBJECT) 登录 H5平台登陆注意事项: 微信内嵌浏览器运行H5版时,可通过js sdk实现微信登陆,需要引入一个单独的js,详见 普通浏览器上实现微信登陆,并非开放API,需要向微信申请,...

达达前端小酒馆
57分钟前
5
0
目标检测中 yolo 的mAP是什么含义?

mAP定义及相关概念 P => precision,即 准确率 R => recall,即 召回率 PR曲线 = >即 以 precision 和 recall 作为 纵、横轴坐标 的二维曲线。一般来说,precision 和 recall 是 鱼与熊掌 的...

小松1
今天
4
0
用jdk1.8的断言来做非空判断

Assert.notNull(user, "没有获得登录用户信息"); 看源码如下: public static void notNull(Object object, String message) { if (object == null) { throw new IllegalArgum......

architect刘源源
今天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部