摘要
我们都知道HashMap是线程不安全的,扩容时有可能还会产生死循环!那么有没有一种比较安全的HashMap给我们使用呢?JDK其实已经为我们提供了一种实现,它就是ConcurrentHashMap;
介绍
一个支持检索的完全并发性和更新的可调预期并发性的哈希表。 这个类遵守与Hashtable相同的功能规范,并包含与Hashtable的每个方法对应的方法版本。 然而,即使所有操作都是线程安全的,检索操作也不需要锁定,并且不支持以阻止所有访问的方式锁定整个表。 在依赖Hashtable的线程安全性但不依赖其同步细节的程序中,这个类与Hashtable完全可互操作。
检索操作(包括get)通常不会阻塞,因此可能与更新操作(包括put和remove)重叠。 检索反映了最近完成的更新操作在开始时保持的结果。 对于聚合操作(如putAll和clear),并发检索可能只反映某些条目的插入或删除。 类似地,Iterators和Enumerations返回的元素反映了哈希表在创建迭代器/枚举时的某个时点或自创建以来的状态。 它们不会抛出ConcurrentModificationException。 但是,迭代器被设计为一次只能被一个线程使用。
更新操作之间允许的并发性由可选的concurrencyLevel构造函数参数(默认16)指导,该参数被用作内部大小调整的提示。 对表进行内部分区,以尝试在没有争用的情况下允许指定数量的并发更新。 因为哈希表中的位置基本上是随机的,所以实际的并发性会有所不同。 理想情况下,您应该选择一个值来容纳尽可能多的并发修改表的线程。 使用比您需要的高得多的值可能会浪费空间和时间,而使用较低的值可能会导致线程争用。 但是在一个数量级内的高估和低估通常不会产生太明显的影响。 当知道只有一个线程会修改,而其他所有线程只会读取时,值1是合适的。 另外,调整这个或任何其他类型的散列表的大小是一个相对较慢的操作,因此,如果可能的话,最好在构造函数中提供预期表大小的估计。
这个类及其视图和迭代器实现了Map和Iterator接口的所有可选方法。 与Hashtable类似但又不同于HashMap,该类不允许将null用作键或值。
基本策略是将表细分为Segments,每个Segments本身是一个并发可读的哈希表。 为了减少内存占用,除了一个段之外的所有段只在第一次需要时才构造(参见ensureSegment)。 为了在惰性构造的情况下保持可见性,访问段以及段表的元素必须使用volatile访问,这是通过不安全的方法segmentAt等完成的。 它们提供了AtomicReferenceArrays的功能,但减少了间接级别。 另外,锁操作中对表元素和条目“next”字段的volatile写操作使用更便宜的“lazySet”形式(通过putOrderedObject),因为这些写操作之后总是会释放锁,以保持表更新的顺序一致性。
历史提示:该类的上一个版本严重依赖于“final”字段,这避免了一些volatile读取,但代价是大量的初始占用空间。 该设计的一些残余(包括强制构造段0)存在以确保串行兼容性。
源码解析
(1)、类定义
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
...
}
类定义没有什么特别之处,就是实现ConcurrentMap接口,在ConcurrentMap定义了几个原子方法
(2)、常量定义
/**
* 该表的默认初始容量,当没有在构造中指定时使用
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
/**
* 该表的默认加载因子,当没有在构造函数中指定时使用。
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**
* 该表的默认并发级别,当没有在构造函数中指定时使用。
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 最大容量,如果一个较大的值由带参数的构造函数中的任何一个隐式指定,则使用该值。 必须是2的幂<= 1<<30,以确保条目可以使用整数进行索引。
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 每个段表的最小容量。 必须是2的幂,至少为2,以避免在惰性构造后的下一次使用中立即调整大小。
*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
/**
* 允许的最大段数; 用于绑定构造函数参数。 必须是2的幂,小于1 << 24。
*/
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative 略显保守
/**
* 在诉诸锁定之前,在大小和containsValue方法上的非同步重试次数。 这用于避免在表经历连续修改时进行无界重试,这将导致无法获得准确的结果。
*/
static final int RETRIES_BEFORE_LOCK = 2;
(3)、字段定义
//与此实例关联的随机值,应用于键的散列代码,使散列冲突更难找到。
private transient final int hashSeed = randomHashSeed(this);
/**
* Mask value for indexing into segments. The upper bits of a
* key's hash code are used to choose the segment.
* 索引段的掩码值。 键的哈希码的上位用于选择段。
*/
final int segmentMask;
/**
* 在段内移位索引值。
*/
final int segmentShift;
/**
* 每个段都是一个专门的哈希表。用于存放真实数据
*/
final Segment<K,V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;
(4)、内部类HashEntry
/**
* ConcurrentHashMap列表条目。 请注意,它从未作为用户可见的Map.Entry导出。
*/
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
* 用volatile写语义设置下一个字段。 (请参阅上面关于putOrderedObject的使用。)
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
// Unsafe mechanics
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
(6)、构造方法
/**
* 创建一个新的空映射,带有指定的初始容量,以及默认的负载因子(0.75)和concurrencyLevel(16)。
*/
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
/**
* 创建一个新的空映射,具有默认的初始容量(16)、负载因子(0.75)和并发级别(16)。
*/
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
/**
* 创建具有与给定映射相同映射的新映射。 创建的映射的容量是给定映射的1.5倍或16(哪个更大),以及默认的负载因子(0.75)和concurrencyLevel (16)
*/
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY),
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}
/**
* 使用指定的初始容量、负载因子和并发级别创建一个新的空映射。
*
* @param initialCapacity 最初的能力。 该实现执行内部大小调整以适应这么多元素。
* @param loadFactor 负载因子阈值,用于控制调整容量大小。当元素的平均数量超过这个阈值时,可以执行调整大小。
* @param concurrencyLevel 并发更新线程的估计数目。 该实现执行内部大小调整以尝试容纳这么多线程。
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
//参数合法性检查
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//并发级别如果操作最大允许的段,则强制设置为最大允许的段大小
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments 找到两倍大小的最佳匹配参数
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
(7)、内部类Segment
段是哈希表的特殊版本。 这个子类取巧地继承了ReentrantLock,只是为了简化一些锁并避免单独构造。
段维护一个始终保持一致状态的条目列表表,所以可以在不锁定的情况下读取(通过对段和表的volatile读取)。 这需要在调整表大小时复制节点,因此仍然使用旧版本表的读者可以遍历旧列表。
该类只定义需要锁定的可变方法。 除了前面提到的,这个类的方法执行ConcurrentHashMap方法的每段版本。 (其他方法直接集成到ConcurrentHashMap方法中。) 这些可变的方法通过scanAndLock和scanAndLockForPut方法在争用时使用一种控制旋转的形式。 这些函数将trylock与遍历穿插在一起以定位节点。 它的主要好处是在获取锁时吸收缓存丢失(这在散列表中很常见),这样一旦获得锁,遍历就会更快。 我们实际上并不使用找到的节点,因为它们必须在锁定状态下重新获取,以确保更新的顺序一致性(在任何情况下都可能无法检测到过时),但它们通常会更快地重新定位。 此外,如果没有找到节点,scanAndLockForPut投机性地创建一个新的节点用于put。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
/**
* 预扫描中在可能阻塞之前尝试锁定的最大次数,为锁定段操作做准备。 在多处理器上,使用有限的重试次数来维护定位节点时获得的缓存。
*/
static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
* 每段的hash表。 通过提供volatile语义的entryAt/setEntryAt访问元素。
*/
transient volatile HashEntry<K,V>[] table;
/**
* 元素的数量。 只能在锁中或在其他保持可见性的volatile读取中访问。
*/
transient int count;
/**
* 在这个段中发生变化的操作的总数。 尽管这可能会溢出32位,但它为CHM isEmpty()和size()方法中的稳定性检查提供了足够的准确性。
* 只能在锁中或在其他保持可见性的volatile读取中访问。
*/
transient int modCount;
/**
* 当表的大小超过此阈值时,表将重新散列。 (该字段的值总是(int)(capacity * loadFactor)。)
*/
transient int threshold;
/**
* 哈希表的负载因子。 即使这个值对于所有的段都是相同的,它也会被复制以避免需要连接到外部对象。
*/
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
....
}
从上面可以看出分段,这里面的段 其实是一个个锁。
(8)、get方法
/**
* 返回指定键映射到的值,如果这个映射不包含该键的映射,则返回{@code null}。
*/
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
//定位段地址
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//通过地址获取内存可见的分段
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
(9)、put方法
/**
* 将指定的键映射到该表中的指定值。 键和值都不能为空。
*
* 可以通过调用get方法来检索该值,该方法使用的键等于原始键。
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return 前一个值与键关联,或null(如果键没有映射的话)
* @throws NullPointerException 如果指定的键或值为空
*/
public V put(K key, V value) {
Segment<K,V> s;
//如果值为空则抛出异常
if (value == null)
throw new NullPointerException();
//如果值为空,该方法也会抛出异常
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
// nonvolatile; recheck
if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null)
// in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
调用分段的put方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//对当前段加锁;如果成功,则进行进行后续操作;否则,调用scanAndLockForPut方法进行扫描
HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
//判断节点是否key和hash相等,如果是,则进行value替换
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
//如果不存在该Key的映射,则在链表头部插入一个新节点
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
//如果达到阈值,则扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
//将新节点放入tab对于的槽位
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
调用scanAndLockForPut方法,进行自旋
/**
* 当试图获取锁时,扫描包含给定键的节点,如果返回,创建并返回一个键,确保锁被持有。
* 与大多数方法不同,对equals方法的调用不会被筛选:由于遍历速度无关紧要,我们还可以帮助预热相关的代码和访问。
*
* @return a new node if key not found, else null
*/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//获取hash对应槽位的第一个节点
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // 定位节点时为负
//尝试获取锁
while (!tryLock()) {
//获取失败
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
//如果该槽位上的第一个节点为空,则新建一个节点
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
//如果该槽位上的第一个节点非空,且key和当前put的相同
else if (key.equals(e.key))
retries = 0;
//否则,继续遍历
else
e = e.next;
}
//如果达到尝试获取锁的次数达到最大允许尝试次数,则直接加锁
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
(10)、remove方法
/**
* 从映射中移除该键(及其对应的值)。 如果键不在映射中,此方法将不执行任何操作。
*
* @param key the key that needs to be removed
* @return the previous value associated with <tt>key</tt>, or
* <tt>null</tt> if there was no mapping for <tt>key</tt>
* @throws NullPointerException if the specified key is null
*/
public V remove(Object key) {
int hash = hash(key);
//通过hash定位到segment
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.remove(key, hash, null);
}
调用segment.remove方法
/**
* 删除; 仅当value为null时匹配key,否则两者都匹配。
*/
final V remove(Object key, int hash, Object value) {
//尝试加锁
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
/**
* 当试图获取删除或替换操作的锁时,扫描包含给定键的节点。 返回时,保证锁被持有。
* 注意,即使没有找到键,我们也必须锁定,以确保更新的顺序一致性。
*/
private void scanAndLock(Object key, int hash) {
// similar to but simpler than scanAndLockForPut
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1;
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null || key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}
数据结构
总结:
(1)、ConcurrentHashMap1.7底层数据结构主要是分段锁+数组+链表; 分段锁Segment通过继承ReentrantLock来实现;
(2)、进行put、remove等操作时,它会将hash值对于的段进行加锁,然后进行相应的操作。加锁的时候,刚开始会实行尝试获取自旋,然后超过最大尝试次数后,直接加锁。
(3)、另外,底层实现上使用volatile,sun.misc.Unsafe的getObjectVolatile()、putOrderedObject()、getObject()等等一些底层方法来实现。关于Unsafe这一块的知识,后期再进行补充!