文档章节

谈谈Java8的高并发计数组件

AbeJeffrey
 AbeJeffrey
发布于 2018/03/14 19:43
字数 3182
阅读 816
收藏 9

Striped64

为了深入理解LongAdder,我们需要先学习Striped64,了解它的设计理念和实现机制。

Striped64是一个扩展自Number的包级私有类,主要用于在64位值上支持动态striping的类的通用表示和机制。

设计理念

首先,我们明确什么是striping(条带化)?

大多数磁盘系统都对访问次数(每秒的 I/O 操作,IOPS)和数据传输率(每秒传输的数据量,TPS)有限制。当达到这些限制时,后续需要访问磁盘的进程就需要等待,这就是所谓的磁盘冲突。当多个进程同时访问一个磁盘时,可能会出现磁盘冲突。因此,避免磁盘冲突是优化 I/O 性能的一个重要目标。

条带(strip)是把连续的数据分割成相同大小的数据块,把每段数据分别写入到阵列中的不同磁盘上的方法。使用条带化技术使得多个进程同时访问数据的多个不同部分而不会造成磁盘冲突,而且在需要对这种数据进行顺序访问的时候可以获得最大程度上的 I/O 并行能力,从而获得非常好的性能。

Striped64正是利用条带化的设计理念,将逻辑上连续的数据分割为64bit的片段,并结合缓存行填充,减少高并发下CAS操作的竞争,从而提高并发更新的吞吐量。

详细介绍

Striped64中维护了一个延迟初始的原子更新变量表(Cell表),外加一个额外的“base”字段。其中表的大小是2的幂,并使用每个线程的哈希码作为掩码进行索引。

Striped64的表条目使用Cell类表示。为了减少缓存争用,Cell被设计为一个填充的AtomicLong变体(通过@sun.misc.Contended)。填充对于大多数Atomics来说是多余的,因为它们通常是不规则地散布在内存中,相互之间不会干扰太多。但是,驻留在数组中的原子对象往往会彼此相邻放置,因此在没有此预防措施的情况下,通常会共享缓存行(对性能产生巨大的负面影响)。

部分地,由于Cell相对较大,只有在需要时才会创建。当没有竞争时,所有的更新都作用到base字段。当第一次竞争(更新base的CAS失败)时,表被初始化为大小2。若发生进一步争用,表大小加倍,直到大于或等于CPU数量的最小的 2 的幂。表的槽在需要之前保持为空。

Striped64采用一个单独的自旋锁(“cellsBusy”)用于初始化和调整表的大小,以及使用新的Cell填充槽。不需要阻塞锁,当锁不可用时,线程尝试其他槽(或 base)。在这些重试期间,会增加竞争和降低局部性,但这仍然好于其他替代方案。

Striped64使用ThreadLocalRandom维护线程的探针字段作为每个线程的哈希码。并保持让它们不会初始化为0,直到它们在槽0处发生竞争。这时候会被初始化为通常不会相互冲突的值。执行更新操作时,使用失败的CAS指示争用和/或表冲突。发生冲突时,如果表的大小小于容量,则表大小加倍,除非其他线程持有自旋锁。如果已哈希的槽是空的,并且自旋锁可用,则创建新的Cell。否则,如果槽存在,采用CAS重试。重试通过“双重哈希”进行,使用一个次要的哈希算法(Marsaglia XorShift)尝试查找空闲槽。

Striped64中表的大小是有上限的。因为当线程数多于CPU核数时,假设每个线程都绑定到一个CPU上,会有一个完美的哈希函数将线程映射到槽上,以消除冲突。当表大小达到容量时,会通过随机改变碰撞线程的哈希码来搜索这个映射。因为搜索是随机的,并且碰撞只能通过CAS失败而知道,所以收敛速度可能会很慢,并且因为线程通常不会一直绑定到CPU上,所以可能根本不会发生。但是,尽管存在这些限制,在这些情况下观察到的争用率通常较低。

当曾经散列到Cell的线程终止时,Cell 可能变得空闲。此外,表加倍后导致没有线程哈希到扩展的Cell也会使Cell变得空闲。我们不尝试去检测或移除这些Cell,假设对于长期运行的实例,观察到的竞争水平将重现,所以Cell将最终被再次需要。对于短期存活的实例,这没关系。

源码解读

成员简介

/** CPU数量,用于限制表大小 */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
 * Cell表,非空时,大小为2的幂
 */
transient volatile Cell[] cells;

/**
 * base值,主要在没有竞争时用,也可作为表初始化竞争时的一个降级。通过CAS操作更新
 */
transient volatile long base;

/**
 * 在调整大小和/或创建Cell时使用的自旋锁(通过CAS锁定)。1表示锁定,0表示空闲
 */
