首页 > 代码库 > 深入ConcurrrentHashMap三
深入ConcurrrentHashMap三
这篇主要分析在put元素的时候,需要扩容这时ConcurrentHashMap是如何做的。
先介绍下ConcurrentHashMap的主要思路,步骤如下:
1.在put的时候,如果需要新建HashEntry结点插入到HashEntry中时,这时候如果满足扩容条件则进入下面第2步
2.扩容调用的是Segment的rehash方法,只对某个segment扩容。这时需要新建一个segment的HashEntry数组,大小为原来2倍大。
3.然后对原有HashEntry数组中的元素进行重hash,ConcurrentHashMap这里做了优化。
即如果原有的在HashEntry数组中某一个HashEntry链,如果里面的元素,比如有三个元素a->b->c。这里如果重hash后,a,b,c还是在新数组的相同下标时,可以
想办法重用这几个结点。
下面举例来说明,假设现在一个ConcurrentHashMap,总容量为16,有四个Segment,每个Segment中HashEntry数组大小为4,每个segment的threshold扩容阀值为3。
现在其中一个Segment1的HashEntry存储情况如下:
这里可以看到segment1中HashEntry数组已经使用大小为3。其中index0位置的HashEntry因为有hash冲突所以以链式来解决冲突,这里存储了a,b,c,d,e元素。
index1及index2的hashEntry分别存储了h1,f1两个元素。
这里需要说明的是在做rehash的时候,需要保证尽量不让reader线程受影响。
rehash的时候会先创建一个大小为原来两倍的新数组,然后对之前所有元素进行重hash,这个过程中所有的元素重hash后放入新数组位置都是通过新建一个HashEntry来做,以最小化影响之前的reader线程。
这里假设要put进一个元素到segment1中,key值为p,value为pv。并且这个元素被hash到HashEntry数组的index3位置上,因此需要重建一个HashEntry结点。
这里会先将当前segment1的count值加1,然后判断是否该扩容,这里由于count加1后为4,所以它已经大于上述讲到的threshold阀值,因此需要进行扩容。
扩容时会创建一个大小为8的HashEntry数组,然后分别对原有HashEntry数组中的每个元素进行重hash到新数组中。
以分析segment1的index0的HashEntry链为例。可以看到有元素: a->b->c->d->e
最简单粗暴的重hash方法是:
1.首先对a重hash,这里假设最终a要存储到新数组位置index4中,因此要创建一个HashEntry用于存储key a及相应value。它的next指针指向新数组index4上的首个
HashEntry,这里为null。
2.然后对b进行重hash,假设还是放在新数组位置index4中。这时创建一个新HashEntry,并将它的next指针指向index4的首结点,这里为a。
这步完成后,index4中元素情况如下:
b->a
3.依次处理完c,d,e。假设c,d,e都还是在新数组index4中。
有没有发现a,b,c,d,e,还是被分配在新数组同个位置。因此对它们的创建及重新指向是没有必要的。
事实上ConcurrentHashMap会有一个指针。
在这个指针指向的结点之前都是认为它们存储在新数组不同位置,因此对于这部分结点还是要新建HashEntry结点及重新调整next指针。
在这个指针结点之后,则被认为是被分配在新数组相同位置,因此它们这部分链条结点可以重用。
我们举例说明,假设a,b,c结点它们最终放在新数组不同位置,而d和e放在新数组相同位置。
因此对于d和e只需做以下动作:
新数组[i]=d
即这里直接将新数组下标i的引用指向了元素d。
能这样做的原因是,rehash后由于扩容后新数组大小为原来2倍。因此对于每个元素在新数组的位置,要么是在原先位置,要么是原先位置加上原先大小.
这里d,e在原先的位置0上,扩容后在新数组的位置4上。之后对于原有数组的下标1,2进行rehash时,它们不可能在被放入到新数组的位置0中。
rehash源码如下:
private void rehash(HashEntry<K,V> node) { /* * Reclassify nodes in each list to new table. Because we * are using power-of-two expansion, the elements from * each bin must either stay at same index, or move with a * power of two offset. We eliminate unnecessary node * creation by catching cases where old nodes can be * reused because their next fields won't change. * Statistically, at the default threshold, only about * one-sixth of them need cloning when a table * doubles. The nodes they replace will be garbage * collectable as soon as they are no longer referenced by * any reader thread that may be in the midst of * concurrently traversing table. Entry accesses use plain * array indexing because they are followed by volatile * table write. */ HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }