首页 > 代码库 > ConcurrentHashMap源码分析
ConcurrentHashMap源码分析
一、ConcurrentHashMap出现的原因
我们之前学过HashMap,也知道HashMap不是线程安全的,在多线程环境下,HashMap的put方法有可能引起死循环。于是HashTable这个类出现,它在大量的方法前都加了内置锁Synchronized,这就保证了它的线程安全性,但是这种方法太极端,导致效率低下。当一个线程访问了HashTable的同步方法时,其它线程就只能等待该线程释放锁。在这种情况,针对多线程的情况,ConcurrentHashMap应运而生。
二、ConcurrentHashMap的数据结构
ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问
(图片来自:http://www.cnblogs.com/dolphin0520/p/3920373.html)
和HashMap不同之处是,HashMap是使用一个数组来连接各个Entry链,而ConcurrentHashMap则是使用了Segment数组(继承ReentrantLock)来链接各个HashEntry数组,然后各个HashEntry数组中连接各自的HashEntry链。(这个要很注意)每一个Segment都有一个锁,所以这可以达到并发得访问各个Segment的Entry。
三、ConcurrentHashMap的定义
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
ConcurrentHashMap继承自AbstractMap,实现了ConcurrentMap接口,使得它具有Map的属性,同时又有多线程相关的属性
ConcurrentHashMap的成员变量:
//初始的容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//初始的加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//初始的并发等级(下面会叙述作用)
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//最小的segment数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//最大的segment数量
static final int MAX_SEGMENTS = 1 << 16;
//
static final int RETRIES_BEFORE_LOCK = 2;
四、ConcurrentHashMap的构造函数
//通过指定的容量,加载因子和并发等级创建一个新的ConcurrentHashMap
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
//对容量,加载因子和并发等级做限制
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//限制并发等级不可以大于最大等级
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 下面即通过并发等级来确定Segment的大小
//sshift用来记录向左按位移动的次数
int sshift = 0;
//ssize用来记录Segment数组的大小
int ssize = 1;
//Segment的大小为大于等于concurrencyLevel的第一个2的n次方的数
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
//segmentMask的值等于ssize - 1(这个值很重要)
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//c记录每个Segment上要放置多少个元素
int c = initialCapacity / ssize;
//假如有余数,则Segment数量加1
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
//创建第一个Segment,并放入Segment[]数组中,作为第一个Segment
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
对于容量和加载因子:我在HashMap那篇文章已经讲解得很清楚: Java容器(四):HashMap(Java 7)的实现原理
并发等级(concurrencyLevel):用来确定Segment的数量,Segment的个数为大于等于concurrencyLevel的 第一个 2的n次方的数,例如当concurrencyLevel为12,13,14,15,16时,此时的Segment的数量为16
segmentMask:这个为什么要为Segment数组的长度 -1, 这个也在HashMap中讲过了,主要是为了让低位为1,这样在做&运算确定Segment的索引时能够更加分散
五、ConcurrentHashMap的put操作
public V put(K key, V value) {
Segment<K,V> s;
//ConcurrentHashMap的key和value都不能为null
if (value == null)
throw new NullPointerException();
//这里对key求hash值,并确定应该放到segment数组的索引位置
int hash = hash(key);
//j为索引位置,思路和HashMap的思路一样,这里不再多说
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
//这里很关键,找到了对应的Segment,则把元素放到Segment中去
return s.put(key, hash, value, false);
}
得到hash值向右按位移动segmentShift位,然后再与segmentMask做&运算得到segment的索引j。例如concurrencyLevel等于16,则sshift等于4,则segmentShift为28。hash值是一个32位的整数,将其向右移动28位就变成这个样子:
0000 0000 0000 0000 0000 0000 0000 xxxx,然后再用这个值与segmentMask做&运算,也就是取最后四位的值。这个值确定Segment的索引。
其实大体和HashMap相似
下面看看具体如何插入到Segment中的
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//这里是并发的关键,每一个Segment进行put时,都会加锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
//tab是当前segment所连接的HashEntry数组
HashEntry<K,V>[] tab = table;
//确定key的hash值所在HashEntry数组的索引位置
int index = (tab.length - 1) & hash;
//取得要放入的HashEntry链的链头
HashEntry<K,V> first = entryAt(tab, index);
//遍历当前HashEntry链
for (HashEntry<K,V> e = first;;) {
//如果链头不为null
if (e != null) {
K k;
//如果在该链中找到相同的key,则用新值替换旧值,并退出循环
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = http://www.mamicode.com/e.value;"hljs-keyword">if (!onlyIfAbsent) {
e.value = http://www.mamicode.com/value;"hljs-keyword">break;
}
//如果没有和key相同的,一直遍历到链尾,链尾的next为null,进入到else
e = e.next;
}
else {//如果没有找到key相同的,则把当前Entry插入到链头
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
//此时数量+1
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//如果超出了限制,要进行扩容
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = http://www.mamicode.com/null;
break;
}
}
} finally {
//最后释放锁
unlock();
}
return oldValue;
}
我们来重新理一理思路:
1. 首先对key进行第1次hash,通过hash值确定segment的位置
2. 然后在segment内进行操作,获取锁
3. 接着获取当前segment的HashEntry数组,然后对key进行第2次hash,通过hash值确定在HashEntry数组的索引位置
4. 然后对当前索引的HashEntry链进行遍历,如果有重复的key,则替换;如果没有重复的,则插入到链头
5. 关闭锁
可见,在整个put过程中,进行了2次hash操作,才最终确定key的位置。
五、ConcurrentHashMap的remove操作
public V remove(Object key) {
//求key的hash
int hash = hash(key);
//求得hash对应的Segment
Segment<K,V> s = segmentForHash(hash);
//在segment内进行删除
return s == null ? null : s.remove(key, hash, null);
}
final V remove(Object key, int hash, Object value) {
//获取锁
if (!tryLock())
scanAndLock(key, hash);
V oldValue = http://www.mamicode.com/null;
try {
//tab是当前segment所连接的HashEntry数组
HashEntry<K,V>[] tab = table;
//确定key的hash值所在HashEntry数组的索引位置
int index = (tab.length - 1) & hash;
//取得要放入的HashEntry链的链头
HashEntry<K,V> e = entryAt(tab, index);
//pred用来记录待删除节点的前一个节点
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
//当找到了待删除及节点的位置
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
//如果待删除节点的前节点为null,即待删除节点时链头节点,此时把该位置指向第2个结点就行了
if (pred == null)
setEntryAt(tab, index, next);
//如果有前节点,则待删除节点的前节点的next指向待删除节点的的下一个节点,删除成功
else
pred.setNext(next);
++modCount;
--count;
oldValue = http://www.mamicode.com/v;"hljs-keyword">break;
}
pred = e;
e = next;
}
} finally {
//最后关闭锁
unlock();
}
return oldValue;
}
我们来理一理思路:
1. 首先对key进行第1次hash,通过hash值确定segment的位置
2. 然后在segment内进行操作,获取锁
3. 接着获取当前segment的HashEntry数组,然后对key进行第2次hash,通过hash值确定在HashEntry数组的索引位置
4. 用一个HashEntry引用来记录待删除节点的前一个节点,然后删除待删除节点
5. 关闭锁
更多关于ConcurrentHashMap的文章:
Java并发编程:并发容器之ConcurrentHashMap
[Java并发包学习八]深度剖析ConcurrentHashMap
ConcurrentHashMap源码分析