首页 > 代码库 > 深入ConcurrentHashMap一
深入ConcurrentHashMap一
ConcurrentHashMap内部有一个Segment数组,每个Segment有一个lock。
Segment相当于是一个子map,拥有一个HashEntity数组。
这样可以将并发压力分摊到多个Segment上。
ConcurrentHashMap组成图:
这里的Segment个数必须为2的n次方,为了之后高效计算要存放的key存放在哪个Segment上(用 << 实现)。
Segment内部拥有一个类型为volatile的HashEntry数组,这是为了能够让其它线程看到最新的值。
ConcurrentHashMap拥有put,get,remove等操作。而原有的扩容操作,为了性能则只针对单个Segment来进行。
关键的Segment相当了ConcurrentHashMap中的子map,它包含了一个HashEntry数组。它的类图如下:
这里关键的Segment类图如下:
HashEntry内部存储了hash值,final的key值,及volatile的value及next值。
这里value及next值是volatile是为实现get操作的不需要加锁提供了基础。
HashEntry类图如下:
下面分别从ConcurrentHashMap创建,往ConcurrentHashMap放入元素,从ConcurrentHashMap取出元素来进行分析。
一.ConcurrentHashMap创建
如果不传入,使用空构造函数时,默认的map容量大小为16,loadFactor为0.75,并发数为16。
我们以默认的参数开始分析。
首先需要确定总容量大小(这里指所有Segment中存放数组元素的数组大小总和),Segment总数,以及每个Segment中HashEntry数组的大小。
我们先附上代码:
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
可以看到,对于Segment个数ssize取的是以concurrencyLevel为上界的2的n次方,这里由于concurrencyLevel为16,所以ssize也为16。
然后计算segmentShift,这个值用于后续在存放元素到ConcurrentHashMap时,用于决定有多少位hash高位参与计算存放元素的Segment数组下标。
这里segmentShift为32-sshift。sshift取的是Segment个数ssize的2的n次数的n值,即在计算ssize时左移的次数,这里为4。
所以segmentShift为28。
接着计算出segmentMask,这个值用于在计算得到存放元素的Segment所在数组的下标,这里为了效率使用 & 操作来替代%模操作。
这里由于Segment个数为16,所以segmentMask为15。
之后会计算出每个Segment的数组容量,这里先计算出每个Segment的大致大小,即用int c = initialCapacity / ssize; 来得出c,这里c为1。
之后会初始化一个cap作为真正的segment中HashEntry数组大小,将它初始化为2。
然后为了保证得出的segMent中HashEntry的大小为2的n次方,所以后续会对cap做以下操作:
while (cap < c)
cap <<= 1;
即cap的值要不是2(c值小于等于2时),或者是大于2的2的n次方。
得到最终每个Segment中HashEntry数组大小后,创建一个Segment数组,并在Segment数组0处初始化创建一个Segment。
最后将ConcurrentHashMap的segments设置为新创建的Segments数组。
这里可以看到最后将创建的Segment s0是调用UNSAFE.putOrderedObject(ss,SBASE,s0)来将其放入到ss数组中。
这里的UNSAFE.putOrderedObject是JAVA提供的用于直接操作内存的方法,其中参数ss是数组。
UNSAFE.putOrderedObject会延迟更新到内存中,但是由于后续在获取segment数组中的segment时,采用的是UNSAFE.getObjectVolatile,所以能够保证对于segment放入数组的操作对于后续的线程是可见的。
SBASE是ConcurrentHashMap的一个静态final long的值,相当于是Segment数组的首地址。
s0则是创建的需要放入ss数组中的segment实例。这里是将s0放入到ss数组位置0中。
如果提供类似:UNSAFE.putOrderedObject(ss,SBASE+offset,s)表明将元素s放入到数组ss的SBASE+offset位置。
其中这里的offset:
offset=步长*要放在数组第几位
步长一般是在编译期已经确定,这里测试过对于:Double或者Integer等,步长都是4。