文档章节

netty-当一个客户端连接到来的时候发生了什么

 大胖和二胖
发布于 2016/07/12 16:32
字数 1809
阅读 53
收藏 0

在之前的文章当中,我们分析了ServerBootstrap.bind当中究竟干了些什么,并且找到了netty当中使用java nio的具体地方。今天的内容,重点讨论一下,当一个客户端要connect我们的server的时候,netty究竟具体做了哪些事情。

先看一下java nio的代码,这个当中做了这么几件事情,

selector.select -> 遍历 selector.selectedKeys()  -> 判断 key.isAcceptable()  -> accept并生成一个 SocketChannel 对象 -> config and register

            if(selector.select(TimeOut)==0){
                System.out.println(".");
                continue;
            }
            
            // 获得就绪信道的键迭代器
            Iterator<SelectionKey> keyIter=selector.selectedKeys().iterator();
            
            // 使用迭代器进行遍历就绪信道
            while(keyIter.hasNext()){
                System.out.println("Something happened ");
                SelectionKey key=keyIter.next();
                
                // 这种情况是有客户端连接过来,准备一个clientChannel与之通信
                if(key.isAcceptable()){
                    System.out.println("Accept in");
                    SocketChannel clientChannel=((ServerSocketChannel)key.channel()).accept();
                    clientChannel.configureBlocking(false);
                    clientChannel.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocate(Buffer_Size));
                }

那么我们现在就看一下netty的代码,找一找在新连接到来的时候,netty具体做了些什么事情,而java nio又是具体在哪些地方出现的。

首先我们搞清楚一个问题,我们都知道每一个java对象,都会分配在heap当中,而如果它的引用的生命周期已经结束的情况下,它会在某一个时间点被jvm回收。那么之前在bind过程当中产生的一个NioServerSocketChannel对象,如果我们在回顾一下bind部分的代码,我们会发现,它的引用,统统都是在方法内部,以局部变量的形式存在的,在bind方法执行完毕之后,这个 NioServerSocketChannel 对象,又以什么形式出现呢?

我得出的结论是这样的,在 NioServerSocketChannel 生成之后,会为它分配一个NioEventLoop对象,之后会把 NioServerSocketChannel 注册到这个 NioEventLoop对象 当中的selector上,这样的话,相当于是 NioEventLoop对象 引用了这个NioServerSocketChannel 对象,而 NioEventLoop 是一直处于alive状态的。

那么下一个问题, NioEventLoop究竟是什么时候开始运行的呢?

我们找到了这个地方AbstractChannel$AbstractUnsafe.register这个方法,该方法用于register,看代码,首先判断当前eventLoop是否处在运行当中,如果不是的话,执行了它的execute方法。这个execute方法有2层含义,1、启动线程,2、添加一个task到该线程的task队列当中。(这部分的具体实现,我们以后再看)

if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            System.out.println("Tmp Task run");
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    promise.setFailure(t);
                }
            }

ok,到此为止,NioEventLoop已经处在运行状态了,那么我们去看看run当中都干了些什么。
在run方法当中,我们总结一下,其实就是2件事,1、我的本职工作,监控server channel的状态,发起下一步动作;2、完成临时添加的其他task。大家按照这个思路来看,就很容易理解了。之前的文章当中我们分析过,通过java reflect机制,已经成功的把selector的selectedkeys属性映射到了NioEventLoop的 selectedKeys变量当中了。

