ConcurrentHashMap源码探究 (JDK 1.8)

2019/04/10 10:10
阅读数 35

很早就知道在多线程环境中,HashMap不安全,应该使用ConcurrentHashMap等并发安全的容器代替,对于ConcurrentHashMap也有一定的了解,但是由于没有深入到源码层面,很多理解都是浮于表面,稍微深一点的东西就不是很懂。这两天终于下定决心将ConcurrentHashMap的源码探究了一遍,记录一下心得体会,算是对阅读源码的一个总结吧。需要提醒读者注意,因为个人水平有限,且本文本质上来讲是留给未来的自己进行查阅的总结,所以难免会有错漏,一经发现,本人会尽快纠正,也欢迎大家提出宝贵的意见。 温馨提示:本文在一些可能存在疑惑的地方使用3个?标识,这是本人设置的占位符,提醒自己以后要过来更新,大家在阅读的时候也要注意区分。

1.构造器

先从构造器讲起。ConcurrentHashMap共有5个构造器,不论是哪个构造器,最后初始化后的容量都是2的整数幂,这些构造器签名分别如下:

public ConcurrentHashMap();  //a
public ConcurrentHashMap(int initialCapacity);  //b
public ConcurrentHashMap(Map<? extends K, ? extends V> m);  //c
public ConcurrentHashMap(int initialCapacity, float loadFactor);  //d
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel);  //e

构造器a的方法体是空的,像容量、加载因子这些参数都取默认值;构造器b需要一个初始容量作为参数,代码如下:

    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

首先检查参数合法性,如果参数initialCapacity超过了最大允许容量(MAXIMUM_CAPACITY = 1 << 30)的一半,则将容量设置为MAXIMUM_CAPACITY,否则使用tableSizeFor方法来计算容量,最后将sizeCtl参数设置为容量的值。关于sizeCtltableSizeFor等将在后文介绍。 构造器c使用一个外部的map进行初始化,sizeCtl设置为默认容量,然后调用putAll方法进行容器初始化和复制操作。

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

构造器d在内部直接调用构造器e,这两个构造器唯一不同的是,构造器e额外提供了一个concurrencyLevel参数,构造器d将这个参数设置为1

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

构造器e的逻辑仍然是先检查参数合法性,concurrencyLevel参数的唯一作用是作为initialCapacity的下限,除此之外别无他用。从这里可以看出 JDK 1.7 版本的并发度参数DEFAULT_CONCURRENCY_LEVEL已弃用。 构造器be在设置容器初始容量的时候有一点不同,构造器b使用的是initialCapacity + (initialCapacity >>> 1) + 1(即1.5*initialCapacity+1)作为基础容量,而构造器e使用的是1.0 + (long)initialCapacity / loadFactor(这里的initialCapacity实际上相当于HashMap中的threshold,当loadFactor = 2/3时两者是相等的),在使用的时候需要注意这一点。

