Netty Hello World 入门源码分析

2019/04/10 10:10
阅读数 21

第一节简单提了什么是网络编程,Netty 做了什么,Netty 都有哪些功能组件。这一节就具体进入 Netty 的世界,我们从用 Netty 的功能实现基本的网络通信开始分析 各个组件的使用。

1. 一个简单的发送接收消息的例子

话不多说,先来实现一个发送接收消息的例子。本实例基于 SpringBoot 工程搭建。

项目类文件如下:

客户端和服务端的主要代码分为3个部分:启动器,ChannelInitializer,eventHandler。

相关代码已经上传 GitHub,请参阅:点我 (๑¯ ³ ¯๑)

Server端:

package com.rickiyang.learn.helloWorld;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description: server 端
 */
@Slf4j
public class HwServer {

    private int port;

    public HwServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerChannelInitializer());

        try {
            ChannelFuture future = server.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server start fail",e);
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        HwServer server = new HwServer(7788);
        server.start();
    }
}

server initializer:

package com.rickiyang.learn.helloWorld;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class HwServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("server channelActive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info(ctx.channel().remoteAddress()+"===>server: "+msg.toString());
        ctx.write("hi, received your msg");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

}

server handler:

package com.rickiyang.learn.helloWorld;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class HwServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("server channelActive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info(ctx.channel().remoteAddress()+"===>server: "+msg.toString());
        ctx.write("hi, received your msg");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

}

客户端代码:

client:

package com.rickiyang.learn.helloWorld;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class HwClient {

    private  int port;
    private  String address;

    public HwClient(int port, String address) {
        this.port = port;
        this.address = address;
    }

    public void start(){
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientChannelInitializer());
        try {
            ChannelFuture future = bootstrap.connect(address,port).sync();
            future.channel().writeAndFlush("Hello world, i'm online");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("client start fail",e);
        }finally {
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        HwClient client = new HwClient(7788,"127.0.0.1");
        client.start();
    }
}

client initializer :

package com.rickiyang.learn.helloWorld;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Created by Administrator on 2017/3/11.
 */
public class ClientChannelInitializer extends  ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 客户端的逻辑
        pipeline.addLast("handler", new HwClientHandler());
    }
}

client handler :

package com.rickiyang.learn.helloWorld;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class HwClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("server say : " + msg.toString());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("client channelActive");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client is close");
    }


}

代码很简单,主要功能就是启动 服务端 和 客户端, 然后实现一个简单的 handler ,在handler 中获取消息并打印,本地先启动服务端 main 函数,再启动客户端即可。

2. 从 EventLoopGroup 开始说起

观看客户端 和 服务端 的启动类,我们看到都有相同的特性:

创建 EventLoopGroup 去监听 channel,然后使用定义的 handler处理对应的事件。

Netty 是一个异步事件驱动的 NIO 框架,所有IO操作都是异步非阻塞的。Netty 实际上是使用 Threads(多线程)处理 I/O 事件。

EventLoopGroup 是个啥东西呢?我们回想一下 Reactor 模型,主要的操作是使用 Selector 监听 channel 上的事件,Reactor 模型有三种结构,首先是单线程模型:

这种模型显而易见始终只有一个 Acceptor 线程在处理客户端连接事件和服务端产生的读写事件,好处是始终只有一个线程在工作不会产生并发带来的一系列问题。但是不足之处也显而易见:

  1. 一个线程来处理对于现在的多核系统来说有点浪费资源;
  2. 虽然是使用异步非阻塞I/O处理,但是面对大并发的请求场景,很有可能会负载过重,堆积事件,这样客户端就会有超时发生,然后重复发送请求,必然会造成系统超载;
  3. 单线程如果挂掉了系统就停止了,这种场景如何处理。

所以这种单线程模型对于当今系统的发展是没有适用场景的。接着又演变出多线程的 Reactor 模型。

在多线程模型下,Acceptor 是一个单独的线程专门处理 Client 的请求连接事件,所有的 I/O 操作都由一个特定的 NIO 线程池负责,每个客户端都与一个特定的 NIO 线程池绑定,因此这个客户端连接中的所有 I/O 操作都是在同一个线程中完成的。客户端连接是很多的,但是 NIO 线程很少,所以 一个 NIO 线程可以同时绑定到多个客户端连接中。

从上面的模型找缺点的话,很显然能发现还是有单点的问题:处理客户端连接请求的线程仍旧只有一个,如果这个线程挂了,整个系统将不可用。所以这种超级并发的情况也要考虑啊,系统不能有单点,接着改:

现在的系统就没有单点问题了,但是也增加了复杂性。

我们刚才在说 EventLoopGroup ,为啥突然又转到了 Reactor 模型上去了呢? 前面说过,Netty 是基于 NIO 编程的,NIO 又是基于 Reactor 模型的,自然 Netty 的编程模型也是 Reactor 。而 EventLoopGroup 其实就是来设置 Reactor 模型的类型根据不同的参数方式。

