Netty 源码分析(三):服务器端的初始化和注册过程

原创
2015/10/20 23:39
阅读数 975

1. 简介

接下来我们会通过使用 Netty 去实现 NIO TCP 服务器的这个场景来解析 Netty 的源代码,深入了解 Netty 的设计。

使用 Netty 来实现一个 TCP 服务器,我们大致要做以下事情:

  1. 创建 ServerSocketChannel、Channel、ChannelHandler 等一系列对象。这里的 Channel 将包含 ServerSocketChannel 和一系列 ChannelHandler。
  2. 将 ServerSocketChannel 注册到 Selector 多路复用器上
  3. 启动 EventLoop,并将 Channel 注册到其中
  4. 为 ServerSocketChannel 绑定端口
  5. 接受客户端连接,并将 SocketChannel 注册到 Selector 和 EventLoop 中
  6. 处理读写事件 ...

2. 源码解析

我们先来了解 Server Channel 的初始化和注册的过程。初始化是来构建 Netty 本身的各种组件,应用用户的设置参数。注册的主要工作最终是将 SelectableChannel 注册到多路复用器 Selector。这一过程在所有基于 Java NIO 的项目里都是类似的。

服务端的整个构建过程是从 ServerBootstrap 开始的:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .handler(new LoggingHandler(LogLevel.DEBUG))
    .childHandler(channelInitializer)
    .bind(portNum).sync().channel().closeFuture().sync();

接下来我们从 AbstractBootstrapbind(int port) 方法开始了解 Netty 的源码。

2.1 类:AbstractBootstrap,方法:ChannelFuture doBind(final SocketAddress localAddress)

bind(int port) 的实际工作绝大部分是在 AbstractBootstrapChannelFuture doBind(final SocketAddress localAddress) 实现的,我们来看其源码:

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
 
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}

doBind(SocketAddress localAddress) 方法主要会做三件事:

  1. ServerChannel 初始化
  2. 注册 ServerChannel 到 Selector
  3. 将 ServerChannel bind 到本地端口上

最后一步 bind 需要在 register 完成之后再执行,但是因为这些动作都可能发生在不同的线程上,所以 bind 的动作是通过回调的方式实现的,具体细节后面再介绍。本篇将先介绍前两个操作。

因为 AbstractBootstrap 类的 initAndRegister() 方法是接下来第一个被调用的方法,所以我们接下来看它的源码。

2.2 类:AbstractBootstrap,方法:ChannelFuture initAndRegister()

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
 
    ChannelFuture regFuture = group().register(channel); // 见 2.5
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
 
    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
 
    return regFuture;
}

这个方法主要做了下面几件事:

  • Channel channel = channelFactory().newChannel(); 此处通过 channelFactory() 方法得到的 ChannelFactory 实现类是 ReflectiveChannelFactory。(这个类是默认实现,AbstractBootstrap 还提供了方法用来设置其它 ChannelFactory 的实现)这个类将通过 channel(NioServerSocketChannel.class) 提供的类反射创建出 NioServerSocketChannel。而在 NioServerSocketChannel 的构造函数里,一个 java.nio.channels.ServerSocketChannel 将被构建出来。

  • 接下来,在 NioServerSocketChannel 的父类 AbstractNioChannel 的构造函数中,ServerSocketChannel 被设置成非阻塞模式 ch.configureBlocking(false);

  • AbstractNioChannel 父类 AbstractChannel 的构造函数里,有两个重要的对象被构造出来

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

在这里 unsafepipeline 被构建出来。构造出 unsafenewUnsafe() 方法是在子类中实现,在本例中 NioMessageUnsafe(还有一个类似的 NioByteUnsafe,其为 NioSocketChannel 提供了具体的 IO 实现),它包含了很多具体的方法实现。而 DefaultChannelPipeline 则是另一个重要的类,接下来我们来看看它的构造函数。

2.3 类 DefaultChannelPipeline 的构造函数

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
       throw new NullPointerException("channel");
    }
    this.channel = channel;
 
    tail = new TailContext(this);
    head = new HeadContext(this);
 
    head.next = tail;
    tail.prev = head;
}

DefaultChannelPipeline 构造函数中做了一件很重要的事,就是构造了 ChannelHandler 链。TailContextHeadContext 都继承了 AbstractChannelHandlerContext。另外,TailContext 实现了 ChannelInboundHandlerHeadContext 实现了 ChannelOutboundHandler(Netty 5 在此处有变化,其不再区分 ChannelInboundHandlerChannelOutboundHandler)。

上面这些就是 final Channel channel = channelFactory().newChannel(); 所做的事情。接下来我们来了解 init(Channel channel) 方法所做的事情。在这里,void init(Channel channel) 方法由 AbstractBootstrap 的子类 ServerBootstrap 实现

2.4 类:ServerBootstrap,方法:void init(Channel channel)

void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }
 
    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
 
    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }
 
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
 
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

这个方法主要做了两件事:

  1. NioServerSocketChannel 设置 options 和 attrs
  2. 为 pipeline 添加一个 Inbound 处理器 ChannelInitializer。这个 ChannelInitializer 会在 ChannelRegistered 事件发生时将 ServerBootstrapAcceptor 添加到 pipeline 上。在后面 ServerBootstrapAcceptor 将被用来接收客户端的连接,我们会在后续文章中介绍。DefaultChannelPipelineaddLast 方法会将新的 ChannelHandler 添加到 tail 之前,其它所有 ChannelHandler 之后。

在做完初始化工作之后,就要开始注册的工作了。接下来来看 group().register(channel) 的实现。其中 group() 方法将会返回我们在 b.group(bossGroup, workerGroup) 中设定的 bossGroup 这里不做过多介绍了。接下来看 register(channel) 方法。

2.5 类:AbstractChannel.AbstractUnsafe,方法:void register(EventLoop eventLoop, ChannelPromise promise)

因为我们实现的是一个 NIO server,所以此处 EventLoop 使用的实现类是 NioEventLoopNioEventLoopregister(Channel) 方法是继承自 SingleThreadEventLoop。而 SingleThreadEventLoop 则通过 channel.unsafe().register(this, promise) 方法将注册工作代理给了 Channel.Unsafe 来实现。此处 Unsafe 的实现类是 AbstractChannel.AbstractUnsafe(多么一致的命名)。

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
 
    AbstractChannel.this.eventLoop = eventLoop;
 
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
 
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

先解释一下 eventLoop.inEventLoop(),这个判断在很多地方都能看见。这个方法是用来判断当前的线程是否是 EventLoop 的任务执行线程。如果是,那就不用在添加任务,直接执行就可以了,否则需要将任务添加到 EventLoop 中。在本例中,很明显,执行过程将走到 else 分支中。

注册工作主要是在 doRegister() 方法中实现的,这个方法是定义在 AbstractChannel 中的一个抽象方法。在本例中,这个方法由 AbstractNioChannel 实现的。

2.6 类:AbstractNioChannel,方法:doRegister()

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

在这个方法中,selectionKey = javaChannel().register(eventLoop().selector, 0, this); 通过调用 JDK 方法,将 SelectableChannel 注册到 Selector 上。注意一个细节,因为同一个 Channel 和 Selector 可以对应一个 SelectionKey,所以如果另外一个相应的 SelectionKey 的 cancel 方法被执行之后,会导致 SelectableChannelregister 方法抛出 CancelledKeyException。所以这里通过 selectNow() 方法清除取消状态之后,重新 register。循环的原因就像是注释所描述的一样,Netty 团队也不清楚,这难道是 JDK 的 bug 吗?

上面这几个过程就实现了 Server Channel 的初始化和注册工作。

3. 总结

3.1 设计模式

从这部分代码,我们可以看到很多设计模式的使用

Builder 模式

在使用 ServerBootstrap 构建服务端的时候,Netty 应用了 Builder 模式。虽不是典型应用,但也起到了是代码简洁易懂的目的。

工厂方法模式

Channel channel = channelFactory().newChannel(); 使用了工厂方法模式。ServerBootstrap 提供了方法设置工厂类,同时也提供了默认实现。通过工厂方法模式创建 Channel,实现了良好的可扩展性。

模板方法模式

AbstractBootstrapvoid init(Channel channel) 就是一个模板方法。初始化和注册工作的主要实现方法是 AbstractBootstrapinitAndRegister,这个方法调用模板方法 init(Channel)。而模板方法 init(Channel) 则交由子类做具体的实现。

同样,AbstractChanneldoRegister 方法也是一个模板方法模式的例子。

3.2 设计思想

Netty 用上面复杂的代码实现了并不复杂的功能。其背后反映处的思想就是作为一个通用的高性能网络 IO 框架,Netty 必须设计出一个高性能、高扩展性的基础架构,然后再在这个架构之上实现各种功能。Netty 的执行核心是 EventLoopGroup 及其实现类。绝大部分 IO 操作和非 IO 操作都是交由 EventLoopGroup 来执行。这是 Netty 能被用来实现高并发服务的原因之一。所以本文所涉及的操作,所以并不复杂,但是其中的一些操作,例如注册工作,也是需要交由 EventLoopGroup 异步执行。这虽然因为异步的方式,提高了系统的执行效率,但事也未任务直接的协调制造了困难,这一点在后续的介绍中会看的很清楚。

如果说 EventLoopGroup 是执行调度的核心,那 Channel 就是实现具体操作的实现核心。因为网络编程十分复杂,有各种复杂的协议,也有复杂的底层操作系统实现,所以 Channel 相应的实现类也是种类繁多。这其实并没有增加复杂度,而是将各种复杂功能各归其主,将实现细节梳理清晰。

4. 接下来

接下来的文章将会解析服务端端口绑定的实现。

展开阅读全文
打赏
1
24 收藏
分享
加载中
更多评论
打赏
0 评论
24 收藏
1
分享
在线直播报名
返回顶部
顶部