netty源码分析系列——Channel
netty源码分析系列——Channel
杨武兵 发表于6个月前
netty源码分析系列——Channel
  • 发表于 6个月前
  • 阅读 52
  • 收藏 1
  • 点赞 0
  • 评论 0

腾讯云 十分钟定制你的第一个小程序>>>   

前言

Channel是netty中作为核心的一个概念,我们从启动器(Bootstrap)中了解到最终启动器的两个关键操作connect和bind方法都最终调用的是Channel对应的2个方法,所以主要的实现在Channel中,它可以说是netty中最重要的接口及相关实现类之一,我们接着进入Channel极其相关的类源码研究。

Channel是一个到网络套接字(Socket)或者其它组件的连接,它负责读,写,连接和绑定等I/O操作。

类结构

Channel继承的父接口

Channel的子接口和实现类

从它的实现类树我们可以看到它有非常多的实现类,篇幅限制我们不可能将每种源码的实现都一样分析一遍。我们将挑出我们最常用的2个实现类将分析一下。

NioSoketChannel。这个类我们在netty客户端的实例中使用过,也是最常用的一个客户端实现类,我们选择分析它。

NioServerSoketChannel。这个类我们在netty服务端的实例中使用过,也是最常用的一个服务端实现类,我们选择分析它。

其它的实现跟具体的传输层协议相关,比如数据报协议UDP的实现类***DatagramChannel,SCTP协议的连接连接实现等。

netty支持的实现类型汇总如下。

NioSocketChannel

该类是我们使用的一个Nio的Socket的Channel实现类,作为客户端的连接实现类,我们从它几个比较关键的方法入手去研究它的源码实现。

connect方法

最外层的方法实现位于它间接继承的抽闲类AbstractChannel类中。

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, localAddress, promise);//调用了Channel拥有的pipeline的connect方法
    }

ChannelPipeline只有一个默认的实现类DefaultChannelPipeline,进入到该类的connect方法中。

    public final ChannelFuture connect(
            SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, localAddress, promise);
    }

 调用了tail.connect方法来执行连接操作。tail是一个内部类DefaultChannelPipeline$TailContext的对象,而该内部类又继承自抽象类AbstractChannelHandlerContext,调用该类中的connect方法,源码如下。

    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return this.connect(remoteAddress, (SocketAddress)null, promise);//调用重载方法继续处理。
    }

    public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if(remoteAddress == null) {//1
            throw new NullPointerException("remoteAddress");
        } else if(!this.validatePromise(promise, false)) {//2
            return promise;
        } else {
            final AbstractChannelHandlerContext next = this.findContextOutbound();//3
            EventExecutor executor = next.executor();
            if(executor.inEventLoop()) {//4
                next.invokeConnect(remoteAddress, localAddress, promise);
            } else {
                safeExecute(executor, new Runnable() {//5
                    public void run() {
                        next.invokeConnect(remoteAddress, localAddress, promise);
                    }
                }, promise, (Object)null);
            }

            return promise;
        }
    }

1,2处检查输入参数的合法性。

3处查找下一个处理器。因为pipleline是一个链表数据结构,即可通过tail的pre属性获得上一个处理器。经过查找最后得到了定义的内部类DefaultChannelPipeline$HeadContext的对象。

4处检查当前线程是否处于next处理器关联的事件循环器中。如果在循环器中则直接执行,实际执行发现在循环中,接着走该分支。

5处是如果当前线程不在循环其中,则提交一个异步任务。

最后两个分支都调用了next.invokeConnect方法来执行连接操作。而DefaultChannelPipeline$HeadContext类也继承自AbstractChannelHandlerContext,实际上调用的是该抽象类中定义的invokeConnect方法。

    private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if(this.invokeHandler()) {//1
            try {
                ((ChannelOutboundHandler)this.handler()).connect(this, remoteAddress, localAddress, promise);//2
            } catch (Throwable var5) {
                notifyOutboundHandlerException(var5, promise);//3
            }
        } else {
            this.connect(remoteAddress, localAddress, promise);//4
        }

    }

1处判断是否执行处理器的connect方法,该方法当前情况下返回true,执行该分支。

4处若不是执行处理器,则执行当前类中的connect方法。

2处调用handler的connect方法来执行真正的连接方法。当前的handler()方法返回的结果就是DefaultChannelPipeline$HeadContext对象本身,因此会就如该类的connect方法。