我们来看 Server 启动类中的写法:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ServerChannelInitializer());

这里创建了两个 group,下面 bootstrap set group的时候 set了两个线程池,即 Acceptor 使用一个线程池,一个 Reactor 线程池。但是可以看到两个线程池都没有设置大小,进去 NioEventLoopGroup 的构造方法可以看到 默认值是 0,即初始化为0,不开启线程,当有事件进来的时候会开启一个线程来处理。那么如果将 workGroup 设置为多个线程的时候,上面这种写法就是 Reactor 的多线程模型。

我们再来看另一种写法:

EventLoopGroup bossGroup = new NioEventLoopGroup();

ServerBootstrap server = new ServerBootstrap().group(bossGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ServerChannelInitializer());

这里的 group() 方法与上面的区别在于只有一个参数,进入方法内部看看:

@Override
public ServerBootstrap group(EventLoopGroup group) {
    return group(group, group);
}

/**
     * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
     * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
     * {@link Channel}'s.
     */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}

很明显它调用了带两个 EventLoopGroup 参数的 group方法,即两个 group 线程池使用的是同一个。这就是 Reactor 的单线程模型。

还有一个 Reactor 的主从多线程模型,这个在 Netty 中是没有实现的,即你将 bossGroup 的线程设置为大于1,这个不会改变 Acceptor 的时候事件处理方式,因为在服务端启动的时候 ServerSocketChannel 只会绑定到 bossGroup 中的一个线程,即使你设置了多个,启动的时候只会使用一个。Netty 官方认为处理连接请求的时候没有必要使用多线程的方式。

现在我们了解到 EventLoopGroup 的作用是初始化线程池的,那就一起看看它是怎么实现的吧。

首先看一下 EventLoopGroup 的类结构图:

可以看到它继承了 ScheduledExecutorService,即 EventLoopGroup 有线程池调度的能力。上面在代码中我们使用的是 EventLoopGroup 的子类 NioEventLoopGroup, 还有一个OioEventLoopGroup也可以使用。继续看 NioEventLoopGroup 的类结构:

可以看到继承关系为:NioEventLoopGroup -> MultithreadEventLoopGroup -> MultithreadEventExecutorGroup。

先看一下我们使用的默认无参构造方法:

public NioEventLoopGroup() {
    this(0);
}

这里默认设置为 0 ,但是后面的逻辑会判断如果为 0,那么会将 线程数设置为当前 CPU * 2。

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

