Netty源码分析之服务端启动流程

原创
2019/02/16 13:47
阅读数 1.7K

上节对Netty的做了简单介绍,这节分析下Netty启动流程,后面的源码分析都以Netty4.0.32版本为例,以下面启动代码为例子

public class TimeServer {

    public void bind(int port) throws Exception {
	// 1.配置服务端的NIO线程组
	EventLoopGroup bossGroup = new NioEventLoopGroup();
	EventLoopGroup workerGroup = new NioEventLoopGroup();
	try {
	    ServerBootstrap b = new ServerBootstrap();
		//2.配置参数
	    b.group(bossGroup, workerGroup)
		    .channel(NioServerSocketChannel.class)
		    .option(ChannelOption.SO_BACKLOG, 1024)
		    .handler(new LoggingServerHandler())
		    .childHandler(new ChildChannelHandler());
	    // 3.绑定端口,同步等待成功
	    ChannelFuture f = b.bind(port).sync();

	    //4. 等待服务端关闭
	    f.channel().closeFuture().sync();
	} finally {
	    //5.优雅退出,释放线程池资源
	    bossGroup.shutdownGracefully();
	    workerGroup.shutdownGracefully();
	}
    }
}

组件介绍

EventLoopGroup

EventLoopGroup是Netty的线程池,为Acceptor Selector和IO Selector事件提供线程支持,EventLoopGroup在初始化的时候会创建相应配置数量的EventLoop来提供单个线程,EventLoop功能比较复杂,既是Select的轮询事件线程也是其他IO事件处理的线程。使用时调用EventLoopGroup#next()方法来获取EventLoop。Netty的线程比较复杂后面章节会详细说明。

NioServerSocketChannel

NioServerSocketChannel提供了JDK的Channel和selector的绑定功能,再绑定端口前会调用这个方法绑定操作。同时NioServerSocketChannel有个Unsafe内部类在初始化NioServerSocketChannel时会创建Unsafe作为他的成员变量。Unsafe提供了JDK层的io操作包括读、写、绑定端口等根据继承类不同的不同操作。

ChannelHandler

ChannelHandler是Netty的核心,它的继承包含了2个重要的处理器ChannelOutboundHandler和ChannelInboundHandler。ChannelOutboundHandler是写出IO事件的处理器。ChannelInboundHandler是写入IO事件的处理器同时也包含了多个触发器,包括连接断开触发器,异常触发器,注册完成触发器,取消注册触发器等,由于ChannelHandler比较复杂后面章节会详细说明。

ChannelFuture

ChannelFuture是Netty的异步事件等待器,Netty利用其来实现所有IO事件的异步化,ChannelFuture可以注册多个事件完成监听器,异步事件会在完成后回调监听器。

ServerBootstrap

ServerBootstrap是Netty配置及启动的入口,提供线程池、连接参数、线程池以及处理器的配置,同时提供了绑定端口的方法。

启动时序图

我们以b.bind(port)这行代码作为开始流程,时序图如下:

ServerBootstrap:

  • 1.调用initAndRegister()创建并初始化channel
  • 1.1.调用init(channel)设置配置并注册ChannelHandler
  • 1.2.调用EventLoopGroup里的register(channel)将创建selector并注册到此channel上
  • 2.调用doBind0()绑定端口并启动监听
  • 2.1.调用Channel的bind(localAddress, promise)方法绑定端口

EventLoopGroup:

  • 1.2.1.调用next()获取下个EventLoop
  • 1.2.2.调用register(channel)委托给EventLoop去注册

EventLoop:

  • 1.2.2.1.调用unsafe()获取channel中的unsafe对象
  • 1.2.2.2.调用register(this, promise)委托unsafe注册

Channel:

  • 1.2.2.2.1.调用doRegister()将创建selector并注册到此channel上
  • 1.2.2.2.2.调用pipeline的fireChannelRegistered()触发注册成功事件
  • 1.2.2.2.3.调用pipeline的fireChannelActive()触发激活成功事件
  • 1.2.2.2.3.1.1.1.修改interestOps添加读操作位
  • 2.1.1.委托ChannelPipeline调用bind(localAddress, promise)
  • 2.1.1.1.1.1.调用doBind(localAddress)绑定端口
  • 2.1.1.1.1.2.如果还未激活调用pipeline的fireChannelActive()触发激活成功事件
  • 2.1.1.1.1.2.1.1.1修改interestOps添加读操作位

ChannelPipeline:

  • 1.2.2.2.2.1.调用ChannelHandler链触发注册成功事件
  • 1.2.2.2.3.1.调用ChannelHandler链触发激活成功事件,如果配置自动读则激活读事件
  • 2.1.1.1.调用ChannelHandler链触发绑定事件
  • 2.1.1.1.1.2.1.调用ChannelHandler链触发激活成功事件,如果配置自动读则激活读事件

HeadContext:

  • 1.2.2.2.3.1.1.委托unsafe调用beginRead()开始读
  • 2.1.1.1.1.委托Channel里的unsafe调用bind(localAddress, promise)
  • 2.1.1.1.1.2.1.1.委托unsafe调用beginRead()开始读

源码分析

ServerBootstrap类的线程池、连接参数、线程池以及处理器的配置其实就是成员变量的复制这里就是不多讲,我们直接从b.bind()开始看。

