文档章节

Netty工具类HashedWheelTimer源码走读(二)

德胜
 德胜
发布于 2015/08/10 17:18
字数 1479
阅读 1365
收藏 4

接上一篇( http://my.oschina.net/haogrgr/blog/489320 )


6. HashedWheelTimeout源码走读.

//任务的包装类, 链表结构, 负责保存deadline, 轮数, 等
//继承MpscLinkedQueueNode, 是因为timeous队列是MpscLinkedQueue, 里面对MpscLinkedQueueNode有特殊处理(并发优化)
private static final class HashedWheelTimeout extends MpscLinkedQueueNode<Timeout>
        implements Timeout {

    private static final int ST_INIT = 0;
    private static final int ST_CANCELLED = 1;
    private static final int ST_EXPIRED = 2;
    private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER;

    static {
        AtomicIntegerFieldUpdater<HashedWheelTimeout> updater =
                PlatformDependent.newAtomicIntegerFieldUpdater(HashedWheelTimeout.class, "state");
        if (updater == null) {
            updater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
        }
        STATE_UPDATER = updater;
    }

    private final HashedWheelTimer timer; //timer引用
    private final TimerTask task; //要执行的任务引用
    private final long deadline; //Timer启动时间 - 任务执行时间(任务加入时间+任务延迟时间)

    @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
    private volatile int state = ST_INIT;

    //离任务执行还要等待的轮数, 当任务加入到wheel中时计算该值, 并在Worker中, 每过一轮, 该值减一.
    long remainingRounds;

    //双链表, 因为只有Worker这一个线程访问, 所以不需要synchronization / volatile.
    HashedWheelTimeout next;
    HashedWheelTimeout prev;

    //HashedWheelTimeout 所在的 wheel
    HashedWheelBucket bucket;

    HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
        this.timer = timer;
        this.task = task;
        this.deadline = deadline;
    }

    @Override
    public Timer timer() {
        return timer;
    }

    @Override
    public TimerTask task() {
        return task;
    }

    @Override
    public boolean cancel() {
        // only update the state it will be removed from HashedWheelBucket on next tick.
        //这里只修改状态从ST_INIT到ST_CANCELLED
        if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
            return false;
        }

        //如果状态修改成功, 则表示第一次调用cancel方法, 将HashedWheelTimeout从bucked中移除的操作封装,
        //加入到cancelled队列, 等待下一次tick再移除, 跟踪下了源码历史发现之所以这么做, 是为了对GC友好, 以前取消任务要等到下一轮才会被处理,
        //于是, 改成将cancel的任务放在timeous队列里, 然后统一处理, timeous队列是MpscLinkedQueue, 里面对MpscLinkedQueueNode有特殊处理,
        //然而, 后面又发现有锁的问题, 因为timeous这个队列可能被多个线程操作(HashedWheelTimer.newTimeout()), 开始是加锁的, 
        //于是, 将cancel任务另外存一个队列, 这样, 就不需要使用锁了, 具体见:
        //https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1
        //https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1
        timer.cancelledTimeouts.add(new Runnable() {
            @Override
            public void run() {
                HashedWheelBucket bucket = HashedWheelTimeout.this.bucket;
                if (bucket != null) {
                    bucket.remove(HashedWheelTimeout.this);
                }
            }
        });
        return true;
    }

    public boolean compareAndSetState(int expected, int state) {
        return STATE_UPDATER.compareAndSet(this, expected, state);
    }

    public int state() {
        return state;
    }

    @Override
    public boolean isCancelled() {
        return state() == ST_CANCELLED;
    }

    @Override
    public boolean isExpired() {
        return state() == ST_EXPIRED;
    }

    @Override
    public HashedWheelTimeout value() {
        return this;
    }

    //到期, 执行任务
    public void expire() {
        if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
            return;
        }

        try {
            task.run(this);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
            }
        }
    }
}


    果然是大牛, 各种优化, 看了下源码的提交记录, 截取几段:

    1) https://github.com/netty/netty/commit/1f68479e3cd94deb3172edd3c01aa74f35032b9b   (以前wheel用的HashSet, 改成了数组)

Motivation:
At the moment there are two issues with HashedWheelTimer:
* the memory footprint of it is pretty heavy (250kb fon an empty instance)
* the way how added Timeouts are handled is inefficient in terms of how locks etc are used and so a lot of context-switching / condition can happen.