这个构造函数将 线程执行的 Executor 设置为空,后面会判断为空重新构造一个 Executor。

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(
    int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

上面第一个构造方法是使用了 JDK 的 NIO 生成一个 Selector,第二个是生成一个 Selector 默认策略。接着进入第三个构造方法,这里使用了父类 MultithreadEventLoopGroup 的 构造方法,还 set 了一个线程拒绝策略。

跟进父类的构造方法:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

这里对线程数进行了判断,如果是 0 则赋默认值,这里的默认值就是当前核心数 * 2。

接下来又调用了它的父类 MultithreadEventExecutorGroup 的构造器:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

再点击进入,终于能看到一段实质性的代码了,太不容易:

/**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    //上面我们有个构造函数传 executor == null。在这里判断如果为空,则创建一个新的ThreadPerTaskExecutor
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
	// 这里就是创建指定大小的线程池,线程池中的每一个元素都是一个 EventLoop
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            //存入了一个 NIOEventLoop 类实例
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                //这里如果创建失败,首先尝试优雅停止线程,下面会判断线程未正常停止的情况继续判断
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
	//实例化线程工厂执行器选择器: 根据children获取选择器
    chooser = chooserFactory.newChooser(children);

    //为每一个 EventLoop 线程添加一个 线程终止的监听器
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
	
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
	// 到这里 eventLoop 就创建完毕,接着做了一个操作:把 eventLoop 添加进一个新的不可变的 set集合中,即声明只读属性
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

这段代码的实质就是来创建 EventLoop 实例,并 set 线程执行。

  1. 首先校验 executor 是否为空,如果是空,则新建一个 ThreadPerTaskExecutor 对象。这个 executor 是用来执行线程池中的所有的线程,也就是所有的 NioEventLoop,其实从 NioEventLoop 构造器中也可以知道,NioEventLoop 构造器中都传入了executor这个参数
  2. 接着创建了一个指定大小的线程池,这里的线程池就是用来执行我们的 EventLoop,即监听事件。
  3. 下面就开始往线程池中放东西了,for 循环的开始是一个 newChild(executor, args)方法,这个方法主要实现的功能就是 new 出一个 NioEventLoop 实例,具体可以参考 NioEventLoopGroup 中的方法:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
  1. eventLoop 创建完毕,下面的操作就是一个安全性保护,这里就可以使用了。当有 I/O 事件来,就从线程池中取出一个线程来执行,那么怎么取就是根据 chooser 选择器的策略来执行, 调用选择器的 next()方法。

这里线程的初始化就结束了,所以这么多的转跳只做了一个事情:创建 Selector 的线程池。

3. NioEventLoop 做了什么

上面分析了 EventLoopGroup 的作用是定义了一个线程池,创建 EventLoop,而EventLoop 的作用不言而喻,按照 Reactor 模型来理解,大概就是两件事:监听连接请求,将事件分发给 handler 处理。下面我们就详细分析一下 NioEventLoop 的代码。

入口就是 newChild()方法,返回的是 EventLoopGroup 的构造函数。

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    selector = openSelector();
    selectStrategy = strategy;
}

//父类构造函数
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    //创建了一个指定大小的队列
    tailTasks = newTaskQueue(maxPendingTasks);
}

//父类的父类构造函数
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

private Selector openSelector() {
    final Selector selector;
    try {
        selector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    if (DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    }

    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                    "sun.nio.ch.SelectorImpl",
                    false,
                    PlatformDependent.getSystemClassLoader());
            } catch (ClassNotFoundException e) {
                return e;
            } catch (SecurityException e) {
                return e;
            }
        }
    });

    if (!(maybeSelectorImplClass instanceof Class) ||
        // ensure the current selector implementation is what we can instrument.
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {
        if (maybeSelectorImplClass instanceof Exception) {
            Exception e = (Exception) maybeSelectorImplClass;
            logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
        }
        return selector;
    }

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                selectedKeysField.setAccessible(true);
                publicSelectedKeysField.setAccessible(true);

                selectedKeysField.set(selector, selectedKeySet);
                publicSelectedKeysField.set(selector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
    } else {
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", selector);
    }

    return selector;
}

上面是 NioEventLoop 的构造函数,里面又有调用父类的构造器,一般这种我们看到都很头痛,写码一直爽,看码火葬场,你需要一直跳进跳出。我们上面启动 Netty 服务的是时候分配了一个 boss 线程池,一个 worker 线程池,boss 线程池启动只需要一个线程,主要负责客户端的连接请求;而 worker 线程池就是用来处理 当前这个连接上所有事件用的,一个 worker 线程就是一个 EventLoop,一个 channel 只会被一个 EventLoop 处理,但 一个 EventLoop 可以处理多个 channel。

NioEventLoop 的本质是一个线程,那么这个线程是在何时被初始化,又是如何处理连接事件的监听的呢?至少目前是没有看到眉目,我们先看一下类结构图,从父类身上找找关键信息:

SingleThreadEventLoop 的父类 SingleThreadEventExecutor 的构造函数很有意思了,这是一个只有一个线程的线程池, 先看看其中的几个变量:

  1. state:线程池当前的状态;
  2. taskQueue:存放任务的队列;
  3. thread:线程池维护的唯一线程;
  4. scheduledTaskQueue:定义在其父类AbstractScheduledEventExecutor中,用以保存延迟执行的任务。

我们先记住这些变量哈,下面会解释。

因为 EventLoop 本质就是一个线程,这个线程的初始化在哪呢?往上翻看父类的信息,不难看出:SingleThreadEventExecutor 类里面有初始化线程的操作,它的初始化过程在 doStartThread()方法中,往上跟踪初始化的调用链:


@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}


private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}

调用的地方是 execute()方法。这个方法是 SingleThreadEventExecutor 对外暴露的唯一接口:

public interface Executor {
    void execute(Runnable command);
}

即所有通过 EventLoop 提交的任务都是通过这一个线程来执行。另外上面的父类构造方法中我们看到有队列的初始化,不难看出,队列的作用是当有多个事件同时在一个 EventLoop 中待执行的时候,EventLoop 的做法是将任务包装成对象存放在队列中然后按照先后顺序执行。那么是不是肯定有个执行的方法呢?比如一个循环的取出任务的方法,这个是有的,先看 doStartThread()方法的代码:

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
              ...
              ...
              ...
        }
    });
}

代码比较多,就赶重要的说,看到在 doStartThread() 方法中执行了一个异步线程,而线程中做的事情是调用SingleThreadEventExecutor.this.run()方法。这个 run()是何方神圣呢? SingleThreadEventExecutor 中的 run()只是一个抽象方法:

protected abstract void run();

具体的实现在子类,我们看 NioEventLoop 里面的实现:

@Override
protected void run() {
    for (;;) {
        try {
          	// 判断接下来是是执行select还是直接处理IO事件和执行队列中的task
            // hasTask判断当前线程的queue中是否还有待执行的任务
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                		// 说明当前queue中没有task待执行
                    select(wakenUp.getAndSet(false));
										// 唤醒epoll_wait
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
                 /* ioRatio调节连接事件和内部任务执行事件百分比
       * ioRatio越大,连接事件处理占用百分比越大 */
            final int ioRatio = this.ioRatio;
          	// 如果比例是100,表示每次都处理完IO事件后,执行所有的task
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();
								// 处理IO事件
                processSelectedKeys();
								// 当前时间减去处理IO事件开始的时间就是处理IO事件花费的时间
                final long ioTime = System.nanoTime() - ioStartTime;
              	// 执行task的时间taskTime就是ioTime * (100 - ioRatio) / ioRatio
                // 如果taskTime时间到了还有未执行的task,runAllTasks也会返回
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}

