深入理解 ConcurrentHashMap

ConcurrentHashMap

JDK1.7 中的ConcurrentHashMap

1. 数据结构

jdk7

如图所示,是由 Segment 数组、HashEntry 组成,和 HashMap 一样,仍然是数组加链表。

它的核心成员变量:

1
2
3
4
5
6
/**
* Segment 数组,存放数据时首先需要定位到具体的 Segment 中。
*/
final Segment<K,V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;

SegmentConcurrentHashMap 的一个内部类,主要的组成如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static final class Segment<K,V> extends ReentrantLock implements Serializable {

private static final long serialVersionUID = 2249069246763182397L;

// 和 HashMap 中的 HashEntry 作用一样,真正存放数据的桶
transient volatile HashEntry<K,V>[] table; // 链表数组,数组中的每一个元素代表了一个链表的头部

transient int count; // Segment中元素的数量,由volatile修饰,支持内存可见性

transient int modCount; // 对table的大小造成影响的操作的数量(比如put或者remove操作)

transient int threshold; // 扩容阈值

final float loadFactor; // 负载因子

}

看看其中 HashEntry 的组成:

1
2
3
4
5
6
7
8
9
10
11
12
13
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;

HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
}

和 HashMap 非常类似,唯一的区别就是其中的核心数据如 value ,以及链表都是 volatile 修饰的,保证了获取时的可见性。

原理上来说:ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock。不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment

2. 初始化方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// initialCapacity: 初始容量
// loadFactor: 负载因子
// concurrencyLevel: ConcurrentHashMap内部的Segment的数量
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();

// 若concurrencyLevel大于MAX_SEGMENTS,则concurrencyLevel=MAX_SEGMENTS
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;

// 求解concurrencyLevel与2的几次方最近(天花板方向)
// 如concurrencyLevel=5 则天花板方向上离2^3=8最近,则sshift=3,ssize=8
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// segmentShift和segmentMask主要用于元素的hash
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
// 可以看到,实际segment的数量为ssize
this.segments = Segment.newArray(ssize);

if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
// 若initialCapacity / ssize不整除,则将c=c+1
if (c * ssize < initialCapacity)
++c;
int cap = 1;
// cap为每个segment的初始容量,其值为离c天花板方向最近的2^n
// 例:c为5,cap则为2^3=8;c为12,cap则为2^4=16
while (cap < c)
cap <<= 1;
// 创建Segment
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}

需要注意的是,concurrencyLevel一经指定,便不能再次改变,原因也很简单,简化元素增多时的rehash过程,若Segment的数量也随元素的增加而进行扩容,则需要进行两次rehash,需要处理全部元素,效率较低。

随着元素的增加,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMaprehash,而只需要对Segment里面的元素做一次rehash就可以了。

3. put方法

1
2
3
4
5
6
7
8
9
10
11
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}

首先是通过 key 定位到 Segment,之后在对应的 Segment 中进行具体的 put

1
2
3
4
5
6
7
8
9
10
11
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
...
} finally {
unlock();
}
return oldValue;
}

虽然 HashEntry 中的 value 是用 volatile 关键词修饰的,但是并不能保证并发的原子性,所以 put 操作时仍然需要加锁处理

首先第一步的时候会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut() 自旋获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
  1. 尝试自旋获取锁
  2. 如果重试的次数达到了 MAX_SCAN_RETRIES 则改为阻塞锁获取,保证能获取成功。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value); // 上锁
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash; // 定位hashentry
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) { // 遍历hashentry
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else { // 未找到,新建entry插入
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 判断是否进行扩容
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock(); // 解锁
}
return oldValue;
}

再结合源码看看 put 的整体流程:

  1. mapput方法就做了三件事情:找出segments的位置;判断当前位置有没有初始化,没有就调用ensureSegment()方法初始化;然后调用segmentput方法;
  2. segmentput方法.,获取当前segment的锁,成功接着执行,失败调用scanAndLockForPut方法自旋获取锁,成功后也是接着往下执行;
  3. 插入操作:
    1. 通过hash计算出位置,获取节点,找出相同的keyhash替换value,返回;
    2. 没有找到相同的,设置创建节点前,判断是否需要扩容,需要调用扩容方法rehash()
    3. 不需要,设置找出的节点为当前创建节点的next节点;
    4. 返回,释放锁。

4. get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 获取segment的位置
// getObjectVolatile getObjectVolatile语义读取最新的segment,获取table
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// getObjectVolatile getObjectVolatile语义读取最新的hashEntry,并遍历
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
// 找到相同的key 返回
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

get逻辑比较简单:

  • 只需要将 Key 通过 Hash 之后定位到具体的 Segment ;
  • 再通过一次 Hash 定位到具体的元素上。

