文档章节

六个面试题层层剖析——LongAddr原子类原理探究

须臾之余
 须臾之余
发布于 07/17 18:18
字数 2613
阅读 78
收藏 0

并发编程面试题

(1)LongAddr的结构是怎样的?

(2)当前线程应该访问Cell数组里面的哪一个Cell元素?

(3)如何初始化Cell数组?

(4)Cell数组如何扩容?

(5)线程访问分配的Cell元素有冲突后如何处理?

(6)如何保证线程操作被分配的Cell元素的原子性?

(1)LongAddr的结构是怎样的?

由LongAddr的类图可知:LongAddr继承自Striped64类,在Striped64内部维护了三个变量。

LongAddr的真实值=base的值+Cell数组所有Cell元素的value值。

base是个基础值,默认为0,cellsBusy用来实现自旋锁,状态只要0和1。

(6)如何保证线程操作被分配的Cell元素的原子性?

当创建Cell元素,扩容Cell数组或者是初始化Cell数组时,使用CAS操作该变量来保证同时只有一个线程可以进行其中之一的操作。

下面看下Cell的构造

/**
 * Padded variant of AtomicLong supporting only raw accesses plus CAS.
 *
 * JVM intrinsics note: It would be possible to use a release-only
 * form of CAS here, if it were provided.
 */
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

可以看到,Cell的构造很简单,其内部维护一个被声明为Volatile的变量,保证了内存的可见性。

另外使用CAS算法操作,保证了当前线程更新时被分配的Cell元素中的value值的原子性。

另外,Cell类使用 @sun.misc.Contended修饰是为了避免伪共享。

伪共享解释:数组的元素是Cell类,可以看到Cell类用Contended注解修饰,这里主要是解决false sharing(伪共享的问题),不过个人认为伪共享翻译的不是很好,或者应该是错误的共享,比如两个volatile变量被分配到了同一个缓存行,但是这两个的更新在高并发下会竞争,比如线程A去更新变量a,线程B去更新变量b,但是这两个变量被分配到了同一个缓存行,因此会造成每个线程都去争抢缓存行的所有权,例如A获取了所有权然后执行更新这时由于volatile的语义会造成其刷新到主存,但是由于变量b也被缓存到同一个缓存行,因此就会造成cache miss,这样就会造成极大的性能损失,因此有一些类库的作者,例如JUC下面的、Disruptor等都利用了插入dummy 变量的方式,使得缓存行被其独占,比如下面这种代码:

static final class Cell {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Cell(long x) { value = x; }

        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
 }

但是这种方式毕竟不通用,例如32、64位操作系统的缓存行大小不一样,因此JAVA8中就增加了一个注@sun.misc.Contended解用于解决这个问题,由JVM去插入这些变量,具体可以参考openjdk.java.net/jeps/142 ,但是通常来说对象是不规则的分配到内存中的,但是数组由于是连续的内存,因此可能会共享缓存行,因此这里加一个Contended注解以防cells数组发生伪共享的情况。

(2)当前线程应该访问Cell数组里面的哪一个Cell元素?

/**
 * Adds the given value.
 *
 * @param x the value to add
 */
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) { //——(1)
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 || //——(2)
            (a = as[getProbe() & m]) == null || //——(3)
            !(uncontended = a.cas(v = a.value, v + x))) //——(4)
            longAccumulate(x, null, uncontended); //——(5)
    }
}

代码(1):判断Cell是否为null,如果为null,则当前在基础变量base上进行累加,类似AtomicLong的操作。

代码(2):如果Cells不为null或者线程执行代码(1)的CAS操作失败了,则会去执行代码(2).

代码(2)(3):决定了当前线程应该访问cells数组里面的哪一个元素。

代码(4):如果当前线程映射的元素存在则执行代码(4),使用CAS操作去更新分配的Cell元素的value值。

代码(5):如果当前线程映射的元素不存在或者存在但是CAS操作失败则执行代码(5)。

总结:当前线程应该访问那个Cell元素是通过getProbe() & m进行计算的。

(3)如何初始化Cell数组?

