文档章节

netty源码分析系列——EventLoop

杨武兵
 杨武兵
发布于 2017/04/30 16:56
字数 3167
阅读 482
收藏 3

前言

EventLoop也是netty作为一个事件驱动架构的网络框架的重要组成部分,netty主要通过它来实现异步编程,从前面的源码分析中我们对它已经的使用非常熟悉了,凡事涉及到耗时的,可能阻塞线程的操作,netty都使用了eventLoop来异步执行,我们进入到它的内部来看看它是如何实现的。

如何使用

我们在之前的源码中,常见的用法如下示例所示:

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();//1

        final CountDownLatch downLatch = new CountDownLatch(1);
        eventLoopGroup.next().execute(new Runnable() {//2
            public void run() {
                for(int i=1; i<1000; i++){
                    //TODO 只是模拟一个耗时操作。
                }
                System.out.println("耗时操作执行完毕");
                downLatch.countDown();
            }
        });
        downLatch.await();
        System.out.println("等待异步线程耗时操作执行完毕");
    }

1处构造一个EventLoopGroup事件起循环器组对象。该接口有多种实现类,而我们常用的Nio相关的实现类是NioEventLoopGroup,下面的类结构章节中会详细介绍它的类结构。

2处先调用了next()方法从事件循环器组获得下一个EventLoop对象,然后就可以调用它的execute方法,该方法传递一个Runnbale对象作为参数,最后会用线程异步执行其run方法;execute方法是继承自jdk自带的并发包中的Executor接口。

类结构

上图是netty中的类结构图,它是一个庞大的类继承体系。有篇文章介绍得非常不错,本文直接引用它,不做详细介绍。

http://www.jianshu.com/p/da4398743b5a

http://www.jianshu.com/p/d0f06b13e2fb

其它的接口,抽象类和实现类的分析参考上述两篇文章,本文最终只分析NioEventLoop和NioEventLoopGroup两个类的实现。

NioEventLoop

介绍

NioEventLoop是基于jdk的nio包实现的事件循环器,它包含了一个线程,该线程会循环处理io事件和其它任务。

NioEventLoop中维护了一个线程,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:

I/O任务
即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。

非IO任务
添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。

两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。

让我一起来看看NioEventLoop的关键代码。

关键属性

selector:这个就是jdk中nio的多路复用器Selector对象,熟悉nio类的人应该很清楚了,每个NioEventLoop会绑定一个selector对象。调用它的select()或者selectNow方法来读取io事件。

selectStrategy:选择策略SelectStrategy对象。它提供了一种控制select循环行为的能力,要么是select方法阻塞线程,要么是继续循环两种行为。

provider:SelectorProvider类的对象,它是Selector的工厂类,调用它的openSelector方法即可打开一个新的selector对象。

ioRatio:表示处理io事件时间占比,默认值是50,即各占一半时间。

关键方法

接下来我们看看它的一些关键方法。

run方法

 @Override
    protected void run() {
        for (;;) {//1
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {//2
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));//3
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {//4
                    try {
                        processSelectedKeys();
                    } finally {//5
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();//6
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);//7
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

run方法是当前线程执行的方法。

1处是一个for循环,没有退出条件。这和事件循环器的概念对应起来了,该线程会一直循环,检查selector有没有新的io事件,如果有则处理,或者执行提交到该事件循环器的其它task;为了避免线程一直循环空跑,浪费CPU资源,那么在获取新的io事件或者其它task要阻塞线程,避免一直循环,等待有新的io事件或者task的时候才唤醒线程。

2处计算出选择策略,如果是运算处的值是“CONTINUE"则会继续下一次循环。默认策略是:如果有普通任务待执行,使用selectNow();否则使用select(boolean oldWakenUp)。NIO的Selector有三个select()方法,它们的区别如下:

select() 阻塞直到有一个感兴趣的IO事件就绪
select(long timeout) 与select()类似,但阻塞的最长时间为给定的timeout
selectNow() 不会阻塞,直接返回而不管是否有IO事件就绪

3处如果计算出的策略是“SELECT”,则调用select方法处理,后面会详细介绍该方法。

4处如果io事件比例是100%,则先调用方法processSelectedKeys处理选择到的key,我们下面会详细介绍该方法。

5处调用runAllTasks方法处理其它任务。确保始终能够执行到其它任务,但是没有传入参数,该方法后面也将继续介绍。

6处则是io事件比例小于100%。也是先调用processSelectedKeys方法处理io事件。

7处调用runAllTasks方法,但是传入了按照比例分配给任务的事件片,限制了任务执行的事件,通过ioTime * (100 - ioRatio) / ioRatio计算得到任务执行事件片。

select方法

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//delayNanos是最近一个调度任务的到期时间,没有调度任务返回1秒。selectDeadLineNanos指可以进行select操作的截止时间点
            for (;;) {//进入for循环。
                // 四舍五入将select操作超时时间换算为毫秒单位
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {//超时时间小于等于0,则不再进行其它的select操作。
                    if (selectCnt == 0) {//如果没有进行过select操作。
                        selector.selectNow(); //调用非阻塞的selectNow后返回。
                        selectCnt = 1;//计数器=1.
                    }
                    break;//退出for循环。
                }

                //如果任务提交的时候wakenUp的值是ture,则该任务没有机会执行。
                // Selector#wakeup.因此我们再次检查任务队列,在执行select操作之间。
                // 如果我们不检查的话,则该任务会被延迟到select操作超时之后才会被执行。
                //它还可能会延迟到空闲超时之后执行。(如果pipline中有IdleStateHandler处理器。)
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();//队列中有任务,执行一次selectNow操作返回,以免影响任务执行。
                    selectCnt = 1;
                    break;
                }

                //调用带超时时间的select操作。
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;//select在循环中的操作次数加1.

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - 有就绪的io事件,或
                    // - 外部用户唤醒, 或
                    // - 有待执行普通任务。 或
                    // - 有待执行调度任务。 
                    // 如果为true,则跳出循环,执行其它任务。
                    break;
                }
                if (Thread.interrupted()) {
                    //线程被中断因此会导致重置选中的io事件key并且会中断,因此我们也会退出循环。
                    //这很有很有可能是用户代码或者客户端包的bug,所以打印日志记录该问题。
                    //bug详情参考:https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug.....                    }
                    selectCnt = 1;
                    break;// 退出循环。
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    //超时时间已到,重新恢复select操作次数为1.此处难道不应该退出循环吗?
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    //已经设置了阀值,如果select操作次数超过了阀值,则会重建一个新的selector对象,避免jdk的bug导致进入死循环,消耗过多的cpu.
                    rebuildSelector();
                    selector = this.selector;
                    //用新的selecotr对象selectNow一次后退出循环。
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        }
    }

