2.3、JDK 源码分析 - ConcurrentHashMap1.7

原创
2022/07/19 14:18
阅读数 134

摘要

我们都知道HashMap是线程不安全的,扩容时有可能还会产生死循环!那么有没有一种比较安全的HashMap给我们使用呢?JDK其实已经为我们提供了一种实现,它就是ConcurrentHashMap;

介绍

一个支持检索的完全并发性和更新的可调预期并发性的哈希表。 这个类遵守与Hashtable相同的功能规范,并包含与Hashtable的每个方法对应的方法版本。 然而,即使所有操作都是线程安全的,检索操作也不需要锁定,并且不支持以阻止所有访问的方式锁定整个表。 在依赖Hashtable的线程安全性但不依赖其同步细节的程序中,这个类与Hashtable完全可互操作。

检索操作(包括get)通常不会阻塞,因此可能与更新操作(包括put和remove)重叠。 检索反映了最近完成的更新操作在开始时保持的结果。 对于聚合操作(如putAll和clear),并发检索可能只反映某些条目的插入或删除。 类似地,Iterators和Enumerations返回的元素反映了哈希表在创建迭代器/枚举时的某个时点或自创建以来的状态。 它们不会抛出ConcurrentModificationException。 但是,迭代器被设计为一次只能被一个线程使用。

更新操作之间允许的并发性由可选的concurrencyLevel构造函数参数(默认16)指导,该参数被用作内部大小调整的提示。 对表进行内部分区,以尝试在没有争用的情况下允许指定数量的并发更新。 因为哈希表中的位置基本上是随机的,所以实际的并发性会有所不同。 理想情况下,您应该选择一个值来容纳尽可能多的并发修改表的线程。 使用比您需要的高得多的值可能会浪费空间和时间,而使用较低的值可能会导致线程争用。 但是在一个数量级内的高估和低估通常不会产生太明显的影响。 当知道只有一个线程会修改,而其他所有线程只会读取时,值1是合适的。 另外,调整这个或任何其他类型的散列表的大小是一个相对较慢的操作,因此,如果可能的话,最好在构造函数中提供预期表大小的估计。

这个类及其视图和迭代器实现了Map和Iterator接口的所有可选方法。 与Hashtable类似但又不同于HashMap,该类不允许将null用作键或值。

基本策略是将表细分为Segments,每个Segments本身是一个并发可读的哈希表。 为了减少内存占用,除了一个段之外的所有段只在第一次需要时才构造(参见ensureSegment)。 为了在惰性构造的情况下保持可见性,访问段以及段表的元素必须使用volatile访问,这是通过不安全的方法segmentAt等完成的。 它们提供了AtomicReferenceArrays的功能,但减少了间接级别。 另外,锁操作中对表元素和条目“next”字段的volatile写操作使用更便宜的“lazySet”形式(通过putOrderedObject),因为这些写操作之后总是会释放锁,以保持表更新的顺序一致性。

历史提示:该类的上一个版本严重依赖于“final”字段,这避免了一些volatile读取,但代价是大量的初始占用空间。 该设计的一些残余(包括强制构造段0)存在以确保串行兼容性。

源码解析

(1)、类定义

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
   ...
}

类定义没有什么特别之处,就是实现ConcurrentMap接口,在ConcurrentMap定义了几个原子方法

(2)、常量定义

    /**
     * 该表的默认初始容量,当没有在构造中指定时使用
     */
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    /**
     * 该表的默认加载因子,当没有在构造函数中指定时使用。
     */
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    /**
     * 该表的默认并发级别,当没有在构造函数中指定时使用。
     */
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    /**
     * 最大容量,如果一个较大的值由带参数的构造函数中的任何一个隐式指定,则使用该值。 必须是2的幂<= 1<<30,以确保条目可以使用整数进行索引。
     */
    static final int MAXIMUM_CAPACITY = 1 << 30;

    /**
     * 每个段表的最小容量。 必须是2的幂,至少为2,以避免在惰性构造后的下一次使用中立即调整大小。
     */
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    /**
     * 允许的最大段数; 用于绑定构造函数参数。 必须是2的幂,小于1 << 24。
     */
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative  略显保守

    /**
     * 在诉诸锁定之前,在大小和containsValue方法上的非同步重试次数。 这用于避免在表经历连续修改时进行无界重试,这将导致无法获得准确的结果。
     */
    static final int RETRIES_BEFORE_LOCK = 2;