// io.netty.channel.DefaultSelectStrategy#calculateStrategy
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
  // 如果还有task待执行则先执行selectNow,selectNow是立即返回的,不是阻塞等待
  // 如果没有待执行的task则执行select,有可能是阻塞等待IO事件
  return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}


// io.netty.channel.nio.NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
  @Override
  public int get() throws Exception {
    // epoll_wait的参数timeout可以指定超时时间,selectNow传入的参数是0,也就是不超时等待立即返回
    return selectNow();
  }
};

上面的 run()方法里面是一个死循环,在执行select()前有一个hasTasks()的操作,这个hasTasks()方法判断当前 taskQueue 是否有元素。如果 taskQueue 中有元素,执行 selectNow()方法,最终执行selector.selectNow(),该方法会立即返回,保证了 EventLoop 在有任务执行时不会因为 I/O 事件迟迟不来造成延后处理,这里优先处理 I/O 事件,然后再处理任务。

知识点

这里插入一个知识点,selectNow() 其实暴露的就是 Java 封装的 epoll 模型的一部分。具体参考:

java.nio.channels.Selector 类:

public abstract class Selector implements Closeable {

  
    protected Selector() { }

   
    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }

   
    public abstract boolean isOpen();

  
    public abstract SelectorProvider provider();

    public abstract Set<SelectionKey> keys();

   
    public abstract Set<SelectionKey> selectedKeys();

   
    public abstract int selectNow() throws IOException;

   
    public abstract int select(long timeout)
        throws IOException;

    public abstract int select() throws IOException;

   
    public abstract Selector wakeup();

    public abstract void close() throws IOException;

}

上面三个select方法都调用了 lockAndDoSelect,只是 timeout 参数不同,其实最后就是调用 epoll_wait 参数不同,epoll_wait 有一个timeout参数,表示超时时间:

  • -1:阻塞
  • 0:立即返回,非阻塞
  • 大于0:指定微秒

详细的分析限于篇幅就不在这里说了哈。大家可以下去慢慢看。

如果当前 taskQueue 没有任务时,就会执行select(wakenUp.getAndSet(false))方法,代码如下:

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
          /* delayNanos(currentTimeNanos):计算延迟任务队列中第一个任务的到期执行时间(即最晚还能延迟多长时间执行),默认返回1s。每个SingleThreadEventExecutor都持有一个延迟执行任务的优先队列PriorityQueue,启动线程时,往队列中加入一个任务。*/
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
          /* 如果延迟任务队列中第一个任务的最晚还能延迟执行的时间小于500000纳秒,且selectCnt == 0(selectCnt 用来记录selector.select方法的执行次数和标识是否执行过selector.selectNow()),则执行selector.selectNow()方法并立即返回。*/
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

        
			//如果在wakenUp值为true时提交了任务,则该任务没有机会调用
            // Selector#wakeup。因此,我们需要在执行选择操作之前再次检查任务队列。
            //如果不这样做,则可能会挂起任务,直到选择操作超时。
            //如果管道中存在IdleStateHandler,则可能要等待直到空闲超时。
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
			// 超时阻塞select
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
			// 有事件到来 | 被唤醒 | 有内部任务 | 有定时任务时,会返回
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
               
                break;
            }
            if (Thread.interrupted()) {
                
				//线程被中断,因此重置选定的键并中断,这样我们就不会遇到繁忙的循环。
                //由于这很可能是用户或其客户端库的处理程序中的错误,因此我们将其记录下来。
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                                 "Thread.currentThread().interrupt() was called. Use " +
                                 "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            //此处的逻辑就是: 当前时间 - 循环开始时间 >= 定时select的时间timeoutMillis,说明已经执行过一次阻塞select()
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // 阻塞超时后没有事件到来,重置selectCnt
                selectCnt = 1;
              // 如果空轮询的次数大于空轮询次数阈值 SELECTOR_AUTO_REBUILD_THRESHOLD(512)
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                       selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                    "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
								// 1.首先创建一个新的Selecor
                // 2.将旧的Selector上面的键及其一系列的信息放到新的selector上面。
                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                             selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                         selector, e);
        }
        // Harmless exception - log anyway
    }
}

这个方法解决了Nio中臭名昭著的 BUG:selector 的 select 方法导致空轮询 cpu100%。对 Selector()方法中的阻塞定时 select(timeMIllinois) 操作的 次数进行统计,每完成一次 select 操作进行一次计数,若在循环周期内 发生 N 次空轮询,如果 N 值大于 BUG 阈值(默认为512),就进行空轮询 BUG 处理。重建 Selector,判断是否是其他线程发起的重建请求,若不是则将原 SocketChannel 从旧的 Selector 上去除注册,重新注册到新的 Selector 上,并将原来的 Selector 关闭。