Modification:
Rewrite HashedWheelTimer to use an optimized bucket implementation to store the submitted Timeouts and a MPSC queue to handover the timeouts.  So volatile writes are reduced to a minimum and also the memory foot-print of the buckets itself is reduced a lot as the bucket uses a double-linked-list. Beside this we use Atomic*FieldUpdater where-ever possible to improve the memory foot-print and performance.

Result:
Lower memory-footprint and better performance

   

    2) https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1

Motivation:
At the moment the HashedWheelTimer will only remove the cancelled Timeouts once the HashedWheelBucket is processed again. Until this the instance will not be able to be GC'ed as there are still strong referenced to it even if the user not reference it by himself/herself. This can cause to waste a lot of memory even if the Timeout was cancelled before.

Modification:
Add a new queue which holds CancelTasks that will be processed on each tick to remove cancelled Timeouts. Because all of this is done only by the WorkerThread there is no need for synchronization and only one extra object creation is needed when cancel() is executed. For addTimeout(...) no new overhead is introduced.

Result:
Less memory usage for cancelled Timeouts.


    3) https://github.com/netty/netty/commit/44ea769f537bf16b833d03db844b1f3067b3acd7

Motivation:
Due some race-condition while handling canellation of TimerTasks it was possibleto corrupt the linked-list structure that is represent by HashedWheelBucket and so produce a NPE.

Modification:
Fix the problem by adding another MpscLinkedQueue which holds the cancellation tasks and process them on each tick. This allows to use no synchronization / locking at all while introduce a latency of max 1 tick before the TimerTask can be GC'ed.

Result:
No more NPE


    回到主题, 代码并不复杂, 开始看的时候, 发现继承了MpscLinkedQueueNode, 但是又没有地方用到, 后面看了下, 发现MpscLinkedQueue对其有特殊处理.

    可以看到HashedWheelTimeout就是对Timeout任务的包装, 链表结构方便加入wheel, 记录deadline, remainingRounds, state等信息, 



7. HashedWheelBucket 源码走读. 

//用来存放HashedWheelTimeout, 结构有点像linked-list, 方便移除操作.
private static final class HashedWheelBucket {

    //链表结构
    private HashedWheelTimeout head;
    private HashedWheelTimeout tail;

    //添加HashedWheelTimeout, 链表操作, 不多说~~~
    public void addTimeout(HashedWheelTimeout timeout) {
        assert timeout.bucket == null;
        timeout.bucket = this;
        if (head == null) {
            head = tail = timeout;
        } else {
            tail.next = timeout;
            timeout.prev = tail;
            tail = timeout;
        }
    }

    //当tick到该wheel的时候, Worker会调用这个方法, 根据deadline来判断任务是否过期(remainingRounds是否为0), 
    //任务到期就执行, 没到期, 就timeout.remainingRounds--, 因为走到这里, 表示改wheel里的任务又过了一轮了.
    public void expireTimeouts(long deadline) {
        HashedWheelTimeout timeout = head;

        //遍历链表
        while (timeout != null) {
            boolean remove = false;
            if (timeout.remainingRounds <= 0) {//任务已到执行点
                if (timeout.deadline <= deadline) {
                    timeout.expire();
                } else {
                    // The timeout was placed into a wrong slot. This should never happen.
                    throw new IllegalStateException(String.format(
                            "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                }
                remove = true;
            } else if (timeout.isCancelled()) {
                remove = true;
            } else {//没到期, 剩余轮数减一
                timeout.remainingRounds --;
            }

            //先保存next, 因为移除后, 再获取timeout.next会为空.
            HashedWheelTimeout next = timeout.next;
            if (remove) {//当以到期, 或者被取消, 就将timeou从链表中移除
                remove(timeout);
            }
            timeout = next;
        }
    }

    //链表移除, 不多说
    public void remove(HashedWheelTimeout timeout) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.prev != null) {
            timeout.prev.next = next;
        }
        if (timeout.next != null) {
            timeout.next.prev = timeout.prev;
        }

        if (timeout == head) {
            if (timeout == tail) {
                tail = null;
                head = null;
            } else {
                head = next;
            }
        } else if (timeout == tail) {
            tail = timeout.prev;
        }
        timeout.prev = null;
        timeout.next = null;
        timeout.bucket = null;
    }

    //Clear this bucket and return all not expired / cancelled {@link Timeout}s.
    public void clearTimeouts(Set<Timeout> set) {
        for (;;) {
            HashedWheelTimeout timeout = pollTimeout();
            if (timeout == null) {
                return;
            }
            if (timeout.isExpired() || timeout.isCancelled()) {
                continue;
            }
            set.add(timeout);
        }
    }

    //链表的poll
    private HashedWheelTimeout pollTimeout() {
        HashedWheelTimeout head = this.head;
        if (head == null) {
            return null;
        }
        HashedWheelTimeout next = head.next;
        if (next == null) {
            tail = this.head =  null;
        } else {
            this.head = next;
            next.prev = null;
        }

        head.next = null;
        head.prev = null;
        head.bucket = null;
        return head;
    }
}


