文档章节

Netty 源码分析(三):服务器端的初始化和注册过程

编走编想
 编走编想
发布于 2015/10/20 23:39
字数 2708
阅读 709
收藏 24

1. 简介

接下来我们会通过使用 Netty 去实现 NIO TCP 服务器的这个场景来解析 Netty 的源代码,深入了解 Netty 的设计。

使用 Netty 来实现一个 TCP 服务器,我们大致要做以下事情:

  1. 创建 ServerSocketChannel、Channel、ChannelHandler 等一系列对象。这里的 Channel 将包含 ServerSocketChannel 和一系列 ChannelHandler。
  2. 将 ServerSocketChannel 注册到 Selector 多路复用器上
  3. 启动 EventLoop,并将 Channel 注册到其中
  4. 为 ServerSocketChannel 绑定端口
  5. 接受客户端连接,并将 SocketChannel 注册到 Selector 和 EventLoop 中
  6. 处理读写事件 ...

2. 源码解析

我们先来了解 Server Channel 的初始化和注册的过程。初始化是来构建 Netty 本身的各种组件,应用用户的设置参数。注册的主要工作最终是将 SelectableChannel 注册到多路复用器 Selector。这一过程在所有基于 Java NIO 的项目里都是类似的。

服务端的整个构建过程是从 ServerBootstrap 开始的:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .handler(new LoggingHandler(LogLevel.DEBUG))
    .childHandler(channelInitializer)
    .bind(portNum).sync().channel().closeFuture().sync();

接下来我们从 AbstractBootstrapbind(int port) 方法开始了解 Netty 的源码。

2.1 类:AbstractBootstrap,方法:ChannelFuture doBind(final SocketAddress localAddress)

bind(int port) 的实际工作绝大部分是在 AbstractBootstrapChannelFuture doBind(final SocketAddress localAddress) 实现的,我们来看其源码:

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
 
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}

doBind(SocketAddress localAddress) 方法主要会做三件事:

  1. ServerChannel 初始化
  2. 注册 ServerChannel 到 Selector
  3. 将 ServerChannel bind 到本地端口上

最后一步 bind 需要在 register 完成之后再执行,但是因为这些动作都可能发生在不同的线程上,所以 bind 的动作是通过回调的方式实现的,具体细节后面再介绍。本篇将先介绍前两个操作。

因为 AbstractBootstrap 类的 initAndRegister() 方法是接下来第一个被调用的方法,所以我们接下来看它的源码。

2.2 类:AbstractBootstrap,方法:ChannelFuture initAndRegister()

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
 
    ChannelFuture regFuture = group().register(channel); // 见 2.5
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
 
    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
 
    return regFuture;
}

这个方法主要做了下面几件事:

  • Channel channel = channelFactory().newChannel(); 此处通过 channelFactory() 方法得到的 ChannelFactory 实现类是 ReflectiveChannelFactory。(这个类是默认实现,AbstractBootstrap 还提供了方法用来设置其它 ChannelFactory 的实现)这个类将通过 channel(NioServerSocketChannel.class) 提供的类反射创建出 NioServerSocketChannel。而在 NioServerSocketChannel 的构造函数里,一个 java.nio.channels.ServerSocketChannel 将被构建出来。

  • 接下来,在 NioServerSocketChannel 的父类 AbstractNioChannel 的构造函数中,ServerSocketChannel 被设置成非阻塞模式 ch.configureBlocking(false);

  • AbstractNioChannel 父类 AbstractChannel 的构造函数里,有两个重要的对象被构造出来

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

在这里 unsafepipeline 被构建出来。构造出 unsafenewUnsafe() 方法是在子类中实现,在本例中 NioMessageUnsafe(还有一个类似的 NioByteUnsafe,其为 NioSocketChannel 提供了具体的 IO 实现),它包含了很多具体的方法实现。而 DefaultChannelPipeline 则是另一个重要的类,接下来我们来看看它的构造函数。

