文档章节

Netty工具类HashedWheelTimer源码走读(一)

德胜
 德胜
发布于 2015/08/07 17:52
字数 1648
阅读 6299
收藏 25

1. 简单介绍.

    A Timer optimized for approximated I/O timeout scheduling. 

    关于Timer的介绍可以看看这篇文章, 写得不错 :  http://novoland.github.io/%E5%B9%B6%E5%8F%91/2014/07/26/%E5%AE%9A%E6%97%B6%E5%99%A8%EF%BC%88Timer%EF%BC%89%E7%9A%84%E5%AE%9E%E7%8E%B0.html

    可以看到, HashedWheelTimer 主要用来高效处理大量定时任务,  且任务对时间精度要求相对不高,  比如链接超时管理等场景, 缺点是,  内存占用相对较高.



2. 简单例子.

    1) 引入最新的Netty 5依赖 (不一定需要Netty5以前的版本里HashedWheelTimer就已存在)

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>


    2) 例子代码(LocalTime是Java8的时间类)

import io.netty.util.HashedWheelTimer;
import java.time.LocalTime;
import java.util.concurrent.TimeUnit;

public class Temp {

    public static void main(String[] args) throws Exception {
        //创建Timer, 精度为100毫秒, 
        HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 16);

        System.out.println(LocalTime.now());

        timer.newTimeout((timeout) -> {
            System.out.println(LocalTime.now());
            System.out.println(timeout);
        }, 5, TimeUnit.SECONDS);

        //阻塞main线程
        System.in.read();
    }

}

    

    可以看到输出:

14:55:13.735
14:55:18.845
HashedWheelTimer$HashedWheelTimeout(deadline: 101009856 ns ago, task: com.haogrgr.test.main.Temp$$Lambda$4/708890004@1b797119)



3. 原理简介.

    1) 原理如图所示.

    可以看到, 就像一个时钟一样, 那么如果写代码来实现一个时钟的话, 大概类似于这样:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.math.IntRange;

public class Temp {

    public static void main(String[] args) throws Exception {
        Clock clock = new Clock().start();
        for (int i = 0; i < 5; i++) {
            clock.echo();
            Thread.sleep(500);
        }
    }
}

final class Clock {

    static ScheduledExecutorService updater = Executors.newSingleThreadScheduledExecutor();

    private int tick = 0;
    private int[] wheel = new IntRange(1, 60).toArray();

    Clock start() { //1s累加一次tick, 当到60s时归零
        updater.scheduleAtFixedRate(() -> tick = ++tick % wheel.length, 0, 1, TimeUnit.SECONDS);
        return this;
    }

    void echo() {
        System.out.println("当前时钟 : " + wheel[tick]);
    }
}

    运行输出:

当前时钟 : 1
当前时钟 : 2
当前时钟 : 2
当前时钟 : 3
当前时钟 : 3


    回到主题, 来看看HashedWheelTimer的构造函数参数:

HashedWheelTimer(
    ThreadFactory threadFactory, //类似于Clock中的updater, 负责创建Worker线程.
    long tickDuration,           //时间刻度之间的时长(默认100ms), 通俗的说, 就是多久tick++一次.
    TimeUnit unit,               //tickDuration的单位.
    int ticksPerWheel            //类似于Clock中的wheel的长度(默认512).
):


    除了构造函数参数, 还有一个比较重要的概念, 轮(Round) :  一轮的时长为 tickDuration * ticksPerWheel, 也就是转一圈的时长.

    其中Worker线程为HashedWheelTimer的核心, 主要负责每过tickDuration时间就累加一次tick. 同时, 也负责执行到期的timeout任务, 同时, 也负责添加timeou任务到指定的wheel中.

    当添加Timeout任务的时候, 会根据设置的时间, 来计算出需要等待的时间长度, 根据时间长度, 进而算出要经过多少次tick, 然后根据tick的次数来算出经过多少轮, 最终得出task在wheel中的位置.


    例如, 如果任务设置为在100s后执行. 如果按照默认的HashedWheelTimer配置(tickDuration为100ms, wheel长为512)则:

任务需要经过的tick数为: (100 * 1000) / 100 = 1000次 (等待时长 / tickDuration)
任务需要经过的轮数为  : 1000次 / 512次/轮 = 1轮     (tick总次数 / ticksPerWheel)
任务存放的wheel索引为 : 1000 - 512 = 488            (走完n轮时间后, 还要多少个tick)

所以这里任务需要经过一轮后, 还要等待488次tick, 才会执行, 进而任务存放的wheel位置也就是488.

    到这里, 大概原理已经介绍完了, 接下来看源码吧.



4. 主要成员.

    1) HashedWheelTimer, 对外的类, 主要负责启动Worker线程, 添加任务等. 

    2) Worker, 内部负责添加任务, 累加tick, 执行任务等. 

    3) HashedWheelTimeout, 任务的包装类, 链表结构, 负责保存deadline, 轮数, 等.

    4) HashedWheelBucket, wheel数组元素, 负责存放HashedWheelTimeout链表.



5. HashedWheelTimer源码走读.

   下面是HashedWheelTimer的代码, 去掉了一些非关键代码.

public class HashedWheelTimer implements Timer {

    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER;
    static {
        AtomicIntegerFieldUpdater<HashedWheelTimer> workerStateUpdater = PlatformDependent
                .newAtomicIntegerFieldUpdater(HashedWheelTimer.class, "workerState");
        if (workerStateUpdater == null) {
            workerStateUpdater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
        }
        WORKER_STATE_UPDATER = workerStateUpdater;
    }

    private final Worker worker = new Worker();
    private final Thread workerThread;

    public static final int WORKER_STATE_INIT = 0;
    public static final int WORKER_STATE_STARTED = 1;
    public static final int WORKER_STATE_SHUTDOWN = 2;
    @SuppressWarnings({ "unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
    private volatile int workerState = WORKER_STATE_INIT; // 0 - init, 1 - started, 2 - shut down

    private final long tickDuration;
    private final HashedWheelBucket[] wheel;
    private final int mask;
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
    private final Queue<Runnable> cancelledTimeouts = PlatformDependent.newMpscQueue();

    private volatile long startTime;

    //创建Timer
    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
        //校验参数, 代码略

        //创建wheel数组, 和HashMap的entry数组长度类似, 为2的次方.
        wheel = createWheel(ticksPerWheel);

        //用于计算任务存放wheel的索引
        //因为wheel长度为2的次方, 则, 如果长度为16(10000), mask就为15(1111)
        //那么, 通过    n & mask 就可以实现 类似于 n % mask, 而 & 更高效........
        mask = wheel.length - 1;

        //tickDuration 不能大于 Long.MAX_VALUE / wheel.length, 也就是一轮的时间不能大于Long.MAX_VALUE 纳秒
        this.tickDuration = unit.toNanos(tickDuration);

        //创建worker线程
        workerThread = threadFactory.newThread(worker);
    }

    //创建wheel数组
    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        //参数校验, 略

        //2的次方
        ticksPerWheel = 1;
        while (ticksPerWheel < ticksPerWheel) {
            ticksPerWheel <<= 1;
        }

        //初始化
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

    //启动Timer, 不需要显示调用, 调用newTimeout时, 会自动调用该方法
    public void start() {
        //初始为WORKER_STATE_INIT, cas修改为WORKER_STATE_STARTED, 并启动worker线程
        switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
        }

        //等待worker启动, 并初始化startTime完成
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

    //停止Timer
    public Set<Timeout> stop() {
        //worker线程不能调用stop方法, 也就是我们添加的Task中不能调用stop方法.
        if (Thread.currentThread() == workerThread) {
            throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from "
                    + TimerTask.class.getSimpleName());
        }

        //cas修改状态为shutdown, 如果修改失败, 则当前状态只可能是WORKER_STATE_INIT和WORKER_STATE_SHUTDOWN
        if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
            WORKER_STATE_UPDATER.set(this, WORKER_STATE_SHUTDOWN);//总是设置为WORKER_STATE_SHUTDOWN
            return Collections.emptySet();//状态为0和2时, 是没有遗留任务的.
        }