    可以看到, 代码也不复杂, 主要是提供一个类似于LinkedList的容器, 用来存放HashedWheelTimeout, 并提供expireTimeouts(long deadline) 方法来处理该wheel中的任务. 具体处理看注释.


    字数限制... 接第二篇..., 还剩最后的Worker的代码.


© 著作权归作者所有

德胜
粉丝 59
博文 31
码字总数 41512
作品 0
长沙
私信 提问
Netty工具类HashedWheelTimer源码走读(一)

简单介绍. A Timer optimized for approximated I/O timeout scheduling. 关于Timer的介绍可以看看这篇文章, 写得不错 : http://novoland.github.io/%E5%B9%B6%E5%8F%91/2014/07/26/%E5%AE%9......

德胜
2015/08/07
7.8K
0
Netty工具类HashedWheelTimer源码走读(三)

接上一篇( http://my.oschina.net/haogrgr/blog/490266 ) 8. Worker代码走读. //主要负责累加tick, 执行到期任务等.private final class Worker implements Runnable { private final Set<T......

德胜
2015/08/10
1K
0
Netty定时任务HashedWheelTimer

HashedWheelTimer 主要用来高效处理大量定时任务, 且任务对时间精度要求相对不高, 比如链接超时管理等场景, 缺点是, 内存占用相对较高 构造函数

休扬子
2016/10/15
372
0
apache kafka技术分享系列(目录索引)

目录索引: Kafka使用场景 1.为何使用消息系统 2.我们为何需要搭建ApacheKafka分布式系统 3.消息队列中点对点与发布订阅区别 kafka开发与管理: 1)apachekafka消息服务 2)kafak安装与使用 ...

dannyhe
2015/09/06
743
1
netty的reconnect方式之一

使用timer timer HashedWheelTimerFactory ReconnectHandler docs TcpTransport

go4it
2016/10/08
52
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周三乱弹 —— 调查人员问狗 那你在做什么啊?

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 小小编辑推荐:《Let It Be》- John Denver 《Let It Be》- John Denver 手机党少年们想听歌,请使劲儿戳(这里) @FalconChen :每天看一遍,...

小小编辑
今天
6
0
高效程序员的45个习惯总结版-文末脑图

1 做事 一个重大的错误应该被当做一次学习而不是指责他人的机会,团队成员一起工作,应该互相帮助,而不是互相指责 2 欲速则不达 不要为了修复问题而去修复,要投入时间和精力保持代码整洁 ...

阿提说说
今天
18
0
带南海九段线分位数地图可视化(R语言版)

今天带来一篇承诺虾神的可视化博客。内容是使用R语言进行带南海九段线分位数地图可视化。虾神的原博文地址如下(Python版)。 Python实现带南海九段线分位数地图完整可视化版本(附代码及数据...

胖胖雕
今天
12
0
Nginx 的进程结构,你明白吗?

Nginx 进程结构 这篇文章我们来看下 Nginx 的进程结构,Nginx 其实有两种进程结构: 单进程结构 多进程结构 单进程结构实际上不适用于生产环境,只适合我们做开发调试使用。因为在生产环境中...

武培轩
今天
20
0
蓝鲸平台部署

环境 系统:Centos7 依赖包:java8 主机: 10.0.1.150 域名:paas.ops.net;cmdb.ops.net;job.ops.net 生成SSH key ssh-keygen -t rsa -P '' 生成证书 https://bk.tencent.com/download_ssl/......

以谁为师
今天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部