编程不止是一份工作,还是一种乐趣!!!
从名字就能看出来,ConcurrentHashMap是专门为并发场景而设计的,相比HashTable,ConcurrentHashMap采用了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于HashTable对整个Map加锁的设计,分段锁极大的提升了高并发环境下的处理能力。但同时,由于不是对整个Map加锁,也导致一些需要扫描整个Map的方法(如size
)使用了特殊的实现,还有一些方法(比如clear
)甚至放弃了对一致性的要求,ConcurrentHashMap是一弱一致性的。
ConcurrentHashMap内部维护了一个segment数组。每个Segment称为一个分段,并且继承了ReentrantLock,所以ConcurrentHashMap的加锁粒度是基于每个段的。将数据分段存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,实现了更好的并发性能。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
final Segment<K,V>[] segments;
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
}
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
...
}
}
Segment类似于HashMap,内部拥有一个HashEntry数组,数组中的每个元素又是一个链表,但ConcurrentHashMap中的HashEntry相对于HashMap中的Entry有一定的差异性:HashEntry中的value被volatile修饰,这样在多线程读写过程中能够保持它的可见性;还有next被修饰为final的,不变(Immutable)和易变(Volatile)使得ConcurrentHashMap允许多个读操作并发进行,并且读操作并不需要加锁。如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将会是不一致的。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = 1;
while (cap < c)
cap <<= 1;
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}
ConcurrentHashMap的初始化需要提供三个参数。容量initialCapacity与扩容因子loadFactor的含义与HashMap类似,这里比较重要的是第三个参数:并发度concurrencyLevel。并发度可以理解为程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段的个数,即segment数组的长度。segmentMask的大小为Segment数组长度减1,segmentMask与segmentShift的作用是为了后续计算segment组数下标时使用的。
ConcurrentHashMap默认的并发度为16。当用户设置并发度时,ConcurrentHashMap会使用大于等于该值的最小2幂指数作为实际并发度(假如用户设置并发度为17,实际并发度则为32),原因与HashMap设置容量大小为2的n次方一样,能够更高效的使用“与”操作来定位分段Segment。如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。
分段确定好以后,剩下的就是每个分段的初始化了。前面已经提到过,ConcurrentHashMap中的分段Segment其实与HashMap是非常相似的。首先根据initialCapacity与分段数算出每个分段的容量c
,最终用于确认每个分段容量大小的变量cap
也一样为不小于c的最小2幂指数:
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = 1;
while (cap < c)
cap <<= 1;
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);
最后两行循环创建每个分段,这部分基本与HashMap的初始化一样,这里做进一步介绍了。
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, value, false);
}
private static int hash(int h) {
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
首先根据key对象的hashCode重新计算hash值,根据hash值确定桶的位置(即segment组数的下标)。这里可能会产生一个疑问,为什么要对key对象的hashCode进行二次hash呢?这里其实是加入了高位计算,防止低位不变,高位变化时,造成的hash冲突,简单来说就是降低hash冲突的机率。
将得到的hash值向右按位移动segmentShift位,然后再与segmentMask做“与”运算得到segment组数的下标。 在初始化的时候我们说过segmentShift的值等于32-sshift,例如concurrencyLevel等于16,则sshift等于4,则segmentShift为28。hash值是一个32位的整数,将其向右移动28位就变成这个样子: 0000 0000 0000 0000 0000 0000 0000 xxxx,然后再用这个值与segmentMask做“与”运算,也就是取最后四位的值,这个值确定Segment的索引,即segment组数的下标。
最后是Segment的put方法:
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash();
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
除了对当前的桶进行加锁之外,Segment的put方法与HashMap的put方法基本是一样的,还有就是在扩容的时候有一点区别:Segment类的rehash方法在新的table初始化完成之前,并不会修改老的table,以保证其它线程能够进行读访问。
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}
// Segment类的get方法
V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
同样的,get方法先根据key对象的hashCode重新计算hash值,根据hash值确定桶的位置,再把真正的get操作委托给Segment类的get方法。从上面可以看出,ConcurrentHashMap的get操作是不需要加锁的(如果value为null,会调用readValueUnderLock,只有这个步骤会加锁),通过前面提到的volatile和final来确保数据安全。
public int size() {
final Segment<K,V>[] segments = this.segments;
long sum = 0;
long check = 0;
int[] mc = new int[segments.length];
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
check = 0;
sum = 0;
int mcsum = 0;
for (int i = 0; i < segments.length; ++i) {
sum += segments[i].count;
mcsum += mc[i] = segments[i].modCount;
}
if (mcsum != 0) {
for (int i = 0; i < segments.length; ++i) {
check += segments[i].count;
if (mc[i] != segments[i].modCount) {
check = -1; // force retry
break;
}
}
}
if (check == sum)
break;
}
if (check != sum) { // Resort to locking all segments
sum = 0;
for (int i = 0; i < segments.length; ++i)
segments[i].lock();
for (int i = 0; i < segments.length; ++i)
sum += segments[i].count;
for (int i = 0; i < segments.length; ++i)
segments[i].unlock();
}
if (sum > Integer.MAX_VALUE)
return Integer.MAX_VALUE;
else
return (int)sum;
}
size操作与put和get操作最大的区别在于,size操作需要遍历所有的Segments才能算出整个Map的大小,而put和get都只关心一个Segment。假设我们在遍历一个Segment时,别的线程有可能会修改其它的Segment,于是这一次计算出来的size值可能并不是Map当前的真正大小。
一个比较简单的办法就是计算Map大小的时候将所有的Segment都Lock住,计算完之后再Unlock。这是普通人能够想到的方案,但是ConcurrentHashMap用了一个很好的点子:先在不加锁的情况下进行两次测试,遍历所有的Segments,累加各个Segment的大小得到整个Map的大小,如果两次测试获取的所有Segment的更新的次数(modCount,每个Segment都有一个modCount变量,这个变量在Segment中的Entry被修改时会加一,通过这个值可以得到每个Segment的更新操作的次数)是一样的,说明计算过程中没有更新操作,则直接返回这个值。如果这两次不加锁的计算过程中Map的更新次数有变化,则之后的计算先对所有的Segments加锁,再遍历所有Segments计算Map大小,最后再解锁所有Segments。
public void clear() {
for (int i = 0; i < segments.length; ++i)
segments[i].clear();
}
// Segment类的clear方法
void clear() {
if (count != 0) {
lock();
try {
HashEntry<K,V>[] tab = table;
for (int i = 0; i < tab.length ; i++)
tab[i] = null;
++modCount;
count = 0; // write-volatile
} finally {
unlock();
}
}
}
clear方法并没有采用特别的处理,循环处理每个桶,因为没有全局的锁,在清除完一个Segment之后,正在清理下一个Segment的时候,已经清理Segment可能又被加入了数据,因此clear返回的时候,ConcurrentHashMap中是可能存在数据的。因此,clear方法是弱一致的。