3处是若连接异常则发出异常通知。

DefaultChannelPipeline$HeadContext类的connect方法才执行了连接的逻辑,进入到该方法一探究竟。

        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
            this.unsafe.connect(remoteAddress, localAddress, promise);//1
        }

1处调用了this.unsafe的connect方法,而该对象来自何处呢,从构造函数我们知道它通过代码this.unsafe = pipeline.channel().unsafe(); 获得的对象,也就是来自于pipeline绑定的连接通道中的unsafe方法对象。我们知道当前案例中channel是NioSocketChannel,是调用该类的unsafe方法获取到的Unsafe对象。最后得到的是内部类NioSocketChannel$NioSocketChannelUnsafe的对象的方法。

进入到该类的connect方法源码如下。

  public final void connect(final SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
            if(promise.setUncancellable() && this.ensureOpen(promise)) {//1
                try {
                    if(AbstractNioChannel.this.connectPromise != null) {//2
                        throw new ConnectionPendingException();
                    }

                    boolean t = AbstractNioChannel.this.isActive();
                    if(AbstractNioChannel.this.doConnect(remoteAddress, localAddress)) {//3
                        this.fulfillConnectPromise(promise, t);
                    } else {//4
                        AbstractNioChannel.this.connectPromise = promise;
                        AbstractNioChannel.this.requestedRemoteAddress = remoteAddress;
                        int connectTimeoutMillis = AbstractNioChannel.this.config().getConnectTimeoutMillis();
                        if(connectTimeoutMillis > 0) {
                            AbstractNioChannel.this.connectTimeoutFuture = AbstractNioChannel.this.eventLoop().schedule(new Runnable() {//5
                                public void run() {
                                    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                    ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                    if(connectPromise != null && connectPromise.tryFailure(cause)) {
                                        AbstractNioUnsafe.this.close(AbstractNioUnsafe.this.voidPromise());
                                    }

                                }
                            }, (long)connectTimeoutMillis, TimeUnit.MILLISECONDS);
                        }

                        promise.addListener(new ChannelFutureListener() {//6
                            public void operationComplete(ChannelFuture future) throws Exception {
                                if(future.isCancelled()) {
                                    if(AbstractNioChannel.this.connectTimeoutFuture != null) {
                                        AbstractNioChannel.this.connectTimeoutFuture.cancel(false);
                                    }

                                    AbstractNioChannel.this.connectPromise = null;
                                    AbstractNioUnsafe.this.close(AbstractNioUnsafe.this.voidPromise());
                                }

                            }
                        });
                    }
                } catch (Throwable var6) {
                    promise.tryFailure(this.annotateConnectException(var6, remoteAddress));//7
                    this.closeIfClosed();
                }

            }
        }

1处设置并检查promise合法性。

2处检查当前连接是否正在连接中。

3处调用类NioSocketChannel的doConnect方法执行连接操作。如果返回true表示连接完成,则直接填充promise对象的结果。

4处是当doConnect方法返回false的情况下的处理。

5处则是添加一个定时调度任务。在设置的连接超时时间经过之后将promise状态设置为失败,失败原因是超时。

6处是给promise对象添加一个监听器。在操作完成的情况下出发该监听器的操作,如果连接被取消,则取消超时futrue和关闭连接。

7处是异常通知处理。

核心逻辑上是NioSocketChannel的doConnect方法。

    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if(localAddress != null) {
            this.doBind0(localAddress);//1
        }
        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(this.javaChannel(), remoteAddress);//2
            if(!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);//3
            }
            success = true;
            return connected;
        } finally {
            if(!success) {
                this.doClose();
            }
        }
    }

1处若传入localAddress,则先绑定该地址。

2处调用SocketUtils的connect方法进行连接,传入javaChannel,该对象是jdk中的SocketChannel,在nio中它通过configureBlocking(false);设置为非阻塞模式的Socke连接,因此在调用连接方法的时候会立即返回。

3处如果未建立连接,则设置连接感兴趣的操作是“OP_CONNECT”的操作。