本来select操作的代码不会这么复杂,主要是由于JDK BUG导致select()方法并不阻塞而直接返回且返回值为0,从而出现空轮询使CPU完全耗尽。Netty解决的办法是:对select返回0的操作计数,如果次数大于阈值SELECTOR_AUTO_REBUILD_THRESHOLD就新建一个selector,将注册到老的selector上的channel重新注册到新的selector上。阈值SELECTOR_AUTO_REBUILD_THRESHOLD可由用户使用系统变量io.netty.selectorAutoRebuildThreshold配置,默认为512。这里注意for()循环中大量使用了break,含有break的部分才是关键操作,其他部分(其实就只有一处)是为了解决JDK BUG。

processSelectedKeys

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());//使用优化处理的方法。
        } else {
            processSelectedKeysPlain(selector.selectedKeys());//普通处理key的方法。
        }
    }

我们回到了run方法中调用processSelectedKeys来处理有事件的key列表。

有优化和普通的处理方法,我们逐一来介绍。

processSelectedKeysPlain

 
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        if (selectedKeys.isEmpty()) {//无有事件待处理的key列表,直接返回。
            return;
        }
        Iterator<SelectionKey> i = selectedKeys.iterator();//获得迭代器。
        for (;;) {//循环迭代每个待处理的key。
            final SelectionKey k = i.next();//获得一个有事件的key。
            final Object a = k.attachment();//获得附加对象。
            i.remove();//主动将自己从迭代器中删除。
            if (a instanceof AbstractNioChannel) {//IO事件由netty框架负责处理。
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;//IO事件由用户自定义任务处理。
                processSelectedKey(k, task);
            }
            if (!i.hasNext()) {//无下一个有事件的key,则停止循环。
                break;
            }

            if (needsToSelectAgain) {//需要再次选择。
                selectAgain();
                selectedKeys = selector.selectedKeys();
                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }

这一部分代码功能就是遍历选择键,其中对选择键的处理有两种方式:Netty框架处理和用户自定义处理。这两种处理方式由register()方式决定:

// Netty框架处理 public ChannelFuture register(final Channel channel, final ChannelPromise promise);
// 用户自定义处理 public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task);

接下来我们一起看看processSelectedKey代码的实现。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();//得到对应的unsafe对象。
        if (!k.isValid()) {//检查key是否合法,若不合法则进入异常处理。
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                return;
            }
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            //key不合法,直接关闭连接通道。
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();//准备就绪的操作。
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//处理客户端连接事件。
                //从感兴趣的事件中将连接事件删除。不再监听连接事件。
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                //完成连接。
                unsafe.finishConnect();
            }

            //先处理 OP_WRITE 写事件,可以写入队列缓存区。
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                //调用强制刷新 forceFlush 防止缓存去无足够内存。
                ch.unsafe().forceFlush();
            }

            //读事件处理,其中readyOps == 0为对JDK Bug的处理, 防止死循环
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) 
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

可以看出对IO事件的具体处理,委托给NioUnsafe对象处理,由read()、forceFlush()、finishConnect()和close()方法处理具体的IO事件,具体的处理过程,我们将在分析NioUnsafe时讲解。


runAllTasks

