首页 > 代码库 > ConcurrentHashMap的关键部分解析

ConcurrentHashMap的关键部分解析

ConcurrentHashMap是线程安全且高效的HashMap,在多线程情况下,由于HashMap是非线程安全的,容易引起死循环,具体的原因可以自己google一下,这里不做解释,这里只从大的方向和几个关键的方法做解释。

这里简单说一句,HashTable虽然解决了线程安全问题,但是由于其低效性,所以不推荐使用,并且它已经是一个过时的家伙了,所以干脆就不要考虑了,其之所以低效,就是锁的粒度太大了。

下面开始分析关键方法和一些参数的含义。

首先看一下ConcurrentHashMap的大体框架,我大概画了一个简易的关系类图,如下图所示。



可以先不负责任的说ConcurrentHashMap之所以高效是因为它更好的降低了锁的粒度。这个图可以这样理解,ConcurrentHashMap用于存储东西的仓储是Segment数组: final Segment<K,V>[] segments;,每个Segment中的仓储又是HashEntry数组: transient volatile HashEntry<K,V>[] table。我们存储的每个key-value键值对对应一个HashEntry。

首先来解决比较难懂的哈希问题,这里从初始化说起吧,这样觉得更容易懂。

既然知道ConcurrentHashMap的优点是锁的粒度减小了,那么锁加在哪里了呢?从图上就可以显然的看出,锁加在了每个Segment上,Segment数组是大的仓储,那么对于添加一个key-value来说,要先找到要添加到的segment.,那么问题就都应该围绕着这个来展开。

看源码中的构造方法:

这里给出解释:

参数concurrencyLevel是用户估计的并发级别,就是说你觉得最多有多少线程共同修改这个map,根据这个来确定Segment数组的大小

concurrencyLevel默认是DEFAULT_CONCURRENCY_LEVEL = 16;

    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

在这里来认识两个重要的域:segmentShift和segmentMask分别表示段偏移量和段掩码,这两个和segment的定位比较相关。

        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;//表示偏移位数
        int ssize = 1;//表示segments数组的长度
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;

从这里可以看出segments数组的长度是由concurrencyLevel决定的,由于要通过按位与的方法来定位segment的位置,那么必须保证segments的大小是2的整数次幂(power-of -two size),concurrencyLevel默认是16,那么sshift默认就是4即左移的次数,ssize为16。再啰嗦一点,默认有16个线程同时修改,那么我就默认的将segments大小设置为16,最理想情况下每个线程对应一个segment这样就不冲突了。再啰嗦就是用户如果指定concurrencyLevel为13,14,ssize都会是16,就是一定要保证大小是不小于concurrencyLevel的最小的2的整数次幂。

注意: static final int MAX_SEGMENTS = 1 << 16; // slightly conservative   这就是说concurrencyLevel最大值是65535,对应二进制是16位

再看下面这段:

  int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)//不满的情况
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;

initialCapacity是总容量,ssize知道了是segments数组的大小,那么c显然就是每个segment的大小了(这不严谨),实际上cap才是每个segment的容量大小,每个容量都是2的整数次幂。最少也是2

下面看这个比较难懂的hash函数:

 private int hash(Object k) {
        int h = hashSeed;
        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }
        h ^= k.hashCode();//首先先得到初始的hash

       //利用h进行再哈希
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

那么为什么要这么做呢,就是为了减小冲突,使得元素能够均匀的分布在不同的segment上,知道了上面的就可以很好的进行解释了。

当调用put方法时

 public V put(K key, V value) {
        Segment<K,V> s;
        if (value =http://www.mamicode.com/= null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;//这里就是在找下标啊
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

这里给举例解释一下(都是在默认情况下,concurrencyLevel=16,ssize=16,sshift=4)那么    this.segmentShift = 32 - sshift=28 而this.segmentMask = ssize - 1=15二进制就是1111;

如果对于给定的object原来的值分别是15,31,63,127(即  h ^= k.hashCode();//首先先得到初始的hash)

如果不采用再hash操作,那么经过  int j = (hash >>> segmentShift) & segmentMask操作,得到的都是15

经过再hash后得到的hash值分别是:


那么经过移位和按位与操作后得到的分别是,4,15,7,8没有产生冲突。这回应该明白segmentShift和segmentMask的作用了吧。

以上明白了,那么get,put和size方法就容易明白了

首先说get,get方法并没有调用锁的痕迹,那么是如何实现线程安全的呢?这就得益于volatile关键字,保证了共享的东西的可见性,这里不做详细的解释,图中已经把关键的域表示出来了,自己仔细看看源码吧。

对于put方法来说,也是先定位,再判断是否需要扩充容量,最后插入。这里可以看看和HashMap的不同(提示,二者何时扩充不同,一前一后)

对于size方法来说,这里有些东西还是很值得借鉴的。虽然每个segment都有一个count值表示个个大小,仅仅累加count是不能解决问题的。虽然当时获得的count是最新的但在加的过程中又可能有变化,而仅仅为了得到size就把每个segment的put、remove,clean方法都锁上,代价未免也太大了。因为在累加的过程中count发生变化的几率比较小,那么这里就有一个小的trick,先通过尝试2次不通过锁住segment的方法获取每个大小,如果在统计过程中,count发生变化了,那么再通过加锁的方式进行统计。

那么如何判断是否发生变化了呢,这里modcount就是关键了,每次如put,remove操作都会对modcount进行加1操作,那么只需比较调用size方法之前和之后的modconut大小就知道了。这里值得借鉴啊。

关于ConcurrentHashMap就介绍到这里,有点混乱,说的不是很清晰,但觉得关键的部分说出来了,可以自己再看看源码分析一下,之所以分析一下,是因为在看Ehcache源码的时候,它就是采用和这个类似的类作为仓库的。个人分析可能有误,如有错误请指正。