SocketUtils的connect方法是执行了真正的连接方法,进入该方法看一下源码。

    public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
            throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {//1
                @Override
                public Boolean run() throws IOException {
                    return socketChannel.connect(remoteAddress);//2
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }

1处执行特权的代码逻辑,将连接的代码放在匿名内部类中。详细说明参考http://blog.csdn.net/heipacker/article/details/9295031

2处真正调用了jdk的SocketChannel的connect方法,该对象在之前被设置了blocking=false值,则此时调用connect方法的时候不会阻塞,会立即返回。

register

register方法是将Channel注册到一个EventLoop对象上。

      /**
         * Register the {@link Channel} of the {@link ChannelPromise} and notify
         * the {@link ChannelFuture} once the registration was complete.
         */
        void register(EventLoop eventLoop, ChannelPromise promise);

eventloop是要注册的事件循环器,而promise是注册结果的Future对象。

AbstractChannel的resigter方法

   public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {//1
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {//2
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {//3
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;
            if (eventLoop.inEventLoop()) {//4
                register0(promise);
            } else {//5
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {//6
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

1 处检查eventloop对象是否为null,2处检查是否已经注册过,3处检查是否兼容,3个地方都是对参数或状态进行检查;

4若当前处于eventloop的线程中则直接调用内部方法register0方法注册;

5处则不在同一个线程内,则启动一个新的线程执行regsiter0方法。

6处是异常处理,若执行出现异常,则标记promise状态为失败,并记录异常信息。

AbstractChannel的register0方法

       private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

AbstractNioChannel的doRegister方法

  protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

NioServerSocketChannel

NioServerSocketChannel是netty的nio服务端的连接通道,在ServerBootstrap中我们知道最终启动服务器最后调用的核心代码都是调用的该类中的方法,该类是我们最常使用的一个实现类,我们只分析该类的源码实现。

bind

bind方法是绑定本地的一个地址,等待客户端的连接,该方法是最主要的职责。

        /**
         * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
         * it once its done.
         */
        void bind(SocketAddress localAddress, ChannelPromise promise);

绑定一个本地地址,该方法又是一个异步非阻塞方法,绑定的结果会通过promise通知调用方。

AbstractChannel的bind方法

AbstractChannel是抽象类,它提供了bind方法的实现。

    public ChannelFuture bind(SocketAddress localAddress) {
        return pipeline.bind(localAddress);//1
    }

1处可以看出来,它调用了DefaultChannelPipeline类的bind方法。

    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);//1
    }

1处调用了tail的bind方法,此处的tail对象是ChannelHandlerContext(DefaultChannelPipeline$TailContext类型的。

    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {//1
            throw new NullPointerException("localAddress");
        }
        if (!validatePromise(promise, false)) {//2
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound();//3
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {//4
            next.invokeBind(localAddress, promise);
        } else {//5
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

1,2处检查输入参数的合法性。

3处获得下一个handler,这里得到的是对象是DefaultChannelPipeline$HeadContext类型的。

4处判断当前线程是否处于eventloop中,若是则直接执行,当前实例中流程执行了该分支。

5处若不是在eventLoop中则启动一个异步任务执行。

进入到DefaultChannelPipeline$HeadContext类的invokeBind方法。

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {//1
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);//2
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);//3
    }
}

1处判断是否可执行处理器,如果可以执行处理器,则执行处理器的bind方法;

2处调用handler()方法获得处理器,然后再调用Handler的bind方法。当前情况执行该分支。

3处不执行处理器的方法,则直接执行bind方法。

执行第2步后则进入到内部类HeadContext的bind方法

        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);//1
        }

1处代码可以知道它调用了unsafe的bind方法,此时unsafe对象是AbstractNioMessageChannel的内部类NioMessageUnsafe的类型。

由于它继承了

      public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();//1

            if (!promise.setUncancellable() || !ensureOpen(promise)) {//2
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }//3

            boolean wasActive = isActive();
            try {
                doBind(localAddress);//4
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {//5
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);//6
        }

1处检查是否处于evetLoop中。

2处检查promise合法性。

3处是修复一个bug,当在*unix操作系统上,非root用户接收广播消息包,socket没有绑定到通配符地址上,则警告用户无接收。

4处则调用doBind方法继续执行绑定其它逻辑。

5处则是当连接通道是首次激活则会异步触发pipline的fireChannelActive方法。

6处设置promise的状态为成功。

接下来就进入到了NioServerSocketChannel类的doBind方法。

    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {//1
            javaChannel().bind(localAddress, config.getBacklog());//2
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());//3
        }
    }

1处判断当前jdk的版本是否大雨1.7.