2.3 类 DefaultChannelPipeline 的构造函数

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
       throw new NullPointerException("channel");
    }
    this.channel = channel;
 
    tail = new TailContext(this);
    head = new HeadContext(this);
 
    head.next = tail;
    tail.prev = head;
}

DefaultChannelPipeline 构造函数中做了一件很重要的事,就是构造了 ChannelHandler 链。TailContextHeadContext 都继承了 AbstractChannelHandlerContext。另外,TailContext 实现了 ChannelInboundHandlerHeadContext 实现了 ChannelOutboundHandler(Netty 5 在此处有变化,其不再区分 ChannelInboundHandlerChannelOutboundHandler)。

上面这些就是 final Channel channel = channelFactory().newChannel(); 所做的事情。接下来我们来了解 init(Channel channel) 方法所做的事情。在这里,void init(Channel channel) 方法由 AbstractBootstrap 的子类 ServerBootstrap 实现

2.4 类:ServerBootstrap,方法:void init(Channel channel)

void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }
 
    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
 
    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }
 
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
 
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

这个方法主要做了两件事:

  1. NioServerSocketChannel 设置 options 和 attrs
  2. 为 pipeline 添加一个 Inbound 处理器 ChannelInitializer。这个 ChannelInitializer 会在 ChannelRegistered 事件发生时将 ServerBootstrapAcceptor 添加到 pipeline 上。在后面 ServerBootstrapAcceptor 将被用来接收客户端的连接,我们会在后续文章中介绍。DefaultChannelPipelineaddLast 方法会将新的 ChannelHandler 添加到 tail 之前,其它所有 ChannelHandler 之后。

在做完初始化工作之后,就要开始注册的工作了。接下来来看 group().register(channel) 的实现。其中 group() 方法将会返回我们在 b.group(bossGroup, workerGroup) 中设定的 bossGroup 这里不做过多介绍了。接下来看 register(channel) 方法。

2.5 类:AbstractChannel.AbstractUnsafe,方法:void register(EventLoop eventLoop, ChannelPromise promise)

因为我们实现的是一个 NIO server,所以此处 EventLoop 使用的实现类是 NioEventLoopNioEventLoopregister(Channel) 方法是继承自 SingleThreadEventLoop。而 SingleThreadEventLoop 则通过 channel.unsafe().register(this, promise) 方法将注册工作代理给了 Channel.Unsafe 来实现。此处 Unsafe 的实现类是 AbstractChannel.AbstractUnsafe(多么一致的命名)。

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
 
    AbstractChannel.this.eventLoop = eventLoop;
 
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
 
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

先解释一下 eventLoop.inEventLoop(),这个判断在很多地方都能看见。这个方法是用来判断当前的线程是否是 EventLoop 的任务执行线程。如果是,那就不用在添加任务,直接执行就可以了,否则需要将任务添加到 EventLoop 中。在本例中,很明显,执行过程将走到 else 分支中。

注册工作主要是在 doRegister() 方法中实现的,这个方法是定义在 AbstractChannel 中的一个抽象方法。在本例中,这个方法由 AbstractNioChannel 实现的。

2.6 类:AbstractNioChannel,方法:doRegister()

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

在这个方法中,selectionKey = javaChannel().register(eventLoop().selector, 0, this); 通过调用 JDK 方法,将 SelectableChannel 注册到 Selector 上。注意一个细节,因为同一个 Channel 和 Selector 可以对应一个 SelectionKey,所以如果另外一个相应的 SelectionKey 的 cancel 方法被执行之后,会导致 SelectableChannelregister 方法抛出 CancelledKeyException。所以这里通过 selectNow() 方法清除取消状态之后,重新 register。循环的原因就像是注释所描述的一样,Netty 团队也不清楚,这难道是 JDK 的 bug 吗?

上面这几个过程就实现了 Server Channel 的初始化和注册工作。

3. 总结

3.1 设计模式

从这部分代码,我们可以看到很多设计模式的使用

Builder 模式

在使用 ServerBootstrap 构建服务端的时候,Netty 应用了 Builder 模式。虽不是典型应用,但也起到了是代码简洁易懂的目的。