transient volatile int cellsBusy;

Cell类

我们知道Cell类是一个填充的AtomicLong变体。它通过注解 @sun.misc.Contended 来自动实现缓存行填充,让Java编译器和JRE运行时来决定如何填充。Cell本质就是一个填充的volatile变量,然后使用CAS进行更新,从而保证线程安全性。

@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

累加机制

Striped64提供了两种累加API:longAccumulate和doubleAccumulate,两者的实现思路是一致的,只不过前者用于long值的累加,后者用于double值的累加。整个累加过程涉及初始化,调整大小,创建新Cell,和/或争用的更新。下面我们以longAccumulate为例说明累加机制的实现原理,先看流程图:

                                                     (longAccumulate流程图)

longAccumulate源码详细分析如下:

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // 强制初始化探针值
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;  //如果最后一个槽非空,则为true,说明发生碰撞,可用于控制扩容
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {//cells表非空
            if ((a = as[(n - 1) & h]) == null) {//当前线程映射的槽为空
                if (cellsBusy == 0) {       // 自旋锁未被占用,尝试添加新的Cell
                    Cell r = new Cell(x);   // 乐观创建
                    if (cellsBusy == 0 && casCellsBusy()) {//自旋锁未被占用,尝试获取锁
                        boolean created = false;
                        try {    // 获取锁之后再次检查cells表,确保当前线程对应的槽为空,防止其他线程已经添加新的Cell
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {//释放锁
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // 槽非空
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // 已知CAS失败
                wasUncontended = true;      // 通过rehash后重试
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))//CAS更新cell的值
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // cells表大小达到上限或cells表发生改变,通过rehash后重试
            else if (!collide)
                collide = true;//设为true,并通过rehash后重试,若还是到这里,说明竞争较大,条件将不满足,会尝试扩容
            else if (cellsBusy == 0 && casCellsBusy()) {//说明存在竞争,尝试获取自旋锁之后扩容
                try {
                    if (cells == as) {      // 表未发生改变则进行扩容
                        Cell[] rs = new Cell[n << 1];//扩容为原来的2倍
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;  
                continue;                   // 扩容后重试
            }
            h = advanceProbe(h);   //rehash
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {//表未空,尝试获取锁之后初始化表
            boolean init = false;
            try {                           
                if (cells == as) {
                    Cell[] rs = new Cell[2];  //初始化表大小为2
                    rs[h & 1] = new Cell(x);  //选择槽完成赋值
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;         // 获取锁失败,说明其他线程在初始化表,扩容,或创建Cell,降级使用CAS更新base
    }
}

Striped64小结:Striped64通过维护一个原子更新Cell表和一个base字段,并使用每个线程的探针字段作为哈希码映射到表的指定Cell。当竞争激烈时,将多线程的更新分散到不同Cell进行,有效降低了高并发下CAS更新的竞争,从而最大限度地提高了Striped64的吞吐量。Striped64为实现高吞吐量的并发计数组件奠定了基础,其中LongAdder就是基于Striped64实现,此外Java8中ConcurrentHashMap实现的并发计数功能也是基于Striped64的设计理念,还有hystrix、guava等实现的并发计数组件也离不开Striped64。

LongAdder

首先看看LongAdder在Java doc中的描述:

LongAdder中维护了一个或多个变量来组成一个初始为0的long类型的“和“。当多线程竞争更新值时(特指add方法),为了减少竞争,可能会动态增长这组变量的数量。sum方法(等效于longValue方法)返回当前这组变量的“和”值。

当多线程更新应用于收集统计数据(如计数),而非用于细粒度同步控制时,LongAdder类比AtomicLong更好用。在较低的更新竞争场景下,两者性能差不多。但是,在高度更新竞争场景下,LongAdder具有更高的吞吐量,但以较高的空间消耗为代价。

LongAdder扩展自Striped64,所以LongAdder的核心功能基于Striped64。Striped64是抽象的,且是包级私有,外部无法直接使用Striped64,LongAdder则是对外直接可用的组件。下面我们结合源码分析在实际应用中如何使用LongAdder。

计数

通常计数器加1可调用LongAdder的increment方法,减1则调用decrement方法,实现:

public void increment() {
    add(1L);
}
public void decrement() {
    add(-1L);
}

从加1和减1的操作可知,最终都是调用add来完成。继续看add方法的源码:

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

add支持添加自定义指定的值。整个add过程:

  • 如果Cells表为空,尝试用CAS更新base字段,成功则退出;
  • 如果Cells表为空,CAS更新base字段失败,出现竞争,uncontended为true,调用longAccumulate;
  • 如果Cells表非空,但当前线程映射的槽为空,uncontended为true,调用longAccumulate;
  • 如果Cells表非空,且前线程映射的槽非空,CAS更新Cell的值,成功则返回,否则,uncontended设为false,调用longAccumulate。

从add的实现可知,当不存在竞争或竞争比较低时,Cells表为空,LongAdder总是通过CAS更新base字段完成计数,与AtomicLong效率不相上下。

获取计数值

LongAdder像普通Number类型一样,支持调用longValue,intValue,floatValue等方法返回指定类型的数据,只不过这些方法都调用了sum方法获取当前计数和,并进行了类型转换。sum源码如下:

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

实现一目了然,将base和cells表中各Cell的value加起来即为最终计数和。

总结:LongAdder就是基于Striped64实现,用于并发计数时,若不存在竞争或竞争比较低时,LongAdder具有和AtomicLong差不多的效率。但是,高并发环境下,竞争比较严重时,LongAdder的cells表发挥作用,将并发更新分散到不同Cell进行,有效降低了CAS更新的竞争,从而极大提高了LongAdder的并发计数能力。因此,高并发场景下,要实现吞吐量更高的计数器,推荐使用LongAdder。

© 著作权归作者所有

AbeJeffrey
粉丝 49
博文 43
码字总数 116095
作品 0
杭州
高级程序员
私信 提问
加载中

评论(0)

Java8中的简易并发

我们在《实战Java高并发程序设计》一书中,专门介绍过java 8中对并发的改进。这里我通过转载一篇博客来说简单介绍下。 有人曾经说过(很抱歉,我们找不到原句了): 初级程序员认为并发很难。...

吴小编
2016/04/12
188
0
dubbo中的那些“坑"(3)-netty4-rpc网络接口中的高并发的bug

在几个月前改造dubbo时,netty4已经稳定很久了,一时手痒,按照netty3-rpc的源码克隆了一套netty4,在修正了大量的包、类型不同之后,基本保持了netty3的风格,并发量小或者数据包很小时,一...

阿阮
2014/12/02
9.5K
10
JUC包中的分而治之策略-为提高性能而生

一、前言 本次分享我们来共同探讨JUC包中一些有意思的类,包含AtomicLong & LongAdder,ThreadLocalRandom原理。 二、AtomicLong & LongAdder 2.1 AtomicLong 类 AtomicLong是JUC包提供的原子...

加多
2019/01/05
0
0
微博每日数十亿级业务下的计数器如何扩展Redis?

大家新年好!第一个工作日,为大家奉上一篇投稿,来自《深入分布式缓存》这本书。曾记得当年,现任某海外电商CTO对我说,互联网架构有两大神器,一个是缓存,一个是MQ,整明白就能搞定高并发...

IT民工闲话
2018/01/02
0
0
Dubbo源码之客户端并发控制——ActiveLimitFilter

上篇解释了Dubbo源码中降级及容错处理 Dubbo服务调用——Cluster组件(服务降级,容错) 这篇文章主要是关于Dubbo源码中的限流组件,Dubbo限流除了限流(并发限制)的入口ThreadPool 之外,还有...

BakerZhu
2018/08/25
881
0

没有更多内容

加载失败,请刷新页面

加载更多

超时解决方案

背景 商城系统订单超时自动取消 车辆挂号未在某一时刻内靠台,车辆就挂起 ....等等当超过时间某个阈值后,对系统内某些数据进行校验处理 简易实现方案 延时消息方案 将系统需要承担的轮询压力...

无极之岚
8分钟前
37
0
kafka是什么?storm与kafka的区别?

kafka是消息队列,类似于RabitMQ,作为中间组件,主要解决异步,削峰,提高系统性能。 storm与kafka的区别? storm用于流数据的实时处理,在获取流数据之前,需要加入kafka消息队列来提高性能...

七宝1
30分钟前
57
0
第二章 构建业务中台的基础----共享服务体系

1.ESB:企业服务总线。 2.SOA理念最核心的价值:松耦合的服务带来业务的复用,通过服务的编排助力业务的快速响应和创新。 3.服务不需要“业务稳定”,而需要不停的滋养,只有在滋养中才能从最...

zxx901221
49分钟前
41
0
Spring Boot 2.x基础教程:使用 ECharts 绘制各种华丽的数据图表

上一节我们介绍了如何在Spring Boot中使用模板引擎Thymeleaf开发Web应用的基础。接下来,我们介绍一下后端开发经常会遇到的一个场景:可视化图表。 通常,这类需求在客户端应用中不太会用到,...

程序猿DD
昨天
53
0
SpringBoot实战:SpringBoot之自定义配置(一)

SpringBoot会默认加载application.yml和application.properties文件,但是有时候我们会对一些配置进行分类管理,如把数据库等配置进行单独配置,那这时候要怎么办呢,SpringBoot作为现在最流...

枫叶_林
昨天
60
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部