/**
 * Handles cases of updates involving initialization, resizing,
 * creating new Cells, and/or contention. See above for
 * explanation. This method suffers the usual non-modularity
 * problems of optimistic retry code, relying on rechecked sets of
 * reads.
 *
 * @param x the value
 * @param fn the update function, or null for add (this convention
 * avoids the need for an extra field or function in LongAdder).
 * @param wasUncontended false if CAS failed before call
 */
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
  ....省略无关代码....

//*******************************初始化Cell数组******************************************//
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    //——(1)
                    Cell[] rs = new Cell[2];
                    //——(2)
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                //——(3)
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

代码(1):初始化Cells数组元素个数为2,

代码(2):然后使用 h&1 计算当前线程应该访问cell数组的那一个位置。

代码(3):重置了cellsBusy标记。这里没用使用CAS算法,因为cellsBusy是volatile类型的,保证了变量内存的可见性。

(4)Cell数组如何扩容?

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
       ......
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as) //——(1)
                collide = false;            // At max size or stale
            else if (!collide) //——(2)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) { //——(3)
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1]; //——(4)
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0; //——(5)
                }
                collide = false; //——(6)
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h); //——(7)
        }
       .....
}

代码(1)(2):当前cells元素个数小于当前机器的CPU个数并且当前多个线程访问cells中的同一个元素,从而导致冲突使其中一个线程CAS失败时才会进行代码(3)的扩容操作。

代码(3)中的扩容操作也是通过CAS设置cellsBusy为1,然后才能进行扩容。

解释下cellsBusy:他是个标示,为0说明cells数组没用被初始化或者扩容,也没有在新建Cell元素,为1则说明cells数组在被初始化或者扩容,或者当前在创建新的Cell元素。

代码(4):假设CAS成功,则执行代码(4),将容量扩充为之前的两倍,并复制Cell元素到新扩容的数组。

(5)线程访问分配的Cell元素有冲突后如何处理?

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
   .......
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) { //——(1)
            if ((a = as[(n - 1) & h]) == null) { //——(2)
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
          .....
            h = advanceProbe(h); //——(3)
        }
       .....
}

代码(3)实现

/**
 * Pseudo-randomly advances and records the given probe value for the
 * given thread.
 * Duplicated from ThreadLocalRandom because of packaging restrictions.
 */
static final int advanceProbe(int probe) {
    probe ^= probe << 13;   // xorshift
    probe ^= probe >>> 17;
    probe ^= probe << 5;
    UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
    return probe;
}

代码(1)(2):当前线程调用add方法并根据当前线程的随机数threadLocalRandomProbe和cells元素个数计算要访问的Cell元素下标,然后如果发现对应下标的值为null,则新增一个Cell元素到cells数组,并且将其添加到cells数组之前要竞争设置cellsBusy为1.

代码(3):对CAS失败的线程重新计算当前线程的随机值threadLocalReadomProbe,以减少下次访问cells元素时的冲突机会。

附录

java.util.concurrency.atomic.LongAdder是Java8新增的一个类,提供了原子累计值的方法。根据文档的描述其性能要优于AtomicLong,下图是一个简单的测试对比(平台:MBP):
image
这里测试时基于JDK1.8进行的,AtomicLong 从Java8开始针对x86平台进行了优化,使用XADD替换了CAS操作,我们知道JUC下面提供的原子类都是基于Unsafe类实现的,并由Unsafe来提供CAS的能力。CAS (compare-and-swap)本质上是由现代CPU在硬件级实现的原子指令,允许进行无阻塞,多线程的数据操作同时兼顾了安全性以及效率。大部分情况下,CAS都能够提供不错的性能,但是在高竞争的情况下开销可能会成倍增长

总结

本章分析了JDK8新增的LongAddr原子性操作类,该类通过cells数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争量,让多个线程可以同时对cells数组里面的元素进行并行的更新操作。

另外,数组元素Cell使用@sun.misc.Contended注解进行修饰,这避免了cells数组内多个原子变量被放入同一个缓存行,也就避免了伪共享,提高了性能。

总的来说,LongAdder从性能上来说要远远好于AtomicLong,一般情况下是可以直接替代AtomicLong使用的,Netty也通过一个接口封装了这两个类,在Java8下直接采用LongAdder,但是AtomicLong的一系列方法不仅仅可以自增,还可以获取更新后的值,如果是例如获取一个全局唯一的ID还是采用AtomicLong会方便一点。