get 没有加锁,效率高
注意:get方法使用了getObjectVolatile方法读取segmenthashentry,保证是最新的,具有锁的语义,可见性
**分析:**为什么get不加锁可以保证线程安全:

  • 首先获取value,我们要先定位到segment,使用了UNSAFEgetObjectVolatile具有读的volatile语义,也就表示在多线程情况下,我们依旧能获取最新的segment
  • 获取hashentry[],由于table是每个segment内部的成员变量,使用volatile修饰的,所以我们也能获取最新的table;
  • 然后我们获取具体的hashentry,也时使用了UNSAFEgetObjectVolatile具有读的volatile语义,然后遍历查找返回;
  • 总结我们发现怎个get过程中使用了大量的volatile关键字,其实就是保证了可见性(加锁也可以,但是降低了性能),get只是读取操作,所以我们只需要保证读取的是最新的数据即可。

5. rehash方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
*扩容方法
*/
private void rehash(HashEntry<K,V> node) {
// 旧的table
HashEntry<K,V>[] oldTable = table;
// 旧的table的长度
int oldCapacity = oldTable.length;
// 扩容原来capacity的一倍
int newCapacity = oldCapacity << 1;
// 新的阈值
threshold = (int)(newCapacity * loadFactor);
// 新的table
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩码
int sizeMask = newCapacity - 1;
// 遍历旧的table
for (int i = 0; i < oldCapacity ; i++) {
// table中的每一个链表元素
HashEntry<K,V> e = oldTable[i];
if (e != null) { // e不等于null
HashEntry<K,V> next = e.next; // 下一个元素
int idx = e.hash & sizeMask; // 重新计算位置,计算在新的table的位置
if (next == null) // Single node on list 证明只有一个元素
newTable[idx] = e; // 把当前的e设置给新的table
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e; // 当前e
int lastIdx = idx; // 在新table的位置
for (HashEntry<K,V> last = next;
last != null;
last = last.next) { // 遍历链表
int k = last.hash & sizeMask; // 确定在新table的位置
if (k != lastIdx) { // 头结点和头结点的next元素的节点发生了变化
lastIdx = k; // 记录变化位置
lastRun = last; // 记录变化节点
}
}
// 以下把链表设置到新table分为两种情况
// (1) lastRun 和 lastIdx 没有发生变化,也就是整个链表的每个元素位置和一样,都没有发生变化
// (2) lastRun 和 lastIdx 发生了变化,记录变化位置和变化节点,然后把变化的这个节点设置到新table
// ,但是整个链表的位置只有变化节点和它后面关联的节点是对的
// 下面的这个遍历就是处理这个问题,遍历当前头节点e,找出不等于变化节点(lastRun)的节点重新处理
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 处理扩容时那个添加的节点

// 计算位置
int nodeIndex = node.hash & sizeMask; // add the new node
// 设置next节点,此时已经扩容完成,要从新table里面去当前位置的头结点为next节点
node.setNext(newTable[nodeIndex]);
// 设置位置
newTable[nodeIndex] = node;
// 新table替换旧的table
table = newTable;
}

6. size 方法——统计元素数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // 为true表示size溢出32位
long sum; // modCounts的总和
long last = 0L; // previous sum
int retries = -1; // 第一次不计算次数,所以会重试三次
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) { // 重试次数达到3次 对所有segment加锁
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) { // seg不等于空
sum += seg.modCount; // 不变化和size一样
int c = seg.count; // seg 的size
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last) // 没有变化
break;
last = sum; // 变化,记录这一次的变化值,下次循环时对比.
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}

size 尝试3次不加锁获取sum,如果发生变化就全部加锁,sizecontainsValue方法的思想也是基本类似。
【执行流程】

  1. 第一次,retries++=0,不满足全部加锁条件,遍历所有的segment,sum就是所有segment的容量,last等于0,第一次不相等,last=sum;
  2. 第二次,retries++=1,不满足加锁条件,计算所有的segment,sum就是所有的segment的容量,last是上一次的sum,相等结束循环,不相等下次循环;
  3. 第三次,retries++=2,先运算后赋值,所以此时还是不满足加锁条件和上面一样统计sum,判断这一次的sum和last(上一次的sum)是否相等,相等结束,不相等,下一次循环;
  4. 第四次,retries++=2,满足加锁条件,给segment全部加锁,这样所有线程就没有办法进行修改操作,统计每个segment的数量求和,然后返回size;(ps:全部加锁提高了size的准确率,但是降低了吞吐量,统计size的过程中如果其它线程进行修改操作这些线程全部自旋或者阻塞)。

JDK1.8 中的 ConcurrentHashMap

1. 数据结构

jdk8

看起来是不是和 1.8 HashMap 结构类似?

其中抛弃了ConcurrentHashMap原有的 Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
  // 表的最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认表的大小
private static final int DEFAULT_CAPACITY = 16;
// 最大数组大小
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 默认并发数
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 装载因子
private static final float LOAD_FACTOR = 0.75f;
// 转化为红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
// 由红黑树转化为链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 转化为红黑树的表的最小容量
static final int MIN_TREEIFY_CAPACITY = 64;
// 每次进行转移的最小值
private static final int MIN_TRANSFER_STRIDE = 16;
// 生成sizeCtl所使用的bit位数
private static int RESIZE_STAMP_BITS = 16;
// 进行扩容所允许的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 记录sizeCtl中的大小所需要进行的偏移位数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 一系列的标识
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// 获取可用的CPU个数
static final int NCPU = Runtime.getRuntime().availableProcessors();

// 进行序列化的属性
private static final ObjectStreamField[] serialPersistentFields = {
new ObjectStreamField("segments", Segment[].class),
new ObjectStreamField("segmentMask", Integer.TYPE),
new ObjectStreamField("segmentShift", Integer.TYPE)
};

// 表
transient volatile Node<K,V>[] table;
// 下一个表
private transient volatile Node<K,V>[] nextTable;
// 基本计数
private transient volatile long baseCount;
// 对表初始化和扩容控制
private transient volatile int sizeCtl;
// 扩容下另一个表的索引
private transient volatile int transferIndex;
// 旋转锁
private transient volatile int cellsBusy;
// counterCell表
private transient volatile CounterCell[] counterCells;


static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val; // volatile 修饰,保证可见性
volatile Node<K,V> next; // volatile 修饰,保证可见性

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) { throw new UnsupportedOperationException();}

public final boolean equals(Object o) {...}

/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {...}
}

也将 1.7 中存放数据的 HashEntry 改为 Node,但作用都是相同的。

其中的 val next 都用了volatile 修饰,保证了可见性。

2. putVal方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //Step-1: 计算hash值
int binCount = 0; // binCount赋值为0
for (Node<K,V>[] tab = table;;) { // 无限循环
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) // Step-2:判断是否需要初始化table
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // Step-3:如果当前node为空,cas写入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null))) // 比较并且交换值,如tab的第i项为空则用新生成的node替换
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // Step-4:如果当前node处于扩容状态,则当前线程帮助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) { // Step-5:上锁进行put,分链表与红黑树两种情况
if (tabAt(tab, i) == f) { // 找到table表下标为i的节点
if (fh >= 0) { // 该table表中该结点的hash值大于0
binCount = 1; // binCount赋值为1
for (Node<K,V> e = f;; ++binCount) { // 无限循环
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value; // 将指定的value保存至结点,即进行了结点值的更新
break;
}
Node<K,V> pred = e; // 保存当前结点
if ((e = e.next) == null) { // 当前结点的下一个结点为空,即为最后一个结点
pred.next = new Node<K,V>(hash, key,
value, null); // 新生一个结点并且赋值给next域
break;
}
}
}
else if (f instanceof TreeBin) { // 结点为红黑树结点类型
Node<K,V> p;
binCount = 2; // binCount赋值为2
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, // 将hash、key、value放入红黑树
value)) != null) {
oldVal = p.val;// 保存结点的val
if (!onlyIfAbsent)
p.val = value;// 赋值结点value值
}
}
}
}
if (binCount != 0) { // binCount不为0
if (binCount >= TREEIFY_THRESHOLD) // Step-6:如果binCount大于等于转化为红黑树的阈值
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;// 返回旧值
break;
}
}
}
addCount(1L, binCount);// 增加binCount的数量
return null;
}
  • 根据 key 计算出 hashcode ;
  • 判断table是否需要进行初始化;
  • f 即为当前key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功;
  • 如果当前位置的 hashcode == MOVED == -1,则帮助进行扩容;
  • 如果都不满足,则利用 synchronized 锁写入数据,此处分为红黑树的插入,链表插入两种情况;
  • 如果bincount大于 TREEIFY_THRESHOLD 则要转换为红黑树。

putVal函数中会涉及到如下几个函数:initTabletabAtcasTabAthelpTransferputTreeValtreeifyBinaddCount函数。下面对其中涉及到的函数进行分析。

3. initTable 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { // 无限循环
if ((sc = sizeCtl) < 0) // sizeCtl小于0,则进行线程让步等待
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 比较sizeCtl的值与sc是否相等,相等则用-1替换
try {
if ((tab = table) == null || tab.length == 0) { // table表为空或者大小为0
// sc的值是否大于0,若是,则n为sc,否则,n为默认初始容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 新生结点数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 赋值给table
table = tab = nt;
// sc为n * 3/4
sc = n - (n >>> 2);
}
} finally {
// 设置sizeCtl的值
sizeCtl = sc;
}
break;
}
}
// 返回table表
return tab;
}

