首页 > 代码库 > 深入ConcurrentHashMap一

深入ConcurrentHashMap一

ConcurrentHashMap能够做到比较高性能的并发访问。
ConcurrentHashMap内部有一个Segment数组,每个Segment有一个lock。
Segment相当于是一个子map,拥有一个HashEntity数组。
这样可以将并发压力分摊到多个Segment上。

ConcurrentHashMap组成图:


这里的Segment个数必须为2的n次方,为了之后高效计算要存放的key存放在哪个Segment上(用  << 实现)。

Segment内部拥有一个类型为volatile的HashEntry数组,这是为了能够让其它线程看到最新的值。

ConcurrentHashMap拥有put,get,remove等操作。而原有的扩容操作,为了性能则只针对单个Segment来进行。


关键的Segment相当了ConcurrentHashMap中的子map,它包含了一个HashEntry数组。它的类图如下:

这里关键的Segment类图如下:



HashEntry内部存储了hash值,final的key值,及volatile的value及next值。

这里value及next值是volatile是为实现get操作的不需要加锁提供了基础。

HashEntry类图如下:



下面分别从ConcurrentHashMap创建,往ConcurrentHashMap放入元素,从ConcurrentHashMap取出元素来进行分析。

一.ConcurrentHashMap创建

在用ConcurrentHashMap时,可以在构造函数中传入你想要的map容量,loadFactor装载因子,并发数(大致决定了有几个Segment)。
如果不传入,使用空构造函数时,默认的map容量大小为16,loadFactor为0.75,并发数为16。
我们以默认的参数开始分析。
首先需要确定总容量大小(这里指所有Segment中存放数组元素的数组大小总和),Segment总数,以及每个Segment中HashEntry数组的大小。
我们先附上代码:
 public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
 }


可以看到,对于Segment个数ssize取的是以concurrencyLevel为上界的2的n次方,这里由于concurrencyLevel为16,所以ssize也为16。
然后计算segmentShift,这个值用于后续在存放元素到ConcurrentHashMap时,用于决定有多少位hash高位参与计算存放元素的Segment数组下标。
这里segmentShift为32-sshift。sshift取的是Segment个数ssize的2的n次数的n值,即在计算ssize时左移的次数,这里为4。
所以segmentShift为28。
接着计算出segmentMask,这个值用于在计算得到存放元素的Segment所在数组的下标,这里为了效率使用 & 操作来替代%模操作。
这里由于Segment个数为16,所以segmentMask为15。
之后会计算出每个Segment的数组容量,这里先计算出每个Segment的大致大小,即用int c = initialCapacity / ssize; 来得出c,这里c为1。
之后会初始化一个cap作为真正的segment中HashEntry数组大小,将它初始化为2。
然后为了保证得出的segMent中HashEntry的大小为2的n次方,所以后续会对cap做以下操作:
while (cap < c)
  cap <<= 1;
即cap的值要不是2(c值小于等于2时),或者是大于2的2的n次方。
得到最终每个Segment中HashEntry数组大小后,创建一个Segment数组,并在Segment数组0处初始化创建一个Segment。

最后将ConcurrentHashMap的segments设置为新创建的Segments数组。


这里可以看到最后将创建的Segment s0是调用UNSAFE.putOrderedObject(ss,SBASE,s0)来将其放入到ss数组中。

这里的UNSAFE.putOrderedObject是JAVA提供的用于直接操作内存的方法,其中参数ss是数组。

UNSAFE.putOrderedObject会延迟更新到内存中,但是由于后续在获取segment数组中的segment时,采用的是UNSAFE.getObjectVolatile,所以能够保证对于segment放入数组的操作对于后续的线程是可见的。

SBASE是ConcurrentHashMap的一个静态final long的值,相当于是Segment数组的首地址。

s0则是创建的需要放入ss数组中的segment实例。这里是将s0放入到ss数组位置0中。

如果提供类似:UNSAFE.putOrderedObject(ss,SBASE+offset,s)表明将元素s放入到数组ss的SBASE+offset位置。

其中这里的offset:

offset=步长*要放在数组第几位

步长一般是在编译期已经确定,这里测试过对于:Double或者Integer等,步长都是4。