b.bind()的方法直接将端口包装成类SocketAddress交给doBind()处理。doBind()源码如下

    private ChannelFuture doBind(final SocketAddress localAddress) {
    	//初始化Channel并注册Selector
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        //绑定监听端口并开始监听
        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.executor = channel.eventLoop();
                    }
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
            return promise;
        }
    }

这个方法主要做了2件事

  • 调用initAndRegister()初始化Channel并注册Selector
  • 调用doBind0()绑定监听端口并开始监听

注册Selector是默认是异步进行的这里Netty做了一个处理,如果注册Selector已完成则同步调用doBind0()否则注册监听器等待Selector注册完成后调用doBind0(),这样保证整个doBind()方法的异步性。

接下来我们看下initAndRegister()的实现,源码如下:

    final ChannelFuture initAndRegister() {
    	//初始化channel
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        //注册Selector
        ChannelFuture regFuture = group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }

initAndRegister()方法也是做了2件事

  • 调用init()初始化Channel
  • 调用group().register()注册Selector

channelFactory().newChannel()是创建你配置的Channel这里源码配置了NioServerSocketChannel,而init()的具体实现则是在NioServerSocketChannel中,init()源码如下:

   void init(Channel channel) throws Exception {
		...

        ChannelPipeline p = channel.pipeline();

        ...
        
        //将从线程组,从处理器,从配置,从属性封装成ServerBootstrapAcceptor处理器,共后续IOSelector使用。
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                pipeline.addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }

在上节中我们讲了Neety服务端线程模式可以是主从模型,所以它的配置也分主从配置,init()主要做的将主处理器(ChannelHandler)添加到主ChannelPipeline里,将 从线程组,从处理器(ChannelHandler),从配置,从属性封装成ServerBootstrapAcceptor处理器添加到主ChannelPipeline里,供后续触发客户端连接事件使用。

group().register()先是取了主线程组EventLoopGroup委托EventLoopGroup注册Selector,主线程组中也是调用next()委托给单线程EventLoop去处理

  next().register(channel);

最后在EventLoop的register()中委托给Channel中的Unsafe处理(Unsafe是在Channel创建的时候被创建的)

  channel.unsafe().register(this, promise);

我们来看下Unsafe中的register()的源码:

        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ...
            
            AbstractChannel.this.eventLoop = eventLoop;
            //如果eventLoop是当前线程直接执行,否则交给传eventLoop线程处理
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new OneTimeTask() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

Unsafe中的register()做的就是判断next()获取的线程是否是当前线程,如果是同步执行注册,否则异步执行注册,按例子中默认配置会异步调用register0()注册。

我们来看下register0()的源码

 private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //调用JDK去注册Selector
                doRegister();
                neverRegistered = false;
                registered = true;
                //设置注册成功通知监听器
                safeSetSuccess(promise);
                //触发注册成功事件
                pipeline.fireChannelRegistered();
                //如果是第一次则触发激活成功事件
                if (firstRegistration && isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

register0()做了4件事

  • 调用JDK去注册Selector,源码如下
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
  • 设置注册成功通知监听器
  • 触发注册成功事件
  • 如果绑定成功则触发激活成功事件

触发注册成功事件和如果绑定成功触发激活成功事件其实是在主pipeline中链式调用用户的配置ChannelHandler,调用过程如下

图1

这里会有个疑问绑定是在注册selector之后进行的为什么这里可以触发激活成功事件,其实在safeSetSuccess(promise)代码执行之后已经通知监听器并开始执行绑定相关的代码了。

触发激活成功事件后如果配置了会触发开始读事件

    public ChannelPipeline fireChannelActive() {
        head.fireChannelActive();
        //如果配置了会触发开始读事件
        if (channel.config().isAutoRead()) {
            channel.read();
        }

        return this;
    }

读事件也是链式调用调用过程如下:

图2

可以看到最后调用HeadContext里的read()方法

public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

委托给unsafe去处理,处理的方式就是修改interestOps添加读操作位,由IOSelector去触发读事件

selectionKey.interestOps(interestOps | readInterestOp);

在NioServerSocketChannel中readInterestOp其实对应的是JDK中的SelectionKey.OP_ACCEPT,在ServerSocketChannel构造方法中定义:

 public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

服务端启动后最先关心是客户端连接的,如果没有客户端的连接,后续无法IO操作,所以这里注册了SelectionKey.OP_ACCEPT来监听客户端的连接。

以上就是初始化Channel并注册Selector的流程

下面说下调用doBind0()绑定监听端口并开始监听流程,doBind0()代码如下:

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

绑定端口是异步进行的,这里调用channel.eventLoop()获取的线程默认就是之前注册Selector的流程中next()获取的线程。绑定的流程其实也是在主pipeline中链式调用用户的配置ChannelHandler,如图2,可以看到最后调用HeadContext里的bind()方法

public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }

也是委托给unsafe去处理,处理的方式就是调用JDK里的绑定接口去绑定

javaChannel().socket().bind(localAddress);

如果绑定成功且之前没触发激活成功事件则触发之,这里的流程跟之前讲的一样就不累述。

以上就是绑定监听端口并开始监听的流程。

展开阅读全文
打赏
1
8 收藏
分享
加载中
更多评论
打赏
0 评论
8 收藏
1
分享
返回顶部
顶部