工厂方法模式

Channel channel = channelFactory().newChannel(); 使用了工厂方法模式。ServerBootstrap 提供了方法设置工厂类,同时也提供了默认实现。通过工厂方法模式创建 Channel,实现了良好的可扩展性。

模板方法模式

AbstractBootstrapvoid init(Channel channel) 就是一个模板方法。初始化和注册工作的主要实现方法是 AbstractBootstrapinitAndRegister,这个方法调用模板方法 init(Channel)。而模板方法 init(Channel) 则交由子类做具体的实现。

同样,AbstractChanneldoRegister 方法也是一个模板方法模式的例子。

3.2 设计思想

Netty 用上面复杂的代码实现了并不复杂的功能。其背后反映处的思想就是作为一个通用的高性能网络 IO 框架,Netty 必须设计出一个高性能、高扩展性的基础架构,然后再在这个架构之上实现各种功能。Netty 的执行核心是 EventLoopGroup 及其实现类。绝大部分 IO 操作和非 IO 操作都是交由 EventLoopGroup 来执行。这是 Netty 能被用来实现高并发服务的原因之一。所以本文所涉及的操作,所以并不复杂,但是其中的一些操作,例如注册工作,也是需要交由 EventLoopGroup 异步执行。这虽然因为异步的方式,提高了系统的执行效率,但事也未任务直接的协调制造了困难,这一点在后续的介绍中会看的很清楚。

如果说 EventLoopGroup 是执行调度的核心,那 Channel 就是实现具体操作的实现核心。因为网络编程十分复杂,有各种复杂的协议,也有复杂的底层操作系统实现,所以 Channel 相应的实现类也是种类繁多。这其实并没有增加复杂度,而是将各种复杂功能各归其主,将实现细节梳理清晰。

4. 接下来

接下来的文章将会解析服务端端口绑定的实现。

© 著作权归作者所有

编走编想
粉丝 152
博文 128
码字总数 111395
作品 0
海淀
程序员
私信 提问
Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (服务器端)

目录 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 之二 NIO Channel 小结 Java NIO 的前生今世 之三 NIO Buffer 详解 Java NIO 的前生...

永顺
2017/11/29
0
0
Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(一)

目录 源码之下无秘密 ── 做最好的 Netty 源码分析教程 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 之二 NIO Channel 小结 Java NIO...

永顺
2017/11/29
0
0
Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端)

目录 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 之二 NIO Channel 小结 Java NIO 的前生今世 之三 NIO Buffer 详解 Java NIO 的前生...

永顺
2017/11/29
0
0
【死磕Netty】—–服务端启动过程分析

原文出处http://cmsblogs.com/ 『chenssy』 转载请注明原创出处,谢谢! 上篇博客(【死磕Netty】----Netty的核心组件及其设计),了解了 Netty 的核心组件及其设计,但是这些都是零散的,不成...

chenssy
2017/12/04
0
0
Netty源码—一、server启动(1)

Netty作为一个Java生态中的网络组件有着举足轻重的位置,各种开源中间件都使用Netty进行网络通信,比如Dubbo、RocketMQ。可以说Netty是对Java NIO的封装,比如ByteBuf、channel等的封装让网络...

lacker
2018/07/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
今天
5
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
今天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
今天
6
0
【技术分享】TestFlight测试的流程文档

上架基本需求资料 1、苹果开发者账号(如还没账号先申请-苹果开发者账号申请教程) 2、开发好的APP 通过本篇教程,可以学习到ios证书申请和打包ipa上传到appstoreconnect.apple.com进行TestF...

qtb999
今天
10
0
再见 Spring Boot 1.X,Spring Boot 2.X 走向舞台中心

2019年8月6日,Spring 官方在其博客宣布,Spring Boot 1.x 停止维护,Spring Boot 1.x 生命周期正式结束。 其实早在2018年7月30号,Spring 官方就已经在博客进行过预告,Spring Boot 1.X 将维...

Java技术剑
今天
18
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部