该方法是在timeoutNanos时间内执行所有的普通任务。

    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();//从定时任务队列中读取任务。
        Runnable task = pollTask();//拉取任务。
        if (task == null) {//无任务则可以跳出。
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;//执行任务截至时间。
        long runTasks = 0;//执行任务的个数。
        long lastExecutionTime;
        for (;;) {//循环执行任务。
            safeExecute(task);//安全执行任务,会捕获异常,避免因为执行一个任务异常导致程序中断。
            runTasks ++;
            //每隔64个任务检查一次时间,由于nanoTime操作非常耗时,因此不能每次都检查。64这个值当前是硬编码的,无法配置,可能会成为一个问题。
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            task = pollTask();//继续取下一个任务。
            if (task == null) {//队列中无任务了则跳出循环。
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        //执行完所有任务后处理,继续执行尾部队列的任务。
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;//记录上次执行时间。
        return true;
    }

NioEventLoopGroup

NioEventLoopGroup的主要代码实现是模板方法newChild(),用来创建线程池中的单个线程,代码如下:

    @Override
    protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) 
                   throws Exception {
        return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), 
            (RejectedExecutionHandler) args[2]);
    }

    public void setIoRatio(int ioRatio) {
        for (EventExecutor e: this) {
            ((NioEventLoop) e).setIoRatio(ioRatio);
        }
    }


    public void rebuildSelectors() {
        for (EventExecutor e: this) {
            ((NioEventLoop) e).rebuildSelector();
        }
    }

该实现代码就是创建了一个NioEventLoop对象。

此外NioEventLoopGroup还提供了setIoRatio()和rebuildSelectors()两个方法,一个用来设置I/O任务和非I/O任务的执行时间比,一个用来重建线程中的selector来规避JDK的epoll 100% CPU Bug。其实现也是依次设置各线程的状态,故不再列出。

© 著作权归作者所有

共有 人打赏支持
杨武兵

杨武兵

粉丝 255
博文 61
码字总数 123254
作品 1
昌平
架构师
私信 提问
Qzone 微信 Java高级——dubbo源码分析之远程通信 netty

Java高级——dubbo源码分析之远程通信 netty dubbo 底层通信选择了 netty 这个 nio 框架做为默认的网络通信框架并且通过自定义协议进行通信。dubbo 支持以下网络通信框架: Netty(默认) Min...

Java架构师那些事
2018/08/29
0
0
netty文章收藏

1.林峰的关于netty文章 不错,遇到问题反复看了几遍总有启发,还说收藏记录一下。 http://www.infoq.com/cn/netty Netty案例集锦之多线程篇(续)作者 李林锋 发布于 2015年11月25日 9 Nett...

GREKI
2016/04/05
604
0
源码之下无秘密 ── 做最好的 Netty 源码分析教程

背景 在工作中, 虽然我经常使用到 Netty 库, 但是很多时候对 Netty 的一些概念还是处于知其然, 不知其所以然的状态, 因此就萌生了学习 Netty 源码的想法. 刚开始看源码的时候, 自然是比较痛苦...

永顺
2017/11/29
0
0
Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(一)

目录 源码之下无秘密 ── 做最好的 Netty 源码分析教程 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 之二 NIO Channel 小结 Java NIO...

永顺
2017/11/29
0
0
【死磕Netty】—–服务端启动过程分析

原文出处http://cmsblogs.com/ 『chenssy』 转载请注明原创出处,谢谢! 上篇博客(【死磕Netty】----Netty的核心组件及其设计),了解了 Netty 的核心组件及其设计,但是这些都是零散的,不成...

chenssy
2017/12/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

eggjs与sequelize简单demo

参考 egg 官方文档 安装 // 依赖npm install --save egg-sequelize mysql2// ts 类型npm install --save @types/sequelize 插件,config/plugin.ts import { EggPlugin } from 'egg';......

Geeyu
51分钟前
1
0
看过上百部片子的这个人教你视频标签算法解析

本文由云+社区发表 随着内容时代的来临,多媒体信息,特别是视频信息的分析和理解需求,如图像分类、图像打标签、视频处理等等,变得越发迫切。目前图像分类已经发展了多年,在一定条件下已经...

腾讯云加社区
今天
4
0
2. 红黑树

定义:红黑树(Red-Black Tree,简称R-B Tree),它一种特殊的二叉查找树(Binary Search Tree)。 要理解红黑树,先要了解什么是二叉查找树。在上一章中,我们学习了什么是二叉树,以及二叉树...

火拳-艾斯
今天
3
0
input的button类型,点击页面跳转

一、input type=button 不做任何操作 例如: <input type="button" class="btn btn-primary" style="width: 30%" value="返回" onclick="window.location.href='/users/list'"></input> onc......

Sunki
今天
1
0
踩坑:js 小数运算出现精度问题

背景 在学习小程序商城源码时发现了这个问题,单价可能出现小数,小数之间运算结果会莫名其妙多出一大串数字,比如下面这样👇。 在此之前我是知道 js 中著名的 0.1 + 0.2 != 0.3 的问题的,...

dkvirus
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部