protected void run() {
        for (;;) {
            System.out.println("NioEventLoop run start");
            oldWakenUp = wakenUp.getAndSet(false);
            try {
                if (hasTasks()) {
                    selectNow();
                } else {
                    select();

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;

                final long ioStartTime = System.nanoTime();
                needsToSelectAgain = false;
                if (selectedKeys != null) {
                    processSelectedKeysOptimized(selectedKeys.flip());
                } else {
                    processSelectedKeysPlain(selector.selectedKeys());
                }
                final long ioTime = System.nanoTime() - ioStartTime;

                final int ioRatio = this.ioRatio;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }

我们直接来看这段 processSelectedKeysOptimized(selectedKeys.flip()); 这个方法当中,会遍历 selectedKeys ,然后执行相应的动作,对于一个server channel来说,我们期待的就是一个accept。看看代码。请注意红色部分,对于我们目前的场景,只关注if部分即可。

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                //
                // See https://github.com/netty/netty/issues/1523
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

之后关注 processSelectedKey(k, (AbstractNioChannel) a)的代码,还是关注红色部分,在有连接进来的时候,我们debug一下就可以知道, readyOps =16,对照一下selectionKey.OP_ACCEPT,我们知道这个时候需要server channel accept一个新的连接了。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {

                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

那么我们接下来看一下unsafe究竟是谁?而unsafe.read究竟干了些什么。首先我们看一下read方法的实现,来自2个class,NioMessageUnsafe和NioByteUnsafe,初步了解 NioMessageUnsafe 服务于NioServerSocketChannel,而 NioByteUnsafe 服务于NioSocketChannel,更详细的信息,我们以后再研究。找到unsafe.read的代码,AbstractNioMessageChannel$NioMessageUnsafe.read(),代码很多,我们仅关注红色部分。

public void read() {
            assert eventLoop().inEventLoop();
            if (!config().isAutoRead()) {
                removeReadOp();
            }

            final ChannelConfig config = config();
            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            final boolean autoRead = config.isAutoRead();
            final ChannelPipeline pipeline = pipeline();
            boolean closed = false;
            Throwable exception = null;
            try {
                for (;;) {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    if (readBuf.size() >= maxMessagesPerRead | !autoRead) {
                        break;
                    }
                }
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                if (exception instanceof IOException) {
                    // ServerChannel should not be closed even on IOException because it can often continue
                    // accepting incoming connections. (e.g. too many open files)
                    closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                }

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        }

首先是 doReadMessages , 代码在NioServerSocketChannel当中,很明确,accept并生成一个 java nio的Socketchannel对象 ,然后new一个 NioSocketChannel 对象,把 NioServerSocketChannel 自己、一个eventloop、还有一个java nio的Socketchannel对象,放到了这个新的对象当中。之后加入到list当中,并返回1.

protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

然后是 pipeline.fireChannelRead(readBuf.get(i));注意2点,首先这里的pipeline是服务于Server channel的pipeline,通常情况下,当中并没有我们业务相关的handler,而只有一个netty默认为server channel服务的ServerBootstrapAcceptor。其次,传入的参数就是新的NioSocketChannel;我们看一下它的channelRead的代码,为新Channel添加handler,设置属性、register selector.

public void channelRead(ChannelHandlerContext ctx, Object msg) {
            Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                try {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + child, t);
                }
            }

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            child.unsafe().register(child.newPromise());
        }

在之后,fire一个 ChannelReadComplete,这个当中涉及到autoread这个参数,我们会在后续详细讨论。

到此为止,新的channel已经生成,并且已经成功注册到了eventloop上。而在register的同时,新channel的eventloop已经开始运行了。

下面就等着客户端发消息过来了!

 

© 著作权归作者所有

粉丝 23
博文 69
码字总数 50842
作品 0
沈阳
架构师
私信 提问
netty EventLoop学习(二)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lupeng/article/details/82469975 EventLoop学习 昨天我开始学习netty框架,写了一个小例子,今天学习下其中比...

孤落
2018/09/06
0
0
Netty 心跳服务之 IdleStateHandler 源码分析

前言:Netty 提供的心跳介绍 Netty 作为一个网络框架,提供了诸多功能,比如我们之前说的编解码,Netty 准备很多现成的编解码,同时,Netty 还为我们准备了网络中,非常重要的一个服务-----...

tantexian
03/13
45
0
Netty精粹之TCP粘包拆包问题

粘包拆包问题是处于网络比较底层的问题,在数据链路层、网络层以及传输层都有可能发生。我们日常的网络应用开发大都在传输层进行,由于UDP有消息保护边界,不会发生这个问题,因此这篇文章只...

Float_Luuu
2016/02/27
18.1K
0
netty学习过程中的一些 疑惑

1: netty 中的 缓冲区一共有四种 一般通过Unpooled.buffer(),Unpooled.directBuffer();获取 UnpooledHeapByteBuf 没有池化的推缓冲区 UnpooledDirectByteBuf 没有池化的直接缓冲区 一般通过c...

正义柔情永在
2018/07/10
122
0
【原创】Netty 内核

作者:星巴刻 一、Netty 内核组 Netty 运行时包含了多个内核。在服务端程序中,需要分别创建 parent 和 child 两种内核: 1 个 parent 内核和 16 个 child 内核( 8 核 CPU系统下的默认数)。...

星巴刻
2017/11/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

golang-字符串-地址分析

demo package mainimport "fmt"func main() {str := "map.baidu.com"fmt.Println(&str, str)str = str[0:5]fmt.Println(&str, str)str = "abc"fmt.Println(&s......

李琼涛
今天
4
0
Spring Boot WebFlux 增删改查完整实战 demo

03:WebFlux Web CRUD 实践 前言 上一篇基于功能性端点去创建一个简单服务,实现了 Hello 。这一篇用 Spring Boot WebFlux 的注解控制层技术创建一个 CRUD WebFlux 应用,让开发更方便。这里...

泥瓦匠BYSocket
今天
6
0
从0开始学FreeRTOS-(列表与列表项)-3

FreeRTOS列表&列表项的源码解读 第一次看列表与列表项的时候,感觉很像是链表,虽然我自己的链表也不太会,但是就是感觉很像。 在FreeRTOS中,列表与列表项使用得非常多,是FreeRTOS的一个数...

杰杰1号
今天
8
0
Java反射

Java 反射 反射是框架设计的灵魂(使用的前提条件:必须先得到代表的字节码的 Class,Class 类 用于表示.class 文件(字节码)) 一、反射的概述 定义:JAVA 反射机制是在运行状态中,对于任...

zzz1122334
今天
5
0
聊聊nacos的LocalConfigInfoProcessor

序 本文主要研究一下nacos的LocalConfigInfoProcessor LocalConfigInfoProcessor nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/config/impl/LocalConfigInfoProcessor.java p......

go4it
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部