        //中断worker线程, worker线程中会轮询Timer状态的.
        boolean interrupted = false;
        while (workerThread.isAlive()) {
            workerThread.interrupt();
            try {
                workerThread.join(100);
            } catch (InterruptedException ignored) {
                interrupted = true;
            }
        }

        //恢复中断标志
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        //返回未处理的任务
        return worker.unprocessedTimeouts();
    }

    //添加定时任务, delay为延迟时间
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        start();//未启动, 则启动

        //任务先添加到timeouts队列中, 等待下一个tick时, 再添加到对应的wheel中去.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }
}


    注释的比较详细了, 代码也比较简单, 就不多说了, 看注释吧.

    添加任务时, 并不是直接将人物添加到wheel中, 而是先放入队列, 再等待Worker线程在下一次tick时, 将人物放入wheel中.

    AtomicIntegerFieldUpdater是JUC的类, Netty会判断, 当存在Unsafe时, 会使用Netty自己利用Unsafe实现的UnsafeAtomicIntegerFieldUpdater.


    字数限制... 接第二篇...

© 著作权归作者所有

共有 人打赏支持
德胜
粉丝 59
博文 31
码字总数 41512
作品 0
长沙
私信 提问
Netty工具类HashedWheelTimer源码走读(二)

接上一篇( http://my.oschina.net/haogrgr/blog/489320 ) 6. HashedWheelTimeout源码走读. //任务的包装类, 链表结构, 负责保存deadline, 轮数, 等//继承MpscLinkedQueueNode, 是因为timeous...

德胜
2015/08/10
0
0
Netty工具类HashedWheelTimer源码走读(三)

接上一篇( http://my.oschina.net/haogrgr/blog/490266 ) 8. Worker代码走读. //主要负责累加tick, 执行到期任务等.private final class Worker implements Runnable { private final Set<T......

德胜
2015/08/10
0
0
Netty定时任务HashedWheelTimer

HashedWheelTimer 主要用来高效处理大量定时任务, 且任务对时间精度要求相对不高, 比如链接超时管理等场景, 缺点是, 内存占用相对较高 构造函数

休扬子
2016/10/15
88
0
apache kafka技术分享系列(目录索引)

目录索引: Kafka使用场景 1.为何使用消息系统 2.我们为何需要搭建ApacheKafka分布式系统 3.消息队列中点对点与发布订阅区别 kafka开发与管理: 1)apachekafka消息服务 2)kafak安装与使用 ...

dannyhe
2015/09/06
453
1
netty的reconnect方式之一

使用timer timer HashedWheelTimerFactory ReconnectHandler docs TcpTransport

go4it
2016/10/08
21
0

没有更多内容

加载失败,请刷新页面

加载更多

Linux iptables之mangle表使用案例

mangle表的用途 mangle表的主要功能是根据规则修改数据包的一些标志位,以便其他规则或程序可以利用这种标志对数据包进行过滤或策略路由。 mangel表使用示例 示例1-策略路由1 内网的客户机通...

月下狼
今天
3
0
OSChina 周日乱弹 —— 兼职我想去学学布偶戏

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @clouddyy : 《火炎 - 女王蜂》 《火炎 - 女王蜂》 手机党少年们想听歌,请使劲儿戳(这里) @小鱼丁 :还在睡觉突然接到一个小哥哥电话“x...

小小编辑
今天
121
7
租房软件隐私保护如同虚设

近日,苏州市民赵先生向江苏新闻广播新闻热线025-84658888反映,他在“安居客”手机应用软件上浏览二手房信息,并且使用该软件自动生成的虚拟号码向当地一家中介公司进行咨询。可电话刚挂不久...

linux-tao
今天
4
0
分布式项目(五)iot-pgsql

书接上回,在Mapping server中,我们已经把数据都整理好了,现在利用postgresql存储历史数据。 iot-pgsql 构建iot-pgsql模块,这里我们写数据库为了性能考虑不在使用mybatis,换成spring jd...

lelinked
今天
6
0
一文分析java基础面试题中易出错考点

前言 这篇文章主要针对的是笔试题中出现的通过查看代码执行结果选择正确答案题材。 正式进入题目内容: 1、(单选题)下面代码的输出结果是什么? public class Base { private Strin...

一看就喷亏的小猿
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部