Netty 线程模型

原创
2021/01/05 15:16
阅读数 381

Netty 线程模型

本篇文章基于Netty 4.1.56.Final

Netty的线程模型是Netty高性能的关键。Netty的线程模型关乎于2个类:EventLoopGroup和EventLoop。本篇仅以NioEventLoopGroup和NioEventLoop进行探究。

NioEventLoopGroup比较简单其核心数据结构和方法都在其父类MultithreadEventExecutorGroup中,它的核心数据结构就是EventExecutor数组,核心方法就是MultithreadEventExecutorGroup的构造方法,在该方法中调用了newChild方法创建并填充了EventExecutor数组。NioEventLoopGroup中newChild方法的实现就是创建NioEventLoop。NioEventLoopGroup的本质就是NioEventLoop的集合,其核心作用是将一个Channel和一个EventLoop进行绑定。NioEventLoopGroup的register方法就是调用next方法获取一个NioEventLoop,并调用NioEventLoop的register方法和Channel进行绑定。

public ChannelFuture register(Channel channel) {
    return next().register(channel);     选择一个EventLoop并将channel与其绑定
}

next方法会是使用EventExecutorChooser从NioEventLoopGroup中选择一个NioEventLoop,EventExecutorChooser有2个实现GenericEventExecutorChooser采用取余的方式获取下一个;PowerOfTwoEventExecutorChooser是在EventLoop个数是2次幂的时候采用和hashmap中定位数组下标相同的方法获取下一个。

GenericEventExecutorChooser的next()
public EventExecutor next() {
    return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}

PowerOfTwoEventExecutorChooser的next()
public EventExecutor next() {
    return executors[idx.getAndIncrement() & executors.length - 1];
}

NioEventLoop 是Netty线程模型的关键,它负责与其绑定的Channel相关的所有IO事件,ChannelPipeline处理链,异步任务和调度任务的执行。关于ChannelPipeline在这篇有所讲述。NioEventLoop执行的异步任务会被添加到父类SingleThreadEventExecutor的taskQueue中,taskQueue的默认实现是JCTools的MpscQueue。NioEventLoop执行的调度任务会被添加到父类AbstractScheduledEventExecutor的scheduledTaskQueue中。在NioEventLoop的父类SingleThreadEventLoop中还有一个tailTasks队列,这个队列的任务是在每次eventloop中,把IO事件,ChannelPipeline,异步任务和调度任务执行完后执行的任务,通常没有被使用,且相关方法还被标注了@UnstableApi,因此不过多关注该队列。

NioEventLoopGroup构建NioEventLoop时会传入SelectorProvider,因此NioEventLoop可以获取Selector,那Channel关注的IO事件是怎么注册上去的呢?Channel在注册到EventLoop时会调用AbstractChannel的doRegister方法,该方法在AbstractNioChannel中的实现是将该Channel对应的javachannel注册到他的EventLoop的selector中并获取selectionKey,不过此时并没有注册感兴趣的IO事件,只是为了获取selectionKey。此外,还将Channel本身作为attachment进行了附加,这样当处理IO事件的时候就能找到对应的NettyChannel了。代码如下:

注册的IO事件为0,也就是没有感兴趣的IO事件,并将自身作为了附加对象
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

在AbstractNioChannel的doBeginRead方法中会将Channel感兴趣的IO事件注册到selectionKey中去。

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

doBeginRead方法会被AbstractUnsafe的beginRead调用,AbstractUnsafe的beginRead会在2个地方被调用一个是HeadContext的read方法,另一个是AbstractUnsafe的register0方法,该方法会先进行channel注册之后调用isActive判断Channel是否是active的,如果是就调用beginRead方法。代码如下:

doRegister();                              先注册
neverRegistered = false;
registered = true;
省略注释
pipeline.invokeHandlerAddedIfNeeded();