2.主要字段

  • sizeCtl:该字段出镜率非常高,取值复杂,要读懂源码,该字段是必须要弄清楚的。概括来说,该字段控制内部数组初始化和扩容操作,其取值如下:
    • 负数
      • -1:表示容器正在初始化。
      • -NN-1个线程正在执行扩容。
      • 扩容前会被修改成 (resizeStamp(tab.length) << RESIZE_STAMP_SHIFT) + 2,并且每增加一个扩容线程,sizeCtl的值加1,扩容线程完成对应桶的迁移工作,sizeCtl1,扩容完成后该值再次被设置成扩容阈值。
    • 正数和0
      • 在初始化的时候表示容器初始容量。
      • 初始化之后表示容器下次扩容的阈值(类似于HashMap中的threshold)。
  • RESIZE_STAMP_BITSRESIZE_STAMP_SHIFTsizeCtl中记录stamp的两个字段,这两个字段在源码中没有任何位置会修改,它们的值目前都是16(不太明白这个stamp是什么意思)。
  • 红黑树相关字段:
    • TREEIFY_THRESHOLD = 8:桶由链表转换成红黑树结构的阈值,表示桶中的元素个数大于等于8个时将转换成红黑树。
    • UNTREEIFY_THRESHOLD = 6: 桶由红黑树转换成链表结构的阈值,表示桶中的元素个数小于等于6个时将转换成链表。
    • MIN_TREEIFY_CAPACITY = 64:启用红黑树的最小元素个数,当集合中的元素个数不足64时,即使某个桶中的元素已经达到8个,也只是执行扩容操作,而不是升级为红黑树。
  • 容量相关字段:
    • MAXIMUM_CAPACITY = 1 << 30:最大容量
    • DEFAULT_CAPACITY = 16:默认容量
    • LOAD_FACTOR:加载因子,默认为0.75f
  • table: 存放容器元素的数组对象
  • nextTable:平时为null,在扩容时指向扩容后的数组。
  • baseCount:记录元素个数,注意该计数器在多线程环境下不准,需要配合countCells使用。
  • counterCells:多线程环境下,用来暂时存放元素用。(???)
  • transferIndex:表示扩容时将数据从就数组向新数组迁移时的下标,多线程时根据该字段给各个线程分配各自独立的迁移区间,以实现多线程协作扩容。

3.核心方法

ConcurrentHashMap是用来存储元素的,最常用的就是一些增删改查方法,此外,在容量不足时,会自动触发扩容操作。 接下来将对ConcurrentHashMap中的主要方法进行分析。

  • Unsafe类相关方法 ConcurrentHashMap废弃了分段锁,改用 CAS + Synchronized + valatile保证线程安全,而Java主要通过Unsafe类实现CAS,因此源代码大量使用了Unsafe类的三个CAS方法,如下:
   - compareAndSwapObject(Object o, long offset, Object expected, Object x);
   - compareAndSwapInt(Object o, long offset, int expected, int x);
   - compareAndSwapLong(Object o, long offset, long expected, long x);

这些方法非常相似,区别只是参数expectedx的类型。它们表达的意思是,如果对象ooffset位置的值是expected,则把值修改为x,否则不修改。其中o是给定的对象,offset表示对象内存偏移量,expected表示当前位置的期望值,x表示修改后的新值。 此外,ConcurrentHashMap封装了三个数组元素访问方法,底层依然是调用Unsafe类:

    //从主存获取tab[i],避免读到脏数据
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    //将tab[i]的值从c改成v
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    //将tab[i]的值v写到主存
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
  • tableSizeFor
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

tableSizeFor的作用是计算第一个大于等于c2的整数幂,ConcurrentHashMap用这个方法计算得到的结果来作为内部数组的长度。关于这个方法的介绍,网上已经有许多资料,这里不再赘述,仅仅记录下源码,因为代码确实太惊艳了,提醒自己要时常学习源码精髓。

  • 获取元素(get(Object key))的源码如下:
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //计算key的hash值
        int h = spread(key.hashCode());
        //数组长度大于0,且对应的桶不为空,其中(n-1) & h是计算哈希值h对应的数组位置
        if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
            //如果头结点就是目标节点,则返回该节点的值
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //在红黑树或者nextTable中进行查找
            //如果桶中的结构是红黑树,那么root节点的hash值是-2,如果容器正在扩容,会把ForwardingNode节点放在桶里作占位符,这种类型的节点hash值为-1
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //走到这里说明table里都是正常的链表节点,按顺序查找即可
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

get方法先进行一系列验证,之后先判断头节点是不是key对应的节点,是就返回,否则检查头节点的hash值是不是负数,是负数的话就去红黑树或者扩容后的nextTable查找,不是负数则说明key对应的桶里是链表结构,则按顺序查找。其中方法spread方法用于计算keyhash值,其代码如下:

    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

h ^ (h >>> 16) 是将h右移16位之后,再与h进行亦或,结果中高16位保持不变,低16位是保存的是原来高16位和低16位的亦或结果。HASH_BITS的值是0x7fffffff,即2^31-1,只在spread方法里用到,(h ^ (h >>> 16)) & HASH_BITS的结果相当于只是去掉了h ^ (h >>> 16)的符号位,后面在计算下标时再次与数组长度作按位与操作,完整的下标计算相当于((h ^ (h >>> 16)) & HASH_BITS) & (n-1),由于n32位整数,其最大值也不会大于HASH_BITS,所以((h ^ (h >>> 16)) & HASH_BITS) & (n-1)的效果和(h ^ (h >>> 16)) & (n-1)一样,似乎这里并不需要HASH_BITS,和HashMap中的hash()方法保持一致不就行了?这里留个疑问待以后解答。 现在来总结一下get方法的执行流程,如下图: 从流程图中发现了之前看代码时没有注意到的东西,当发现桶内节点是ForwardingNode节点时,会去新的数组内查找,这段代码在ForwardingNode内部定义,来看看ForwardingNode中的find方法:

Node<K,V> find(int h, Object k) {
    // loop to avoid arbitrarily deep recursion on forwarding nodes
    outer: for (Node<K,V>[] tab = nextTable;;) {
        Node<K,V> e; int n;
        //基本的非空判断
        if (k == null || tab == null || (n = tab.length) == 0 ||
            (e = tabAt(tab, (n - 1) & h)) == null)
            return null;
        for (;;) {
            int eh; K ek;
            if ((eh = e.hash) == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            if (eh < 0) {
                //注意这里的判断,当发现新数组内也出现了ForwardingNode占位节点,则继续递归地遍历新数组
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                // 其他两种情况会执行到这里:1)eh=-2,表示红黑树;2)eh=-3,是ReservationNode类型占位节点,直接返回null
                else
                    return e.find(h, k);
            }
            if ((e = e.next) == null)
                return null;
        }
    }
}

这里是一段递归逻辑,也就是说可能会出现扩容还没完成接着又扩容的情况,所以需要重复判断新数组头结点是不是扩容时的占位节点(注意:目前阅读代码时并没有注意到这种重复扩容的情况,不清楚具体会不会出现扩容还没完成,空间就不够了导致重复扩容的情况,等弄清楚了之后会更新在这里???)。

  • 添加/修改元素 put方法底层调用的putVal,源码如下:
    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    //onlyIfAbsent表示key不存在才插入,存在则不更新
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        //计算hash
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果数组还没初始化,则先初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //如果key对应的桶是空的,并且通过原子操作成功的将新节点插入桶中,则本次插入结束,转入后续的addCount操作
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //如果当前有线程正在转移数据,则帮助其转移
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //这里只对单个桶上锁,相比于分段锁的并发性大大增加
                synchronized (f) {
                    //这里再次判断头结点有没有发生变化,因为从上次赋值到加锁期间,很可能有其他线程将tab[i]桶内的数据迁移到了新数组
                    if (tabAt(tab, i) == f) {
                        //fh>=0表示桶内结构是链表
                        if (fh >= 0) {
                            //桶内元素计数器
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //找到key,则更新对应的值
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //key不存在,则将新节点插入链表结尾
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //桶内结构是红黑树
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            //红黑树的binCount直接赋值为2,个人理解是root节点只占位置,不保存数据?留个疑问后面来解答。
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //桶内元素大于等于8,则转换成红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //修改计数器的值,成功添加新元素才会走到这里
        addCount(1L, binCount);
        return null;
    }

put方法的流程图如下:

如果数组没有初始化,需要先执行initTable()进行初始化,从这里可以看到,ConcurrentHashMap采用延迟初始化的策略,第一次添加元素的时候才对内部数组进行初始化。initTable()源码如下:

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //sizeCtl < 0 表示当前有其他线程正在执行初始化,调用Thread.yield()方法让当前线程让出CPU时间片,保证了只能有一个线程对数组进行初始化
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //如果当前线程成功将sizeCtl的值从sc更新为-1,则由当前线程执行初始化操作,从这里可以看出sizeCtl=-1表示当前正在执行初始化
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        //计算数组容量
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //相当于sc = 0.75*n,即数组长度*0.75,类似于HashMap中的threshold
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //初始化完成后,将sizeCtl的值更新成扩容阈值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
  • helpTransfer 在putVal方法中,一个比较有意思的地方在于,如果当前线程发现有其他线程正在进行数据转移工作(即在扩容中),就帮助转移数据,该方法源码如下:
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        //先确认已初始化,再确认f是ForwardingNode类型节点,即当前确实有线程在执行扩容和迁移数据的操作,最后确认扩容后的新数组是否已初始化完毕
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            //因为存在多线程扩容的情况,每次都需要重新判断扩容条件是否还满足
            //如果扩容已完成,那么table=nextTable, nextTable=null,并且sizeCtl变成下次扩容的阈值,下面的三项检查分别与这里的情况对应
            while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                //如果成功将sizeCtl的值加1,则进入transfer执行数据迁移工作,从这里也可以看出,每当有新线程协助扩容时,会将sizeCtl的值加1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

helpTransfer里面最复杂的就是while循环体内的第一个条件判断语句,接下来将一一进行分析。判断条件里频繁出现rs这个变量,有必要先分析resizeStam方法,其源码非常简单,只有一行:

    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

Integer.numberOfLeadingZeros(n)是计算当前数组长度n的二进制数表示中,最左边的1之前有多少个0RESIZE_STAMP_BITS=16,假设n=16,其二进制最左边有270,那么resizeStamp返回的结果的二进制就是1000000000011011。由于n是数组容量,最小值是2,其最大值是1<<30Integer.numberOfLeadingZeros(n)的范围是1~30,因此resizeStamp的范围是32769~32798。 了解了resizeStamp,接着再回到helpTransfer方法,现在重点关注前文提过的条件判断语句,摘录如下:

((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0

对于第一个条件,从前面对sizeCtl的讲解知道,在扩容前sizeCtl会被设置为sc = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2,这是一个负数,那么将sc无符号右移16位刚好得到resizeStamp(n),即rs变量的值。源码中先做(sc >>> RESIZE_STAMP_SHIFT) != rs的判断,保证rssc这两个对应的数组长度相同,即当前扩容还没结束。 第一个条件不满足才会接着判断第二个条件,此时条件(sc >>> RESIZE_STAMP_SHIFT) == rs已成立。sc == rs + 1 || sc == rs + MAX_RESIZERS 这两个条件有些没看太懂,大概是限制扩容的线程不要超过最大允许线程数。但是这两个判断条件应该是有问题的,因为此时sc < 0,但是rs > 0,这两个条件都不可能成立,具体为什么要这样写不是太清楚,有待后续查证。transferIndex是数据迁移的位置变量,从后往前开始迁移数据,当transferIndex <= 0时,说明迁移已经结束了。

  • transfer ConcurrentHashMap的扩容实际上就是新建一个两倍大的数组,然后将老数据迁移到新数组的过程,这也是transfer的字面意思。数据迁移的真正过程都在transfer方法里,该方法总共一百多行,下面将结合源码一点点进行解析。
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //stride相当于一个步长变量,每个线程需要承担stride个桶数据的迁移工作,这里初始化的过程会参照机器CPU数量,在多CPU上stride=n/(8*NCPU),单CPU上stride=n,但是stride不能小于16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        //如果新数组还没初始化,在这里进行初始化
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                //新建两倍长度的数组
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //nextTab字段在这里才不为null,而是指向新数组
            nextTable = nextTab;
            //数据迁移的下标,从后往前迁移
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //是否可以开始迁移下一个桶的标识符,当前桶迁移完毕才可以接着迁移前一个桶的数据
        boolean advance = true;
        //记录当前迁移工作是否结束
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //如果当前桶已迁移完毕,开始处理下一个桶
            while (advance) {
                int nextIndex, nextBound;
                //这里迁移的桶区间是[bound,i-1],因此--i>=bound说明当前线程的迁移任务还没结束,需要跳出while循环,执行后面的迁移工作
                if (--i >= bound || finishing)
                    advance = false;
                //如果所有的桶都已经分给了相应的线程进行处理,没有多余的桶给当前线程了,将i设置为-1,让当前线程退出迁移操作
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //如果当前线程成功分到[nextIndex-stride,nextIndex-1]的区间,则可以开始进行数据迁移了,在还有多余的桶没有分配时,新加入进来的扩容线程会先执行到这里领取任务,下一个线程进来将会接着从nextIndex-stride往前迁移stride个桶的数据
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //当出现下面这些情况时,说明扩容已结束,需要做一些善后工作
            //i<0的情况在上面的while循环里出现过,但是i >= n 和 i + n >= nextn这两个条件会在什么情况下出现呢???
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //如果所有的数据迁移都已完成,则执行这里的逻辑退出
                if (finishing) {
                    //nextTable重新赋值为null
                    nextTable = null;
                    //table指向新数组,原来的数组将被GC回收
                    table = nextTab;
                    //sizeCtl = 2*n-0.5*n=1.5*n,这个值实际上就是新数组长度的0.75倍,即下一次扩容的阈值。这里的做法很巧妙,n是原数组的长度,扩容后的长度是2*n,按照0.75的加载因子来算,新数组的扩容阈值就是2*n*0.75=1.5*n,但位运算显然更快一些
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //如果当前线程的迁移工作已完成,就将sizeCtl的值减1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //如果sizeCtl的值变成了扩容前的值(即resizeStamp(n) << RESIZE_STAMP_SHIFT + 2),说明扩容完成
                    //问题:上面的原子操作是现将修改前的sizeCtl赋值给sc,然后才将sizeCtl减1,那么sc应该永远取不到resizeStamp(n) << RESIZE_STAMP_SHIFT + 2才对,下面的return语句不会执行,这里是不是哪里理解得不对???留待以后验证。
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //走到这里说明所有的迁移工作都完成了,设置相关字段
                    finishing = advance = true;
                    //将i设置为n,那么线程将会接着执行最外层for循环,从后向前逐个检查是否每个桶都已完成数据迁移
                    i = n; // recheck before commit
                }
            }
            //如果原来的桶里没有数据,插入一个ForwardingNode作占位符,告诉其他线程当前正在进行扩容
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //这个判断紧接着上一个判断,如果已经有占位符了,说明有线程已经处理过这个桶了,不能再处理这个桶
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            //当前桶既不是占位符也不是空(即桶里面是链表或红黑树),会走到这里
            else {
                //迁移桶内数据需要加锁,避免其他线程同时增加、删除或修改桶里的数据
                synchronized (f) {
                    //从上一次取桶的头节点到加锁之前,可能该桶已经被其他线程处理过,需要进行二次判断
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //桶里是链表结构
                        if (fh >= 0) {
                            //这里值得注意,回忆一下上面对put方法的分析,插入元素时,元素的hash值是根据spread(key.hashCode())进行计算的,hash值在扩容期间肯定不会变,变化的仅仅是数组的长度,而数组的长度在扩容后会左移1位,因此元素在新数组中的位置就由hash值从低位开始的第n位决定,如果hash值第n位是0,则元素还在下标为i的这个桶里,否则元素的新位置是i+n。这里的处理逻辑与HashMap中一样,都是为了降低计算新位置的开销。
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            //这个for循环是为了避免新建不必要的节点,即经过计算发现,如果当前链表的某个元素a及其后面的所有节点都在同一个桶内,那么在进行数据转移时,只需要处理a节点之前的节点即可,a节点及其之后的节点仍然维持原来的链表结构放在新数组的对应桶里,这里的lastRun就是待确定的a节点
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            //如果最后几个节点都呆在原来的桶里,则设置ln指向lastRun节点,否则将hn指向lastRun节点
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //这里才是真正的数据转移过程,跟前面的分析一样,从头结点处理到lastRun节点即可。
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                //以下的逻辑表明,数据迁移是采用在链表头部插入数据的做法,lastRun之前的节点顺序会被反转
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //ln对应老的桶下标i,hn对应新的桶下标i+n
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            //在元素迁移完之后,会将桶的头结点设置为ForwardingNode占位节点
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //桶内是红黑树结构,逻辑与链表大同小异,不再赘述
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

终于分析完transfer的逻辑了~~~现在来整理一下需要注意的地方。首先需要注意的是ForwardingNode节点则这段代码里有三处地方会用到,一是在最外层的for循环的逻辑中,当发现桶是null时,会将ForwardingNode节点插入,实际上这种情况也可以视为这个桶数据已迁移完成,另外两处都是在迁移完数据之后,将ForwardingNode插入作占位符,因此当遍历元素遇到占位符时,都表明当前正在扩容,且这个桶里的数据已经迁移完了;二是数据迁移的过程中,链表首部的节点会变成倒序。 分析到这里,put->initTable->helpTransfer->transfer这条线已经分析完了,回忆一下这个调用链上的代码,发现共有两个地方加了同步对象锁,一个是在put方法内往桶内添加元素时,另一个是在transfer方法中迁移某个桶的数据时,且锁定的都是桶内的头节点。这两个锁会互斥,这意味着:

  • put方法成功加锁,则迁移数据的线程在迁移到对应桶时会阻塞;
  • transfer方法成功加锁,则当前往对应桶内添加元素的线程会阻塞; 也可以看出,ConcurrentHashMap每次只锁定一个桶,只会在单个桶内造成冲突,并发性相比之前的分段锁大大提高。 put方法最后还遗留了一个addCount方法没有分析,这部分内容跟容器计数有关,先看看addCount的源码:
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //这部分跟计数有关
        //满足if判断的条件:①counterCells!=null ② counterCells=null,且并发修改baseCount的值失败,说明当前有其他线程也在修改baseCount
        if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            //这个判断是什么意思目前不太清楚???留给后面确认并补充。
            if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            //counterCells!=null,并且counterCells中有数据,并且下标a对应的元素不是null,且成功的将CELLVALUE从a.value修改成a.value+x,才会执行到这里
            if (check <= 1)
                return;
            //计算元素个数
            s = sumCount();
        }
        //这部分跟扩容有关
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            //s>=sizeCtl表示达到扩容阈值,需要进行扩容,条件是数组已经初始化并且没有达到最大容量
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                //sc < 0 表示扩容已开始
                if (sc < 0) {
                    //这里的判断跟helpTransfer方法一样,不再赘述
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //当前线程加入扩容工作
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //走到这里说明扩容工作还没开始,由当前线程开始扩容
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

这段代码里出现了counterCells字段,但是到目前为止,并没有看到在哪里对这个字段进行了赋值,全局搜索发现,该字段只在fullAddCount方法里被赋值,而fullAddCount方法也出现在addCount中,因此接下来将分析一下fullAddCount方法。

    //See LongAdder version for explanation
    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        //如果ThreadLocalRandom还没有初始化,这里先进行初始化
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        //这里会自旋
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            //如果counterCells已经初始化
            if ((as = counterCells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        //这里的原子操作限制了每次只能有一个线程执行到此处
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                //如果counterCells在h对应的索引位置还没初始化,则初始化
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h);
            }
            //如果counterCells还没初始化
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        //counterCells要求长度必须是2的整数幂,因此先初始化为2
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

从源码的注释可以看到,fullAddCount借鉴了LongAdder的思想,因此源码并没有在这里给出详细解释。本人对这块也是一知半解,因此上面仅仅是把理解的内容进行了分析,而对于addCountfullAddCount的实现原理并没有透彻的理解,这部分后面需要补充,此处就不再进行额外的解读,以免误人子弟。

  • tryPresize 跟扩容有关的方法还有tryPresize,这个方法在ConcurrentHashMap中只有两个地方调用,一个是putAll方法,另一个是treeifyBin。源码如下:
    private final void tryPresize(int size) {
        //计算扩容后的数组长度
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            //如果数组还没有初始化
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                //将sizeCtl设置为-1,开始执行初始化操作
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            //如果当前元素个数没有达到扩容阈值(c<=sc),或者数组长度已经到最大值了,不需要扩容
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
                //扩容已开始
                if (sc < 0) {
                    Node<K,V>[] nt;
                    //如果扩容已结束或者扩容线程已达到最大,当前线程什么也不干
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //当前线程加入扩容工作
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //扩容还没开始,就由当前线程开始扩容
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }
  • 统计容器内元素个数 在ConcurrentHashMap中,统计元素个数应该使用sumCount()而不是size()方法,原因在于:方法返回的是int类型,而实际上concurrentHashMap存储的元素可以超过这个范围,sumCount方法的返回值为long类型可以说明这一点,其源码如下:
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        //元素数量是baseCount和counterCells中保存的元素数量之和
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

4.问题整理

在阅读代码的过程中,仍然有些地方的逻辑不是很懂,在这里将这些问题记录下来,方便查阅和后期补充。

  • counterCells字段是用来干什么的?
  • spread方法里为何要和HASH_BITS作按位与计算?
  • putVal方法中,当发现桶中的数据结构是红黑树时,binCount为何直接赋值为2
  • transfer方法中,for循环体重的if条件语句中,什么情况下会满足i >= ni + n >= nextn这两个条件?
  • transfer方法中,下面的if语句似乎永远为false,怎么理解?
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
  • addCountfullAddCount方法的原理还不是太理解,需要再研究研究。
  • RESIZE_STAMP_BITSRESIZE_STAMP_SHIFT这两个字段的作用不太了解,以及sizeCtl里面是哪部分在记录stamp
  • ⑦源码中在修改sizeCtl的值时,有时候直接使用sizeCtl,有时候又使用类似于U.compareAndSwapInt(this, SIZECTL, sc, -1)这种,两者有何区别?
  • ⑧源码中多次出现if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)这种判断,其中的sc == rs + 1 || sc == rs + MAX_RESIZERS该如何理解,这段代码是否有问题?
  • ConcurrentHashMap会出现扩容还没完成,但是新数组空间又不够的情况吗?

5.总结

读源码的过程是痛苦的,尤其是刚开始阅读的时候,有许许多多东西都看不懂,以至于根本不知道从哪里看起,然而一旦能够看下去了,会发现其实源码并不是太可怕,遇到的困难都是可以克服的。经过JDK 1.8之后,ConcurrentHashMap的源码的行数达到了6000+,本文只是记录了ConcurrentHashMap非常有限的内容,还有许多内容并未触碰,因此可以说本人对于ConcurrentHashMap的了解也非常有限。最开始读源码时,并没有想过要写一篇博客来记载,但是很快发现,如果不记下来,过不了多久就会把已经看过的内容忘个七七八八,因此才动了写篇读后感的念头。当然,在写作的过程中,自己的思路也捋顺了许多,也算是个额外的收获。

6.参考文献

从开始看源码到写完本文,其实看了不少前人的优秀文章,这里先占个坑位,后面会把相关参考资料整理到这里。、

7.更新日志

  • 3.3更新了getput方法的流程图

原文出处:https://www.cnblogs.com/NaLanZiYi-LinEr/p/12380205.html

展开阅读全文
jdk
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部