**说明:**对于table的大小,会根据sizeCtl的值进行设置,如果没有设置szieCtl的值,那么默认生成的table大小为16,否则,会根据sizeCtl的大小设置table大小。

4. tabAt 函数

1
2
3
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

**说明:**此函数返回table数组中下标为i的结点,可以看到是通过Unsafe对象通过反射获取的,getObjectVolatile的第二项参数为下标为i的偏移地址。

5. casTabAt 函数

1
2
3
4
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

**说明:**此函数用于比较table数组下标为i的结点是否为c,若为c,则用v交换操作。否则,不进行交换操作。

6. helpeTransfer 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// table表不为空并且结点类型是ForwardingNode类型,并且结点的nextTable不为空
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {// 比较并交换
// 将table的结点转移到nextTab中
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

说明:此函数用于在扩容时将table表中的结点转移到nextTable中。

7. treeifyBin函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) { // 表不为空
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // table表的长度小于最小的长度
// 进行扩容,调整某个桶中结点数量过多的问题(由于某个桶中结点数量超出了阈值,则触发treeifyBin)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 桶中存在结点并且结点的hash值大于等于0
synchronized (b) { // 对桶中第一个结点进行加锁
if (tabAt(tab, index) == b) { // 第一个结点没有变化
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) { // 遍历桶中所有结点
// 新生一个TreeNode结点
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null) // 该结点前驱为空
// 设置p为头结点
hd = p;
else
// 尾节点的next域赋值为p
tl.next = p;
// 尾节点赋值为p
tl = p;
}
// 设置table表中下标为index的值为hd
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

说明:此函数用于将桶中的数据结构转化为红黑树,其中,值得注意的是,table的长度未达到阈值时,会进行一次扩容操作,该操作会使得触发treeifyBin操作的某个桶中的所有元素进行一次重新分配,这样可以避免某个桶中的结点数量太大。

8. get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算key的hash值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) { // 表不为空并且表的长度大于0并且key所在的桶不为空
if ((eh = e.hash) == h) { // 表中的元素的hash值与key的hash值相等
if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 键相等
// 返回值
return e.val;
}
else if (eh < 0) // 结点hash值小于0
// 在桶(链表/红黑树)中查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { // 对于结点hash值大于0的情况
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
  • 根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
  • 如果是红黑树那就按照树的方式获取值。
  • 就不满足那就按照链表的方式遍历获取值。

9. sumCount 函数

1
2
3
4
5
6
7
8
9
10
11
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

两个版本 ConcurrentHashMap 的比较

数据结构与存取实现

其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+Node+红黑树,相对而言,总结如下思考:

  • JDK1.8的实现降低锁的粒度JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是Node(首节点);
  • JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了;
  • JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档;
  • JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock,我觉得有以下几点:
    • 因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势就没有了;
    • JVM的开发团队从来都没有放弃synchronized,而且基于JVM的synchronized优化空间更大,使用内嵌的关键字比使用API更加自然
    • 在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据

1.8 在 1.7 的数据结构上做了大的改动,采用红黑树之后可以保证查询效率(O(logn)),甚至取消了 ReentrantLock 改为了 synchronized,这样可以看出在新版的 JDK 中对 synchronized 优化是很到位的。

对象统计

  • JDK1.7:统计每个segment对象中的元素个数,然后进行累加,但是这种方式计算出来的结果不一定准确。因为在计算后面的segment的元素个数时,前面计算过了的segment可能有数据的新增或删除。

    前面计算过了的segment可能有数据的新增或删除

    计算方式为:先采用不加锁的方式,连续计算两次

    计算方式为:先采用不加锁的方式,连续计算两次

    如果两次结果相等,说明计算结果准确

    如果两次结果不相等,说明计算过程中出现了并发新增或者删除操作

    于是给每个segment加锁,然后再次计算

  • JDK1.8:使用一个volatile类型的变量baseCount记录元素的个数,同时部分元素的变化个数保存在CounterCell数组中,通过累加baseCountCounterCell数组中的数量,即可得到元素的总个数;

    在并发量很高时,如果存在两个线程同时执行CAS修改baseCount值,则失败的线程会继续执行方法体中的逻辑,执行fullAddCount(x, uncontended)方法,这个方法其实就是初始化counterCells,并将x的值插入到counterCell类中。

    所以counterCells存储的都是value为1的CounterCell对象,而这些对象是因为在CAS更新baseCounter值时,由于高并发而导致失败,最终将值保存到CounterCell中,放到counterCells里。这也就是为什么sumCount()中需要遍历counterCells数组,sum累加CounterCell.value值了。