safeSetSuccess(promise);
pipeline.fireChannelRegistered();
省略注释
if (isActive()) {                         判断是否active
    if (firstRegistration) {
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
省略注释
        beginRead();                     如果active,beginRead
    }
}

而对于NioServerSocketChannel来说,其isActive方法实现如下:

return isOpen() && javaChannel().socket().isBound();

NioServerSocketChannel是先注册后绑定,此时还在注册流程中,还未绑定,显然不是active的,因此不会调用beginRead方法。之后,NioServerSocketChannel会进行绑定,最终会从ChannelPipeline的HeadContext调用到AbstractUnsafe的bind方法,该方法中先进行端口绑定,之后会调用ChannelPipeline的fireChannelActive方法,触发ChannelActive事件在pipeline事件的传播,在HeadContext的channelActive方法中会调用Channel的read方法,channel的read方法就是调用其pipleine的read方法,该方法会从tail向前走到head,最终调用了HeadContext的read方法,最终调到了AbstractUnsafe的beginRead。而对于NioSocketChannel来说,其isActive方法代码如下:

return ch.isOpen() && ch.isConnected();

对于刚创建的连接显然是open和connect的,因此会在register0方法中调用beginRead。简单总结就是,NioServerSocketChannel会在绑定时注册OP_ACCEPT事件,NioSocketChannel会在channel注册时注册OP_READ事件。

最后说说NioEventLoop的核心方法--run方法。run方法是一个死循环,这个循环里做的就是NioEventLoop的核心工作,上述所说的IO事件,ChannelPipeline和3个队列的任务都是在这个for循环里被不断执行的。eventloop的大致步骤如下:
第一步,根据SelectStrategy进行select

strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
    continue;

case SelectStrategy.BUSY_WAIT:
    // fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
    if (curDeadlineNanos == -1L) {
        curDeadlineNanos = NONE; // nothing on the calendar
    }
    nextWakeupNanos.set(curDeadlineNanos);
    try {
        if (!hasTasks()) {
            strategy = select(curDeadlineNanos);
        }
    } finally {
        // This update is just to help block unnecessary selector wakeups
        // so use of lazySet is ok (no race condition)
        nextWakeupNanos.lazySet(AWAKE);
    }
    // fall through
default:
}

默认情况下SelectStrategy是NioEventLoopGroup创建NioEventLoop时传入的DefaultSelectStrategyFactory.INSTANCE,改工厂类产生的策略是DefaultSelectStrategy,代码如下:

DefaultSelectStrategy的实现如下
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
NioEventLoop中selectSupplier的实现
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};
int selectNow() throws IOException {
    return selector.selectNow();
}

DefaultSelectStrategy首先判断是否有异步任务,如果有异步任务就立马selectNow,获取IO就绪事件数,如果没有返回SELECT,之后会进入switch的SELECT分支。SELECT分支中首先调用nextScheduledTaskDeadlineNanos方法返回最近的一次调度任务的到期时间,在select方法中利用这个到期时间算出还有多久到达这个到期时间,如果大于5微秒就调用超时阻塞select,否则selectNow。最后理一下这块的逻辑,在一次eventloop中需要处理IO事件,异步任务和调度任务。获取IO就绪事件时有2种方式进行select,selectNow会立即select并返回当前的IO就绪事件数,带超时的select会阻塞直到超时或有就绪IO事件。在DefaultSelectStrategy中如果有异步任务为了尽快执行异步任务就进行了selectNow快速返回当前的IO就绪事件,如果没有异步任务就进入SELECT分支,该分支下会使用带超时的阻塞select,而阻塞时间就是距离最近一个调度任务的时间。
第二步,处理IO事件和task

if (ioRatio == 100) {
    try {
        if (strategy > 0) {
            processSelectedKeys();
        }
    } finally {
        // Ensure we always run tasks.
        ranTasks = runAllTasks();
    }
} else if (strategy > 0) {
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        // Ensure we always run tasks.
        final long ioTime = System.nanoTime() - ioStartTime;
        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
} else {
    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}