当java NIO BUG 触发时,进行 Selector 重建,rebuildSelector 过程如下:

  1. 通过方法 openSelector 创建一个新的 selector;
  2. 将 old selector 的 selectionKey 执行 cancel;
  3. 将 old selector 的 channel 重新注册到新的 selector 中。

Netty 的连接处理就是 I/O 事件的处理,I/O 事件包括 READ 事件、ACCEPT 事件、WRITE 事件和 OP_CONNECT 事件:

  • ACCEPT 事件:连接建立好之后将该连接的 channel 注册到 workGroup 中某个 NIOEventLoop 的 selector中;
  • READ 事件:从 channel 中读取数据,存放到 byteBuf 中,触发后续的 ChannelHandler 来处理数据;
  • WRITE 事件:正常情况下一般是不会注册写事件的,如果 Socket 发送缓冲区中没有空闲内存时,在写入会导致阻塞,此时可以注册写事件,当有空闲内存(或者可用字节数大于等于其低水位标记)时,再响应写事件,并触发对应回调;
  • CONNECT 事件:该事件是 Client 触发的,由主动建立连接这一侧触发的。

再把目光从 select() 方法拉回到 run() 方法, 这个死循环的终止逻辑是遇到 confirmShutdown() 方法。然后在循环里会询问是否有事件,如果没有,则继续循环,如果有事件,那么就开始处理。

往下看代码中有一个字段:ioRatio,默认值是 50,这个比例是处理 I/O 事件所需的时间和花费在处理 task 时间的比例。即如果花了 5s 去处理 I/O 事件, 那么也会花 5s 去处理 task 任务。处理 I/O 事件的操作主要是在 processSelectedKeys()方法中:

private void processSelectedKeys() {
  //
  if (selectedKeys != null) {
    processSelectedKeysOptimized(selectedKeys.flip());
  } else {
    processSelectedKeysPlain(selector.selectedKeys());
  }
}

当有了新 I/O 请求进来, JDK 原生的 Selector 将 SelectionKey 放入感兴趣的 key 的集合中,而这个集合现在就是 Netty 通过反射的方式强制替换为以数组为数据结构的selectedKeys

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
  for (int i = 0;; i ++) {
    final SelectionKey k = selectedKeys[i];
    if (k == null) {
      break;
    }
    //数组输出空项, 从而允许在channel 关闭时对其进行垃圾回收
    //数组中当前循环对应的keys置空, 这种感兴趣的事件只处理一次就行
    selectedKeys[i] = null;
		// 获取出 attachment,默认情况下就是注册进Selector时,传入的第三个参数  this===>   NioServerSocketChannel
    // 一个Selector中可能被绑定上了成千上万个Channel,  通过K+attachment 的手段, 精确的取出发生指定事件的channel, 进而获取channel中的unsafe类进行下一步处理
    final Object a = k.attachment();

    if (a instanceof AbstractNioChannel) {
      //进入这个方法, 传进入 感兴趣的key + NioSocketChannel
      processSelectedKey(k, (AbstractNioChannel) a);
    } else {
      @SuppressWarnings("unchecked")
      NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
      processSelectedKey(k, task);
    }
		// 判断是否需要再次轮询
    if (needsToSelectAgain) {
   
      for (;;) {
        i++;
        if (selectedKeys[i] == null) {
          break;
        }
        selectedKeys[i] = null;
      }

      selectAgain();
  
      selectedKeys = this.selectedKeys.flip();
      i = -1;
    }
  }
}

selectedKeys 是一个 set,与 selector 绑定,selector 在调用 select() 族方法的时候,如果有 I/O 事件发生,就会往 selectedKeys 中塞相应的 selectionKey。而 selectedKeys 内部维护了两个 SelectionKey[] 数组,重写了 set#add 方法,在 #add 的时候实际上是往数组里面塞 SelectionKey。而在遍历时只用遍历数组而不是遍历set。

处理轮询到的IO事件也主要是三步:

  1. 取出轮询到的SelectionKey
  2. 取出与客户端交互的channel对象,处理channel
  3. 判断是否需要再次轮询

上面提到过,一个 EventLoop 是可以处理多个 channel 的,并且保证一个 channel 事件只会在同一个 EventLoop 中被处理,那么这里的如何保证同一个 channel 会被某个曾经处理过他的 EventLoop 识别呢?

关键就在 SelectionKey,上面看到 a 对象其实就是一个 NioSocketChannel,在 AbstractNioChannel 中有一个#doRegister 方法,这里将 JDK 的 channel 注册到 selector 上去,并且将自身设置到 attachment 上。这样 JDK 轮询出某条 SelectableChannel 有 I/O 事件发生时,就可以直接取出 AbstractNioChannel 了。

我们继续看看 processSelectedKey(k, (AbstractNioChannel) a)是如何处理感兴趣的事件的:

//当有新连接进来,就会到这里
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  //netty底层对数据的读写都是unsafe完成的。这个unsafe也是和Channel进行唯一绑定的对象
  final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  if (!k.isValid()) {
    final EventLoop eventLoop;
    try {
      eventLoop = ch.eventLoop();
    } catch (Throwable ignored) {
      return;
    }
  
    if (eventLoop != this || eventLoop == null) {
      return;
    }
    // close the channel if the key is not valid anymore
    unsafe.close(unsafe.voidPromise());
    return;
  }
	//上面这一串都是在校验 key 的合法性
  try {
    int readyOps = k.readyOps();
    // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
    // to a spin loop
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
      unsafe.read();
      if (!ch.isOpen()) {
        // Connection already closed - no need to handle write.
        return;
      }
    }
    //处理write事件的flush
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
      // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
      ch.unsafe().forceFlush();
    }
    //处理读和新连接的accept事件
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
      // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
      // See https://github.com/netty/netty/issues/924
      int ops = k.interestOps();
      ops &= ~SelectionKey.OP_CONNECT;
      k.interestOps(ops);

      unsafe.finishConnect();
    }
  } catch (CancelledKeyException ignored) {
    unsafe.close(unsafe.voidPromise());
  }
}

首先在读写之前都要先调用 finishConnect,来确保与客户端连接上。这个过程最终会传递给 channelHandle r的channelActive 方法,因此可以通过 channelActive 来验证有多少客户端在线。

接下来是处理 write 事件的 flush,注意,我们的 write 不是在这里做的,真正的 write 一般是封装成 task 去执行的。

最后是处理读和新连接的 accept 事件。Netty 将新连接的 accept 也当做一次 read。对于 boss NioEventLoop 来说,新连接的 accept 事件在 read 的时候通过他的 pipeline 将连接扔给一个 worker NioEventLoop 处理;而worker NioEventLoop 处理读事件,是通过他的 pipeline 将读取到的字节流传递给每个 channelHandler 来处理。 从这里也可以看出来 Netty 所有关于 I/O 操作都是通过内部的 Unsafe 来实现的。

还记得我们是在哪里扯到了 I/O操作的 processSelectedKeys方法嘛!感觉在扯远的道路上越来越远了。再把视线回到 NioEventLoop 的 #run()方法, I/O 操作都是processSelectedKeys方法来处理,下面还有个runAllTasks方法,它是用于处理封装好的事件操作的。可以看到 runAllTasks 有个函数是带了事件参数的,虽然设定了一个可以运行的时间参数,但是实际上 Netty 并不保证能精确的确保非 I/O 任务只运行设定的毫秒,下面来看下 runAllTasks 带时间参数的代码:

/* timeoutNanos:任务执行花费最长耗时*/
protected boolean runAllTasks(long timeoutNanos) {
    // 把scheduledTaskQueue中已经超过延迟执行时间的任务移到taskQueue中等待被执行。
    fetchFromScheduledTaskQueue();

    // 非阻塞方式pollTask
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // 执行task
        safeExecute(task);
        runTasks ++;
        // 依次从taskQueue任务task执行,每执行64个任务,进行耗时检查。
        // 如果已执行时间超过预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行。
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

NioEventLoop 执行 task 的过程,同样可以分成几步:

  1. 从 scheduledTaskQueue 转移定时任务到 taskQueue;
  2. 计算本次任务循环的截止时间;
  3. 执行任务;
  4. 执行完任务后的工作。

从上面可以看到 NioEventLoop 中至少有两种队列,taskQueuescheduledTaskQueue

EventLoop 是一个 Executor,因此用户可以向 EventLoop 提交 task。在 execute 方法中,当 EventLoop 处于循环中或启动了循环后都会通过 addTask(task)向 EventLoop 提交任务。SingleThreadEventExecutor 内部使用一个 taskQueue 将task 保存起来。

taskQueue最大的应用场景就是用户在 channelHandler 中获取到 channel,然后通过 channel.write() 数据,这里会把 write 操作封装成一个 WriteTask,然后通过 eventLoop.execute(task) 执行,实际上是给 EventLoop 提交了一个 task,加入到 taskQueue 队列中。

同时,EventLoop也是一个ScheduledExecutorService,这意味着用户可以通过ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)方法向EventLoop提交定时任务。因此,EventLoop内部也维护了一个优先级队列scheduledTaskQueue来保存提交的定时任务。

知道了NioEventLoop内部的任务队列后,再来看执行task的过程。

第一步,是将到期的定时任务转移到taskQueue中,只有在当前定时任务的截止时间已经到了,才会取出来。

然后第二步计算本次任务循环的截止时间deadline。

第三步真正去执行任务,先执行task的run方法,然后将runTasks加一,每执行完64(0x3F)个任务,就判断当前时间是否超过deadline,如果超过,就break,如果没有超过,就继续执行。

需要注意的是,这里如果任务没执行完break掉了,afterRunningAllTasks后,NioEventLoop就会重新开始一轮新的循环,没完成的任务仍然在taskQueue中,等待runAllTasks的时候去执行。

最后一步是afterRunningAllTasks,执行完所有任务后需要进行收尾,相当于一个钩子方法,可以作统计用。 最后总结一下处理任务队列的task的过程就是:

eventLoop是一个Executor,可以调用execute给eventLoop提交任务,NioEventLoop会在runAllTasks执行。NioEventLoop内部分为普通任务和定时任务,在执行过程中,NioEventLoop会把过期的定时任务从scheduledTaskQueue转移到taskQueue中,然后执行taskQueue中的任务,同时每隔64个任务检查是否该退出任务循环。

4. EventLoop 如何绑定 channel

上面的长篇大论其实只是分析了 Netty 初始化一个 Reactor 线程是多么的艰难。考虑了太多的事情。我们一开头写了一个Demo,到现在为止都没有分析到客户端和服务端启动的时候是如何将 Reactor 线程和 channel 绑定起来的,即启动的时候如何将一个 SocketChannel 绑定到 work thread 上。所以我们还是从启动过程分析一下,走一遍总体流程。

客户端启动

再重温一下客户端启动代码:

EventLoopGroup group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
  .channel(NioSocketChannel.class)
  .handler(new ClientChannelInitializer());
try {
  ChannelFuture future = bootstrap.connect(address,port).sync();
  future.channel().writeAndFlush("Hello world, i'm online");
  future.channel().closeFuture().sync();
} catch (Exception e) {
  log.error("client start fail",e);
}finally {
  group.shutdownGracefully();
}

EventLoopGroup 就不用解释了,初始化了一个启动线程池。下面的 Bootstrap 是Netty 封装的启动类,通过一连串的链式调用绑定 Selector 线程,启动指定类型的SocketChannel 和 初始化处理逻辑。

初始化好启动信息之后调用 connect()进行连接:

public ChannelFuture connect(SocketAddress remoteAddress) {
  if (remoteAddress == null) {
    throw new NullPointerException("remoteAddress");
  }

  validate();
  return doResolveAndConnect(remoteAddress, config.localAddress());
}

首先是校验一下必传参数是否存在,端口、IP 以及 handler 信息是否初始化。下面的 doResolveAndConnect方法就是连接的主要逻辑:

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
  //完成channel 的初始化和注册
  final ChannelFuture regFuture = initAndRegister();
  final Channel channel = regFuture.channel();
	//注册成功直接返回
  if (regFuture.isDone()) {
    if (!regFuture.isSuccess()) {
      return regFuture;
    }
    return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
  } else {
    // 如果注册还在进行中,需要向future对象添加一个监听器,以便在注册成功的时候做一些工作,监听器实际上就是一个回调对象
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        // Direclty obtain the cause and do a null check so we only need one volatile read in case of a
        // failure.
        Throwable cause = future.cause();
        if (cause != null) {
          // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
          // IllegalStateException once we try to access the EventLoop of the Channel.
          promise.setFailure(cause);
        } else {
          // Registration was successful, so set the correct executor to use.
          // See https://github.com/netty/netty/issues/2586
          promise.registered();
          // 注册成功后仍然调用doResolveAndConnect0方法完成连接建立的过程
          doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
        }
      }
    });
    return promise;
  }
}

这个方法其实只做了两个事情:

  • 初始化一个 Channel 对象并注册到 EventLoop 中;
  • 调用 doResolveAndConnect0() 方法完成 tcp 连接的建立。

继续看初始化 channel 的过程:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
      //使用工厂类ChannelFactory的newChannel通过反射创建Channel实例
        channel = channelFactory.newChannel();
      //调用init方法执行初始化操作
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // 若在创建实例和初始化期间抛出异常,创建DefaultChannelPromise实例,写入异常并返回
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
		//调用EventLoopGroup的register方法,完成注册操作
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

  		 /** 如果程序到这里,说明promise没有失败,可能发生以下情况之一 
         * 1) 如果尝试将Channel注册到EventLoop,且此时注册已经完成;inEventLoop返回true,channel已经成功注
         *    册,可以安全调用bind() or connect()
         * 2) 如果尝试注册到另一个线程,即inEventLoop返回false,则此时register请求已成功添加到事件循环的任务队
         *    列中,现在同样可以尝试bind()或connect(),因为bind()或connect()会被调度在执行register 
         *    Task之后执行, 因为register(),bind()和connect()都被绑定到同一个I/O线程。
         */
    return regFuture;
}

channel 的注册过程主要就在上面的一句代码中:

ChannelFuture regFuture = config().group().register(channel);

跟着register()往下走,可以跟到一段代码:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  ...
  ...
  ...
  AbstractChannel.this.eventLoop = eventLoop;

  if (eventLoop.inEventLoop()) {
    register0(promise);
  } else {
    try {
      eventLoop.execute(new Runnable() {
        @Override
        public void run() {
          register0(promise);
        }
      });
    } catch (Throwable t) {
      logger.warn(
        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
        AbstractChannel.this, t);
      closeForcibly();
      closeFuture.setClosed();
      safeSetFailure(promise, t);
    }
  }
}

这里就是从 EventLoopGroup 中拿到特定的 EventLoop,如何分配的过程上面已经有分析,就是调用 NioEventLoop 的 next()方法。判断 NioEventLoop 的线程是否已经启动,如果已经启动,调用 register0方法;否则调用 eventLoop.execute 方法启动线程。

再跟一下 register0(promise)的代码:

private void register0(ChannelPromise promise) {
  try {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
      return;
    }
    boolean firstRegistration = neverRegistered;
    //调用JDK去注册Selector
    doRegister();
    neverRegistered = false;
    registered = true;
    //设置注册成功通知监听器
    safeSetSuccess(promise);
    //触发注册成功事件
    pipeline.fireChannelRegistered();
    //如果是第一次则触发激活成功事件
    if (firstRegistration && isActive()) {
      pipeline.fireChannelActive();
    }
  } catch (Throwable t) {
    closeForcibly();
    closeFuture.setClosed();
    safeSetFailure(promise, t);
  }
}

在这里调用 JDK NIO 去注册 Selector,设置注册成功的监听事件。

这里是不是就把上面分析的 EventLoop 的 execute 方法和 启动联系起来了,通过execute 来执行task。

initAndRegister方法的主要过程就分析完了,其实主要有三点:

  1. 创建了一个NioServerSocketChannel对象;
  2. 为NioServerSocketChannel对应的ChannelPipeLine增加了一个ServerBootstrapAcceptor处理器,用来处理新的连接;
  3. 从NioEventLoopGroup中分配了一个NioEventLoop,用于监听NioServerSocketChannel通道上的 I/O 事件。

客户端的启动就分析完成,工作量还是不少哈。

Server 端启动

还是先看一下 Server 启动的代码:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
  .channel(NioServerSocketChannel.class)
  .childHandler(new ServerChannelInitializer());

try {
  ChannelFuture future = server.bind(port).sync();
  future.channel().closeFuture().sync();
} catch (InterruptedException e) {
  log.error("server start fail",e);
}finally {
  bossGroup.shutdownGracefully();
  workGroup.shutdownGracefully();
}

这里看到有个区别是:客户端启动是通过 Bootstrap 启动类来实现,调用 connect()方法,服务端启动是ServerBootstrap启动类来实现,调用 bind()方法。

public ChannelFuture bind(SocketAddress localAddress) {
  validate();
  if (localAddress == null) {
    throw new NullPointerException("localAddress");
  }
  return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
  final ChannelFuture regFuture = initAndRegister();
  final Channel channel = regFuture.channel();
  if (regFuture.cause() != null) {
    return regFuture;
  }

  if (regFuture.isDone()) {
    // At this point we know that the registration was complete and successful.
    ChannelPromise promise = channel.newPromise();
    doBind0(regFuture, channel, localAddress, promise);
    return promise;
  } else {
    // Registration future is almost always fulfilled already, but just in case it's not.
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        Throwable cause = future.cause();
        if (cause != null) {
          // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
          // IllegalStateException once we try to access the EventLoop of the Channel.
          promise.setFailure(cause);
        } else {
          // Registration was successful, so set the correct executor to use.
          // See https://github.com/netty/netty/issues/2586
          promise.registered();

          doBind0(regFuture, channel, localAddress, promise);
        }
      }
    });
    return promise;
  }
}

bind 方法里面就没啥要说的,已经在客户端启动的时候解释过了,创建 channel ,绑定 EventLoop,再看doBind0()方法:

private static void doBind0(
  final ChannelFuture regFuture, final Channel channel,
  final SocketAddress localAddress, final ChannelPromise promise) {

  // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
  // the pipeline in its channelRegistered() implementation.
  channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
      if (regFuture.isSuccess()) {
        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
      } else {
        promise.setFailure(regFuture.cause());
      }
    }
  });
}

是不是跟客户端的 register()方法大同小异呢,也是调用 EventLoop 提交 task。

5. 本篇小结

回顾这一篇,开始写了一个 Demo, 给出了如何开启一个 Netty 服务端和客户端的案例。然后从EventLoopGroup 说起,我们谈到了 Reactor 的3种线程模型,接着继续说 EventLoopGroup 的作用就是一个承载 EventLoop 的线程池,那么 EventLoop 的作用又是什么。它是执行连接事件和 I/O 事件的基石,本质就是一个Selector 线程。所有的链接都是在这里初始化和承载,由它交于后面的 handler 进行处理,处理完之后的结果由它返回给连接的客户端。

最后从客户端 和 服务端 的启动代码一起分析了启动的流程,将启动的过程与 EventLoop 的 execute()方法连接起来,组成一个整体。

Netty 之所以在启动的时候代码写的这么复杂,主要原因是要适配它所搭起来的框架,所以调用流程非常的隐晦。这个过程中也做了很多的工作,处理各种异常,以及原生 NIO 的 BUG。这一篇暂时就先到这里,下一节我们来聊建立起来 channel 和 Selector 的关系之后,后面的 handler 操作又是如何处理的。

原文出处:https://www.cnblogs.com/rickiyang/p/12562408.html

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部