参考书籍

Java并发编程之美

参考链接

  1. https://blogs.oracle.com/dave/atomic-fetch-and-add-vs-compare-and-swap
  2. https://en.wikipedia.org/wiki/Compare-and-swap
  3. http://ashkrit.blogspot.com/2014/02/atomicinteger-java-7-vs-java-8.html
  4. https://dzone.com/articles/adventures-atomiclong

 

© 著作权归作者所有

须臾之余
粉丝 125
博文 67
码字总数 178724
作品 0
吉安
程序员
私信 提问
Java系列文章(全)

JVM JVM系列:类装载器的体系结构 JVM系列:Class文件检验器 JVM系列:安全管理器 JVM系列:策略文件 Java垃圾回收机制 深入剖析Classloader(一)--类的主动使用与被动使用 深入剖析Classloader(二...

www19
2017/07/04
0
0
后台开发常问面试题集锦(问题搬运工,附链接)

Java基础问题 String的’+’的性能及原理 java之yield(),sleep(),wait()区别详解-备忘笔记 深入理解Java Stream流水线 抽象 & abstract关键字 Java final 修饰符知识点总结(必看篇) Java中的...

大黄有故事
2017/11/18
0
0
11月推荐给程序员们的四本书

难得在家休息一段时间,职业病的原因吧,推荐技术书的习惯一时间没能改变。借着自己的微信平台,每天向大家推荐一些靠谱的内容,希望大家能喜欢。 先跟着我来看我今天推荐的第一本书吧! C+...

生气的散人
2013/11/11
988
2
真·干货!这套深度学习教程整理走红,从理论到实践的带你系统学习 | 资源...

铜灵 发自 凹非寺 量子位 出品 | 公众号 QbitAI 寒假/春节小长假给自己充电的真·干货来了。 如果你想要的是一份从理论到实践的深度学习教程清单,如果你想系统了解各类框架、基础网络与各种...

量子位
01/27
0
0
关于本博客数据仓库方面的原创文章汇总

关于本博客数据仓库方面的原创文章汇总 收藏 关于数据仓库方面的文章汇总 我的数据仓库之路! 关于数据仓库维度处理的系列文章 1 关于数据仓库维度数据处理的方法探究系列—— 维的概述 2 关...

baoqiangwang
2018/06/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

浅谈FlyWeight享元模式

一、前言 享元(FlyWeight)模式顾名思义,即是轻量级,原因就是享元,共享元素,这里的元素指的是对象。如何共享对象,那就是在检测对象产生的时候,如果产生的是同一个对象,那么直接使用已...

青衣霓裳
8分钟前
2
0
Python学习10.14:Python set集合详解

Python 中的集合,和数学中的集合概念一样,用来保存不重复的元素,即集合中的元素都是唯一的,互不相同。 从形式上看,和字典类似,Python 集合会将所有元素放在一对大括号 {} 中,相邻元素...

太空堡垒185
8分钟前
5
0
好程序员大数据教程分享Scala系列之文件以及正则表达式

好程序员大数据教程分享Scala系列之文件以及正则表达式 1 读取行 导入scala.io.Source后,即可引用Source中的方法读取文件信息。 import scala.io.Source object FileDemo extends App{ val ...

好程序员官网
8分钟前
2
0
75.nosql memcached与安装及查看状态

21.1 nosql介绍 21.2 memrcached介绍 21.3 安装memcached 21.4 查看memcachedq状态 21.1 nosql介绍 什么是NoSQL: 1.非关系型数据库就是NoSQL,关系型数据库代表MySQL 也是一种数据库,来存储...

oschina130111
10分钟前
3
0
玩转阿里云 Terraform(二):Terraform 的几个关键概念

上一篇《玩转阿里云Terraform(一):Terraform 是什么》介绍了 Terraform 的基本定义和特点之后,本文将着重介绍几个Terraform中的关键概念。 Terraform 关键概念 在使用Terraform的过程中,通...

阿里云官方博客
10分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部