2若jdk1.7以上版本则直接调用bind方法,我们用的1.7的jdk,走该分支。backlog参数在这里是128,表示最大允许的等待连接数。

3若jdk1.6及以下版本则调用socket()然后再调用bind方法。

最后通过调用jdk自带的ServerSocketChannel类的bind方法来实现真正的绑定,我们到这里就停止深究了。

read

连接最重要的特性就是读写网络字节流,先看看read方法背后是怎么实现的。

上一节我们从源码分析知道,当NioServerSocketChannel的连接建立完毕后,会触发pipeline.fireChannelActive()的方法,我们看看该方法的源码。

        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {//1
                channel.read();//2
            }
        }

1处会判断当前连接的配置是否自动读,当前情况下配置是自动读信息。

2处会最终调用DefaultChannelPipline的read方法来实现读取操作。

我们再进入到read方法。

    public final ChannelPipeline read() {
        tail.read();
        return this;
    }

可以看出来,它又调用了tail的read方法,而tail对象是DefaultChannelPipeline$TailContext内部类。

进入它的read方法。

     public ChannelHandlerContext read() {
        final AbstractChannelHandlerContext next = findContextOutbound();//1
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {//2
            next.invokeRead();//3
        } else {//4
            Runnable task = next.invokeReadTask;
            if (task == null) {
                next.invokeReadTask = task = new Runnable() {
                    @Override
                    public void run() {
                        next.invokeRead();
                    }
                };
            }
            executor.execute(task);
        }

        return this;
    }

1处查找处理器链的下一个处理器,此处得到的是DefaultChannelPipeline$HeadContext类的对象。

2处检查当前是否处于EventLoop线程中,本实例中是处于eventLoop中。

3处执行DefaultChannelPipeline$HeadContext类的invokeRead方法。

4处当线程未处于eventLoop中,则启动新的线程执行invokeRead方法。

再进入到DefaultChannelPipeline$HeadContext类的invokeRead方法中看源码。

    private void invokeRead() {
        if (invokeHandler()) {//1
            try {
                ((ChannelOutboundHandler) handler()).read(this);//2
            } catch (Throwable t) {
                notifyHandlerException(t);//3
            }
        } else {
            read();
        }
    }

1处判断标志是否执行处理器,本例中会执行该分支。

2处执行handler的read方法。通过handler()方法得到是当前对象本身,即也是DefaultChannelPipeline$HeadContext类。

3处当出现异常则执行通知方法。

接着执行DefaultChannelPipeline$HeadContext类的read方法,它的源码如下。

        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

它又会调用unsafe对象的beginRead方法,此处unsafe对象是一个AbstractNioMessageChannel的内部类NioMessageUnsafe。

我们先进入到它的父类AbstractNioChannel$AbstractUnsafe中看beginRead方法源码。

  public final void beginRead() {
            assertEventLoop();//1

            if (!isActive()) {//2
                return;
            }

            try {
                doBeginRead();//3
            } catch (final Exception e) {//4
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

1处检查eventLoop合法行,2处检查连是否激活,都是检查相关部件的状态合法性。

3处调用子类的doBeginRead方法,继续执行其它的逻辑。此处将进入到AbstractNioChannel$AbstractUnsafe类的该方法中。

4处是异常处理,当发生异常,则异步执行pipline.fireExceptionCaught方法。

再进入到AbstractNioChannel$AbstractUnsafe的doBeginRead方法中。

    protected void doBeginRead() throws Exception {
        if (inputShutdown) {//1
            return;
        }
        super.doBeginRead();//2
    }

1处检查输入流是否关闭,若关闭则直接返回。

2处又调用了它父类的该方法。

进入到父类的doBeginRead方法,这里是指AbstractNioChannel$AbstractNioUnsafe,我们进入该方法的源码。

    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {//1
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);//2
        }
    }

1处检查当前jdk nio Channel的selectionKey的合法性,若不合法则跳出。

2处则将selectionKey关注的操作类型的值从0变更为16。该值的即为SelectionKey中定义的常量OPT_ACCEPT= 1<<4操作,即为监听接收新的连接事件。

通过该步操作我们可以得知,它最终设置了监听接收连接操作的事件。

 

共有 人打赏支持
杨武兵
粉丝 192
博文 56
码字总数 123254
作品 1
×
杨武兵
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: