ConcurrentHashMap
JDK1.7 中的ConcurrentHashMap
1. 数据结构
如图所示,是由 Segment
数组、HashEntry
组成,和 HashMap
一样,仍然是数组加链表。
它的核心成员变量:
1 2 3 4 5 6 final Segment<K,V>[] segments;transient Set<K> keySet;transient Set<Map.Entry<K,V>> entrySet;
Segment
是 ConcurrentHashMap
的一个内部类,主要的组成如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 static final class Segment <K ,V > extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L ; transient volatile HashEntry<K,V>[] table; transient int count; transient int modCount; transient int threshold; final float loadFactor; }
看看其中 HashEntry
的组成:
1 2 3 4 5 6 7 8 9 10 11 12 13 static final class HashEntry <K ,V > { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this .hash = hash; this .key = key; this .value = value; this .next = next; } }
和 HashMap 非常类似,唯一的区别就是其中的核心数据如 value ,以及链表都是 volatile
修饰的,保证了获取时的可见性。
原理上来说:ConcurrentHashMap
采用了分段锁技术 ,其中 Segment
继承于 ReentrantLock
。不会像 HashTable
那样不管是 put
还是 get
操作都需要做同步处理,理论上 ConcurrentHashMap
支持 CurrencyLevel
(Segment
数组数量)的线程并发。每当一个线程占用锁访问一个 Segment
时,不会影响到其他的 Segment
。
2. 初始化方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0 ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0 ; int ssize = 1 ; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1 ; } segmentShift = 32 - sshift; segmentMask = ssize - 1 ; this .segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1 ; while (cap < c) cap <<= 1 ; for (int i = 0 ; i < this .segments.length; ++i) this .segments[i] = new Segment<K,V>(cap, loadFactor); }
需要注意的是,concurrencyLevel
一经指定,便不能再次改变 ,原因也很简单,简化元素增多时的rehash
过程,若Segment
的数量也随元素的增加而进行扩容,则需要进行两次rehash
,需要处理全部元素,效率较低。
随着元素的增加,ConcurrentHashMap
不会增加Segment
的数量,而只会增加Segment
中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap
做rehash
,而只需要对Segment
里面的元素做一次rehash
就可以了。
3. put方法
1 2 3 4 5 6 7 8 9 10 11 public V put (K key, V value) { Segment<K,V> s; if (value == null ) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null ) s = ensureSegment(j); return s.put(key, hash, value, false ); }
首先是通过 key
定位到 Segment
,之后在对应的 Segment
中进行具体的 put
:
1 2 3 4 5 6 7 8 9 10 11 final V put (K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { ... } finally { unlock(); } return oldValue; }
虽然 HashEntry
中的 value 是用 volatile
关键词修饰的,但是并不能保证并发的原子性 ,所以 put
操作时仍然需要加锁处理 。
首先第一步的时候会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut()
自旋获取锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private HashEntry<K,V> scanAndLockForPut (K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this , hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null ; int retries = -1 ; while (!tryLock()) { HashEntry<K,V> f; if (retries < 0 ) { if (e == null ) { if (node == null ) node = new HashEntry<K,V>(hash, key, value, null ); retries = 0 ; } else if (key.equals(e.key)) retries = 0 ; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break ; } else if ((retries & 1 ) == 0 && (f = entryForHash(this , hash)) != first) { e = first = f; retries = -1 ; } } return node; }
尝试自旋获取锁 。
如果重试的次数达到了 MAX_SCAN_RETRIES
则改为阻塞锁 获取,保证能获取成功。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 final V put (K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1 ) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null ) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break ; } e = e.next; } else { if (node != null ) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1 ; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null ; break ; } } } finally { unlock(); } return oldValue; }
再结合源码看看 put
的整体流程:
map
的put
方法就做了三件事情:找出segments
的位置;判断当前位置有没有初始化,没有就调用ensureSegment()
方法初始化;然后调用segment
的put
方法;
segment
的put
方法.,获取当前segment
的锁,成功接着执行,失败调用scanAndLockForPut
方法自旋获取锁,成功后也是接着往下执行;
插入操作:
通过hash
计算出位置,获取节点,找出相同的key
和hash
替换value
,返回;
没有找到相同的,设置创建节点前,判断是否需要扩容,需要调用扩容方法rehash()
;
不需要,设置找出的节点为当前创建节点的next
节点;
返回,释放锁。
4. get方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public V get (Object key) { Segment<K,V> s; HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null ) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long )(((tab.length - 1 ) & h)) << TSHIFT) + TBASE); e != null ; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null ; }
get
逻辑比较简单:
只需要将 Key 通过 Hash 之后定位到具体的 Segment ;
再通过一次 Hash 定位到具体的元素上。
get
没有加锁,效率高
注意: get
方法使用了getObjectVolatile
方法读取segment
和hashentry
,保证是最新的,具有锁的语义,可见性
**分析:**为什么get
不加锁可以保证线程安全:
首先获取value
,我们要先定位到segment
,使用了UNSAFE
的getObjectVolatile
具有读的volatile语义,也就表示在多线程情况下,我们依旧能获取最新的segment
;
获取hashentry[]
,由于table是每个segment
内部的成员变量,使用volatile
修饰的,所以我们也能获取最新的table;
然后我们获取具体的hashentry
,也时使用了UNSAFE
的getObjectVolatile
具有读的volatile语义,然后遍历查找返回;
总结我们发现怎个get
过程中使用了大量的volatile
关键字,其实就是保证了可见性(加锁也可以,但是降低了性能),get
只是读取操作,所以我们只需要保证读取的是最新的数据即可。
5. rehash方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 private void rehash (HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1 ; threshold = (int )(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1 ; for (int i = 0 ; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null ) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null ) newTable[idx] = e; else { HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null ; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
6. size 方法——统计元素数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public int size () { final Segment<K,V>[] segments = this .segments; int size; boolean overflow; long sum; long last = 0L ; int retries = -1 ; try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0 ; j < segments.length; ++j) ensureSegment(j).lock(); } sum = 0L ; size = 0 ; overflow = false ; for (int j = 0 ; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null ) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0 ) overflow = true ; } } if (sum == last) break ; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0 ; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
size
尝试3次不加锁获取sum
,如果发生变化就全部加锁,size
和containsValue
方法的思想也是基本类似。
【执行流程】
第一次,retries++=0,不满足全部加锁条件,遍历所有的segment,sum就是所有segment的容量,last等于0,第一次不相等,last=sum;
第二次,retries++=1,不满足加锁条件,计算所有的segment,sum就是所有的segment的容量,last是上一次的sum,相等结束循环,不相等下次循环;
第三次,retries++=2,先运算后赋值,所以此时还是不满足加锁条件和上面一样统计sum,判断这一次的sum和last(上一次的sum)是否相等,相等结束,不相等,下一次循环;
第四次,retries++=2,满足加锁条件,给segment全部加锁,这样所有线程就没有办法进行修改操作,统计每个segment的数量求和,然后返回size;(ps:全部加锁提高了size的准确率,但是降低了吞吐量,统计size的过程中如果其它线程进行修改操作这些线程全部自旋或者阻塞)。
JDK1.8 中的 ConcurrentHashMap
1. 数据结构
看起来是不是和 1.8 HashMap 结构类似?
其中抛弃了ConcurrentHashMap
原有的 Segment
分段锁,而采用了 CAS + synchronized
来保证并发安全性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 private static final int MAXIMUM_CAPACITY = 1 << 30 ; private static final int DEFAULT_CAPACITY = 16 ; static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8 ; private static final int DEFAULT_CONCURRENCY_LEVEL = 16 ; private static final float LOAD_FACTOR = 0.75f ; static final int TREEIFY_THRESHOLD = 8 ; static final int UNTREEIFY_THRESHOLD = 6 ; static final int MIN_TREEIFY_CAPACITY = 64 ; private static final int MIN_TRANSFER_STRIDE = 16 ; private static int RESIZE_STAMP_BITS = 16 ; private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1 ; private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; static final int MOVED = -1 ; static final int TREEBIN = -2 ; static final int RESERVED = -3 ; static final int HASH_BITS = 0x7fffffff ; static final int NCPU = Runtime.getRuntime().availableProcessors(); private static final ObjectStreamField[] serialPersistentFields = { new ObjectStreamField("segments" , Segment[].class), new ObjectStreamField("segmentMask" , Integer.TYPE), new ObjectStreamField("segmentShift" , Integer.TYPE) }; transient volatile Node<K,V>[] table; private transient volatile Node<K,V>[] nextTable; private transient volatile long baseCount; private transient volatile int sizeCtl; private transient volatile int transferIndex; private transient volatile int cellsBusy; private transient volatile CounterCell[] counterCells; static class Node <K ,V > implements Map .Entry <K ,V > { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this .hash = hash; this .key = key; this .val = val; this .next = next; } public final K getKey () { return key; } public final V getValue () { return val; } public final int hashCode () { return key.hashCode() ^ val.hashCode(); } public final String toString () { return key + "=" + val; } public final V setValue (V value) { throw new UnsupportedOperationException();} public final boolean equals (Object o) {...} Node<K,V> find (int h, Object k) {...} }
也将 1.7 中存放数据的 HashEntry
改为 Node
,但作用都是相同的。
其中的 val next
都用了volatile
修饰,保证了可见性。
2. putVal方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
根据 key 计算出 hashcode ;
判断table
是否需要进行初始化;
f
即为当前key
定位出的 Node
,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功;
如果当前位置的 hashcode == MOVED == -1
,则帮助进行扩容;
如果都不满足,则利用 synchronized
锁写入数据,此处分为红黑树的插入,链表插入两种情况;
如果bincount
大于 TREEIFY_THRESHOLD
则要转换为红黑树。
在putVal
函数中会涉及到如下几个函数:initTable
、tabAt
、casTabAt
、helpTransfer
、putTreeVal
、treeifyBin
、addCount
函数。下面对其中涉及到的函数进行分析。
3. initTable 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0 ) { if ((sc = sizeCtl) < 0 ) Thread.yield(); else if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if ((tab = table) == null || tab.length == 0 ) { int n = (sc > 0 ) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } break ; } } return tab; }
**说明:**对于table
的大小,会根据sizeCtl
的值进行设置,如果没有设置szieCtl
的值,那么默认生成的table
大小为16,否则,会根据sizeCtl
的大小设置table
大小。
4. tabAt 函数
1 2 3 static final <K,V> Node<K,V> tabAt (Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long )i << ASHIFT) + ABASE); }
**说明:**此函数返回table
数组中下标为i的结点,可以看到是通过Unsafe
对象通过反射获取的,getObjectVolatile
的第二项参数为下标为i
的偏移地址。
5. casTabAt 函数
1 2 3 4 static final <K,V> boolean casTabAt (Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long )i << ASHIFT) + ABASE, c, v); }
**说明:**此函数用于比较table
数组下标为i的结点是否为c
,若为c
,则用v
交换操作。否则,不进行交换操作。
6. helpeTransfer 函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null ) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) { transfer(tab, nextTab); break ; } } return nextTab; } return table; }
说明:此函数用于在扩容时将table
表中的结点转移到nextTable
中。
7. treeifyBin函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private final void treeifyBin (Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null ) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1 ); else if ((b = tabAt(tab, index)) != null && b.hash >= 0 ) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null , tl = null ; for (Node<K,V> e = b; e != null ; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null , null ); if ((p.prev = tl) == null ) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
说明:此函数用于将桶中的 数据结构转化为红黑树 ,其中,值得注意的是,当table
的长度未达到阈值时,会进行一次扩容操作 ,该操作会使得触发treeifyBin
操作的某个桶中的所有元素进行一次重新分配,这样可以避免某个桶中的结点数量太大。
8. get方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public V get (Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0 ) return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null ; }
根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
如果是红黑树那就按照树的方式获取值。
就不满足那就按照链表的方式遍历获取值。
9. sumCount 函数
1 2 3 4 5 6 7 8 9 10 11 final long sumCount () { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
两个版本 ConcurrentHashMap 的比较
数据结构与存取实现
其实可以看出JDK1.8
版本的ConcurrentHashMap
的数据结构已经接近HashMap
,相对而言,ConcurrentHashMap
只是增加了同步的操作来控制并发,从JDK1.7
版本的ReentrantLock+Segment+HashEntry
,到JDK1.8
版本中synchronized+CAS+Node+红黑树
,相对而言,总结如下思考:
JDK1.8
的实现降低锁的粒度 ,JDK1.7
版本锁的粒度是基于Segment
的,包含多个HashEntry
,而JDK1.8
锁的粒度就是Node
(首节点);
JDK1.8
版本的数据结构变得更加简单,使得操作也更加清晰流畅 ,因为已经使用synchronized
来进行同步,所以不需要分段锁的概念,也就不需要Segment
这种数据结构了,由于粒度的降低,实现的复杂度也增加了;
JDK1.8
使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档;
JDK1.8
为什么使用内置锁synchronized
来代替重入锁ReentrantLock
,我觉得有以下几点:
因为粒度降低了,在相对而言的低粒度加锁方式,synchronized
并不比ReentrantLock
差 ,在粗粒度加锁中ReentrantLock
可能通过Condition
来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition
的优势就没有了;
JVM的开发团队从来都没有放弃synchronized
,而且基于JVM的synchronized
优化空间更大,使用内嵌的关键字比使用API更加自然
在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock
会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据
1.8 在 1.7 的数据结构上做了大的改动,采用红黑树之后可以保证查询效率(O(logn)
),甚至取消了 ReentrantLock
改为了 synchronized
,这样可以看出在新版的 JDK 中对 synchronized
优化是很到位的。
对象统计
JDK1.7:统计每个segment
对象中的元素个数,然后进行累加,但是这种方式计算出来的结果不一定准确。因为在计算后面的segment
的元素个数时,前面计算过了的segment可能有数据的新增或删除。
前面计算过了的segment
可能有数据的新增或删除
计算方式为:先采用不加锁的方式,连续计算两次
计算方式为:先采用不加锁的方式,连续计算两次
如果两次结果相等,说明计算结果准确
如果两次结果不相等,说明计算过程中出现了并发新增或者删除操作
于是给每个segment
加锁,然后再次计算
JDK1.8:使用一个volatile
类型的变量baseCount
记录元素的个数,同时部分元素的变化个数保存在CounterCell
数组中,通过累加baseCount
和CounterCell
数组中的数量,即可得到元素的总个数;
在并发量很高时,如果存在两个线程同时执行CAS修改baseCount
值,则失败的线程会继续执行方法体中的逻辑,执行fullAddCount(x, uncontended)
方法,这个方法其实就是初始化counterCells
,并将x的值插入到counterCell类中。
所以counterCells
存储的都是value
为1的CounterCell
对象,而这些对象是因为在CAS更新baseCounter
值时,由于高并发而导致失败,最终将值保存到CounterCell
中,放到counterCells
里。这也就是为什么sumCount()
中需要遍历counterCells
数组,sum
累加CounterCell.value
值了。