(3)、字段定义

    //与此实例关联的随机值,应用于键的散列代码,使散列冲突更难找到。
    private transient final int hashSeed = randomHashSeed(this);

    /**
     * Mask value for indexing into segments. The upper bits of a
     * key's hash code are used to choose the segment.
     * 索引段的掩码值。 键的哈希码的上位用于选择段。
     */
    final int segmentMask;

    /**
     * 在段内移位索引值。
     */
    final int segmentShift;

    /**
     * 每个段都是一个专门的哈希表。用于存放真实数据
     */
    final Segment<K,V>[] segments;

    transient Set<K> keySet;
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;

(4)、内部类HashEntry

    /**
     * ConcurrentHashMap列表条目。 请注意,它从未作为用户可见的Map.Entry导出。
     */
    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        /**
         * 用volatile写语义设置下一个字段。 (请参阅上面关于putOrderedObject的使用。)
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

        // Unsafe mechanics
        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

(6)、构造方法

    /**
     * 创建一个新的空映射,带有指定的初始容量,以及默认的负载因子(0.75)和concurrencyLevel(16)。
     */
    public ConcurrentHashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    /**
     * 创建一个新的空映射,具有默认的初始容量(16)、负载因子(0.75)和并发级别(16)。
     */
    public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    /**
     * 创建具有与给定映射相同映射的新映射。 创建的映射的容量是给定映射的1.5倍或16(哪个更大),以及默认的负载因子(0.75)和concurrencyLevel (16)
     */
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
                      DEFAULT_INITIAL_CAPACITY),
             DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
        putAll(m);
    }
    /**
     * 使用指定的初始容量、负载因子和并发级别创建一个新的空映射。
     *
     * @param initialCapacity 最初的能力。 该实现执行内部大小调整以适应这么多元素。
     * @param loadFactor 负载因子阈值,用于控制调整容量大小。当元素的平均数量超过这个阈值时,可以执行调整大小。
     * @param concurrencyLevel 并发更新线程的估计数目。 该实现执行内部大小调整以尝试容纳这么多线程。
     */
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        //参数合法性检查
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        //并发级别如果操作最大允许的段,则强制设置为最大允许的段大小
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments 找到两倍大小的最佳匹配参数
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

(7)、内部类Segment

段是哈希表的特殊版本。 这个子类取巧地继承了ReentrantLock,只是为了简化一些锁并避免单独构造。

段维护一个始终保持一致状态的条目列表表,所以可以在不锁定的情况下读取(通过对段和表的volatile读取)。 这需要在调整表大小时复制节点,因此仍然使用旧版本表的读者可以遍历旧列表。

该类只定义需要锁定的可变方法。 除了前面提到的,这个类的方法执行ConcurrentHashMap方法的每段版本。 (其他方法直接集成到ConcurrentHashMap方法中。) 这些可变的方法通过scanAndLock和scanAndLockForPut方法在争用时使用一种控制旋转的形式。 这些函数将trylock与遍历穿插在一起以定位节点。 它的主要好处是在获取锁时吸收缓存丢失(这在散列表中很常见),这样一旦获得锁,遍历就会更快。 我们实际上并不使用找到的节点,因为它们必须在锁定状态下重新获取,以确保更新的顺序一致性(在任何情况下都可能无法检测到过时),但它们通常会更快地重新定位。 此外,如果没有找到节点,scanAndLockForPut投机性地创建一个新的节点用于put。

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        /**
         * 预扫描中在可能阻塞之前尝试锁定的最大次数,为锁定段操作做准备。 在多处理器上,使用有限的重试次数来维护定位节点时获得的缓存。
         */
        static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

        /**
         *  每段的hash表。 通过提供volatile语义的entryAt/setEntryAt访问元素。
         */
        transient volatile HashEntry<K,V>[] table;

        /**
         * 元素的数量。 只能在锁中或在其他保持可见性的volatile读取中访问。
         */
        transient int count;

        /**
         * 在这个段中发生变化的操作的总数。 尽管这可能会溢出32位,但它为CHM isEmpty()和size()方法中的稳定性检查提供了足够的准确性。 
         * 只能在锁中或在其他保持可见性的volatile读取中访问。
         */
        transient int modCount;

        /**
         * 当表的大小超过此阈值时,表将重新散列。 (该字段的值总是(int)(capacity * loadFactor)。)
         */
        transient int threshold;

        /**
         * 哈希表的负载因子。 即使这个值对于所有的段都是相同的,它也会被复制以避免需要连接到外部对象。
         */
        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
        ....
    }

从上面可以看出分段,这里面的段 其实是一个个锁。

(8)、get方法

    /**
     * 返回指定键映射到的值,如果这个映射不包含该键的映射,则返回{@code null}。
     */
    public V get(Object key) {
        Segment<K,V> s;
        HashEntry<K,V>[] tab;
        int h = hash(key);
       //定位段地址
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        //通过地址获取内存可见的分段
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);  e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

(9)、put方法

    /**
     * 将指定的键映射到该表中的指定值。 键和值都不能为空。
     *
     * 可以通过调用get方法来检索该值,该方法使用的键等于原始键。
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return 前一个值与键关联,或null(如果键没有映射的话)
     * @throws NullPointerException 如果指定的键或值为空
     */
    public V put(K key, V value) {
        Segment<K,V> s;
        //如果值为空则抛出异常
        if (value == null)
            throw new NullPointerException();
        //如果值为空,该方法也会抛出异常
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        // nonvolatile; recheck
        if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null)
            //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

调用分段的put方法

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //对当前段加锁;如果成功,则进行进行后续操作;否则,调用scanAndLockForPut方法进行扫描
            HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        //判断节点是否key和hash相等,如果是,则进行value替换
                        if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        //如果不存在该Key的映射,则在链表头部插入一个新节点
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        //如果达到阈值,则扩容
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            //将新节点放入tab对于的槽位
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }


调用scanAndLockForPut方法,进行自旋

        /**
         * 当试图获取锁时,扫描包含给定键的节点,如果返回,创建并返回一个键,确保锁被持有。
         *  与大多数方法不同,对equals方法的调用不会被筛选:由于遍历速度无关紧要,我们还可以帮助预热相关的代码和访问。
         *
         * @return a new node if key not found, else null
         */
        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            //获取hash对应槽位的第一个节点
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // 定位节点时为负
            //尝试获取锁
            while (!tryLock()) {
                //获取失败
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    //如果该槽位上的第一个节点为空,则新建一个节点
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    //如果该槽位上的第一个节点非空,且key和当前put的相同
                    else if (key.equals(e.key))
                        retries = 0;
                    //否则,继续遍历
                    else
                        e = e.next;
                }
                //如果达到尝试获取锁的次数达到最大允许尝试次数,则直接加锁
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

(10)、remove方法

    /**
     * 从映射中移除该键(及其对应的值)。 如果键不在映射中,此方法将不执行任何操作。
     *
     * @param  key the key that needs to be removed
     * @return the previous value associated with <tt>key</tt>, or
     *         <tt>null</tt> if there was no mapping for <tt>key</tt>
     * @throws NullPointerException if the specified key is null
     */
    public V remove(Object key) {
        int hash = hash(key);
        //通过hash定位到segment
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.remove(key, hash, null);
    }

调用segment.remove方法

        /**
         * 删除; 仅当value为null时匹配key,否则两者都匹配。
         */
        final V remove(Object key, int hash, Object value) {
            //尝试加锁
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> e = entryAt(tab, index);
                HashEntry<K,V> pred = null;
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {
                            if (pred == null)
                                setEntryAt(tab, index, next);
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
        }
        /**
         * 当试图获取删除或替换操作的锁时,扫描包含给定键的节点。 返回时,保证锁被持有。
         * 注意,即使没有找到键,我们也必须锁定,以确保更新的顺序一致性。
         */
        private void scanAndLock(Object key, int hash) {
            // similar to but simpler than scanAndLockForPut
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            int retries = -1;
            while (!tryLock()) {
                HashEntry<K,V> f;
                if (retries < 0) {
                    if (e == null || key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f;
                    retries = -1;
                }
            }
        }

数据结构

总结:

(1)、ConcurrentHashMap1.7底层数据结构主要是分段锁+数组+链表; 分段锁Segment通过继承ReentrantLock来实现;

(2)、进行put、remove等操作时,它会将hash值对于的段进行加锁,然后进行相应的操作。加锁的时候,刚开始会实行尝试获取自旋,然后超过最大尝试次数后,直接加锁。

(3)、另外,底层实现上使用volatile,sun.misc.Unsafe的getObjectVolatile()、putOrderedObject()、getObject()等等一些底层方法来实现。关于Unsafe这一块的知识,后期再进行补充!

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部