处理IO事件和task的整体逻辑也是相对简单的,if分支是ioRatio==100时,如果有IO就绪事件则先处理IO就绪事件之后处理所有的task。ioRatio的作用可在esle-if分支中看出,在else-if中先处理IO就绪事件,然后计算处理IO就绪事件的耗时,最后根据处理IO就绪事件的耗时和ioRatio计算出执行task的时间,例如ioTime是8,ioRatio是80,那么本次用耗时2来计算异步任务。最后else分支是没有IO就绪事件时,只执行task且耗时为0,从注释上可以看出就是执行最小数量的task。 processSelectedKeys是处理IO事件的逻辑,整体流程是遍历就绪的SelectionKey,从每个SelectionKey的attachment中获取对应的netty的channel,然后处理改channel上就绪的IO事件。代码如下:

int readyOps = k.readyOps();
省略注释
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {处理连接事件
省略注释
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

省略注释
if ((readyOps & SelectionKey.OP_WRITE) != 0) {处理IO写事件
省略注释
    ch.unsafe().forceFlush();
}

省略注释
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {处理IO读事件
    unsafe.read();
}

runAllTasks是执行异步任务和调度任务的逻辑,if分支执行的是不带参数的runAllTasks方法,首先调用fetchFromScheduledTaskQueue方法将到期的调度任务从scheduledTaskQueue移到taskQueue中去,然后调用runAllTasksFrom方法依次执行taskQueue中的任务。代码如下:

protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        fetchedAll = fetchFromScheduledTaskQueue();  将到期的调度任务移到taskQueue中去
        if (runAllTasksFrom(taskQueue)) {            依次执行taskQueue中的任务
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();                         
    return ranAtLeastOne;
}

private boolean fetchFromScheduledTaskQueue() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    for (;;) {
        Runnable scheduledTask = pollScheduledTask(nanoTime);  获取到期的调度任务
        if (scheduledTask == null) {
            return true;
        }
        if (!taskQueue.offer(scheduledTask)) {                 将到期的调度任务放入taskQueue中
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}

protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {依次执行taskQueue中的task
        safeExecute(task);
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            return true;
        }
    }
}

else和else-if执行的是带耗时参数的runAllTasks,其整体逻辑和不带耗时参数的runAllTasks一样都是将到期调度任务移入taskQueue中,再依次执行taskQueue中的任务。不同的是带耗时参数的runAllTasks方法会根据传入的耗时计算出一个到期时间,每执行64个任务就检查是否到了到期时间,到了就直接跳出不再执行任务。如果传入的耗时为0的话,执行64个任务后一定会跳出执行循环。代码如下:

for (;;) {
    safeExecute(task);

    runTasks ++;

    // Check timeout every 64 tasks because nanoTime() is relatively expensive.
    // XXX: Hard-coded value - will make it configurable if it is really a problem.
    if ((runTasks & 0x3F) == 0) { 每64个任务检查是否达到deadline,
        lastExecutionTime = ScheduledFutureTask.nanoTime();
        if (lastExecutionTime >= deadline) {如果达到就不再执行
            break;
        }
    }

    task = pollTask();
    if (task == null) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
        break;
    }
}

根据上面的描述,可以宏观上对Netty的线程模型进行一个总结,netty线程模型关乎2个类EventLoopGroup和EventLoop,其中EventLoopGroup是EventLoop的集合,主要作用是为Channel选择分配一个EventLoop,EventLoop负责channel所有事件和任务的处理。一个Channel对应一个EventLoop,一个EventLoop负责多个Channel。EventLoopGroup创建EventLoop时会传入Selector,这样EventLoop就可以在各自的循环中不断selectIO事件,每次循环不仅会处理IO事件还会处理异步任务和调度任务。示意图如下: EventLoop

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部