浅析JDK1.8 ConcurrentHashMap源码,不得不感慨一下Doug Lea真的太强了!

写在开篇

先贴张图看下ConcurrentHashMap JDK 1.8的结构:

CHM01
CHM01

先大体介绍一下:ConcurrentHashMap JDK1.8抛弃了JDK1.7中原有的Segment分段锁,而采用了CAS + synchronized来保证并发安全性。

类的继承关系

1
2
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {

继承于抽象的AbstractMap,ConcurrentMap,Serializable这两个接口。

重要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 存储Node结点数据,默认初始大小为16
*/
transient volatile Node<K,V>[] table;

/**
* 扩容生成的新数组
*/
private transient volatile Node<K,V>[] nextTable;

/**
* 用于统计容器中Node结点数量,在下文计算结点数量会详细介绍
*/
private transient volatile long baseCount;

/**
* 控制 table 的初始化和扩容操作,默认为0
* -1:table正在初始化
* -N:有N-1个线程正在进行扩容操作
* >0:table.length * 0.75 扩容阈值调为table容量大小的0.75倍
*/
private transient volatile int sizeCtl;

/**
* 扩容时,当前转移的位置
*/
private transient volatile int transferIndex;

/**
* 代表计数桶状态,默认空闲时0,忙是1
*/
private transient volatile int cellsBusy;

/**
* 计数桶,容器中计算Node的数量相加baseCount以及CounterCell[]的值
*/
private transient volatile CounterCell[] counterCells;

Node

Node:将JDK1.7中存放数据的HashEntry改为Node。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

// ...
}

ForwardingNode:一个特殊的 Node 结点,hash值为-1,作为一个占位符放在原数组中表示当前结点在nextTbale中已经被移动。

1
2
3
4
5
6
7
8
9
10
11
12
13
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}

// ...

Node<K,V> find(int h, Object k) {
// 扩容时,get()会调用到,在下文会分析
}
}

构造方法

ConcurrentHashMapJDK1.8构造方法中,是还没有初始化Node[]的,没有参数的构造方法是个空方法,如下:

1
2
public ConcurrentHashMap() {
}

其余的构造方法都只是设置了sizeCtl,举个例子:

1
2
3
4
5
6
7
8
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

put()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 对key的hashcode做了处理,hash 计算多了一步 & HASH_BITS(0x7fffffff)
// 是为了消除高位上的负号,hash的负在ConcurrentHashMap中有特殊意义,表示在扩容或者是树结点
int hash = spread(key.hashCode());
// 计算该桶链表结点数,若大于TREEIFY_THRESHOLD,需要链表转成红黑树或者数组扩容
int binCount = 0;
for (Node<K,V>[] tab = table;;) { // 无限循环
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) // 表为空
// 初始化表
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 该桶没有结点
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null))) // 插入结点
break;
}
else if ((fh = f.hash) == MOVED) // 正在扩容
// 协助扩容
tab = helpTransfer(tab, f);
else { // 该桶有结点
V oldVal = null;
synchronized (f) { // 加锁同步
if (tabAt(tab, i) == f) { // 再次确认Node对象还是原来的那一个
if (fh >= 0) { // 链表结点
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// key值相同
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 插入新结点
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 树结点
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果binCount大于等于转为红黑树的阈值
if (binCount >= TREEIFY_THRESHOLD)
// 转换或扩容
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// binCount++,并判断是否需要扩容
addCount(1L, binCount);
return null;
}

初始化数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 每次循环都会获取最新table
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0) // 代表有线程在进行初始化工作了
Thread.yield(); // 让出cpu
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // sizeCtl设为-1
try {
if ((tab = table) == null || tab.length == 0) { // 再检查一遍数组是否为空
// table容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 创建数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 将其赋值给table变量
table = tab = nt;
// 相当于n * 0.75
sc = n - (n >>> 2);
}
} finally {
// 设置扩容阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}

treeifyBin()

treeifyBin()不一定就会进行红黑树转换,也可能是仅仅做数组扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private final void treeifyBin(Node<K,V>[] tab, int index) { 
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // table长度小于最小的长度
tryPresize(n << 1); // 扩容,调整某个桶中结点数量过多的问题
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 转为红黑树的情况
synchronized (b) { // 加锁同步
if (tabAt(tab, index) == b) { // 再次确认Node对象还是原来的那一个
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) { // 遍历桶中结点
// 创建TreeNode结点
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null) // 该结点前驱为空
hd = p; // 设置为头结点
else
// 尾结点的next设为p
tl.next = p;
// 尾结点赋值为p
tl = p;
}
// 设置table中下标为index的值为hd
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

扩容

什么时候会扩容

  1. addCount():使用put()插入元素时会调用addCount(),检查是否需要扩容
  2. tryPresize():
    • 链表转红黑树过程,table容量小于MIN_TREEIFY_CAPACITY时
    • 调用putAll()一次性加入大量元素时,会触发扩容
  3. helpTransfer():使用put()插入元素时,发现Node为fwd时,会协助扩容

addCount()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private final void addCount(long x, int check) {

// ...
// 上面代码是计算容器中结点的数量,我们最后再讲
// 现在只需要知道下面代码中的s是即是加入新元素后容器容量大小

if (check >= 0) { // check为结点数量,有新元素加入成功才检查是否要扩容
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) { // 容量大于当前扩容阈值并且小于最大扩容值才扩容
int rs = resizeStamp(n); // 看下面分析
if (sc < 0) { // 已有线程在进行扩容工作
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 没有线程在进行扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

resizeStamp()

在上面的代码中调用到这个方法,我们来看一下:

1
2
3
4
5
6
7
private static int RESIZE_STAMP_BITS = 16;

private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

Integer.numberOfLeadingZeros(n):计算n转换成32位二进制之后1前面有几个0。因为ConcurrentHashMap的容量一定是2的幂次方,所以不同的容量n前面0的个数一定不同,这样可以保证是在原容量为n的情况下进行扩容。

1 << (RESIZE_STAMP_BITS - 1):即是1<<15,表示为二进制即是高16位为0,低16位为1:

1
0000 0000 0000 0000 1000 0000 0000 0000

所以resizeStamp()的返回值(rs):高16位置0,第16位为1,低15位存放当前容量n,用于表示是对n的扩容。

rs与RESIZE_STAMP_SHIFT配合可以求出新的sizeCtl的值,分情况如下:

  • sc < 0:已经有线程在扩容,将sizeCtl+1并调用transfer()让当前线程参与扩容
  • sc >= 0:没有线程在扩容,使用CAS将sizeCtl的值改为(rs << RESIZE_STAMP_SHIFT) + 2)

rs即resizeStamp(n),记temp=rs << RESIZE_STAMP_SHIFT。如当前容量为8时rs的值:

1
2
3
4
5
6
//rs
0000 0000 0000 0000 1000 0000 0000 1000
//temp = rs << RESIZE_STAMP_SHIFT,即 temp = rs << 16,左移16后temp最高位为1,所以temp成了一个负数
1000 0000 0000 1000 0000 0000 0000 0000
//sc = (rs << RESIZE_STAMP_SHIFT) + 2)
1000 0000 0000 1000 0000 0000 0000 0010

那么在扩容时sizeCtl值的意义:高15位为容量n,第16位位并行扩容线程数+1。

tryPresize()

和addCount()的实现很相似,不多赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private final void tryPresize(int size) { // size传入时已*2
// c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) { // 没有其他线程在初始化、扩容
Node<K,V>[] tab = table; int n;
// 这个if分支和前面的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) { // 已有线程在进行扩容工作
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 将sizeCtl加1,然后执行transfer(),此时nextTab不为null
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 将sizeCtl设置为(rs << RESIZE_STAMP_SHIFT) + 2
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 调用transfer(),此时nextTab参数为null
transfer(tab, null);
}
}
}

helpTransfer()

和addCount()的实现很相似,不多赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) { // 已有线程在进行扩容工作
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// sizeCtl加1,表示多一个线程进来协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

transfer()

我们可以看到上面三种方法最后都调用了transfer(),显然transfer()才是真正进行并行扩容的地方。

当外围调用此方法的时候,会保证第一个发起数据迁移的线程,nextTab参数为null,之后再调用此方法的时候,nextTab不会为null。 理解为有n个迁移任务,让每个线程每次负责一个小任务,每做完一个任务再检测是否有其他没做完的任务。

  • Doug Lea使用了一个stride(步长),每个线程每次负责迁移其中的一部分,如每次迁移 16 个小任务。所以需要一个全局的调度者来安排哪个线程执行哪几个任务,这个就是属性 transferIndex 的作用。
  • transfer()并没有实现所有的迁移任务,每次调用这个方法只实现了transferIndex往前stride个位置的迁移工作,其他的需要由外围来控制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { // 参数为原数组,扩展数组
int n = tab.length, stride;
//根据cpu个数找出扩容时的数组跨度大小即最小分组
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
if (nextTab == null) { // 普通扩容nextTab为空,竞争帮助扩容时不为空
// 初始化nextTable
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 当前转移的位置,说明是逆序迁移
transferIndex = n;
}
int nextn = nextTab.length;
// 扩容时的特殊节点,标明此节点正在进行迁移
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 当前线程是否需要继续寻找下一个可处理的节点
boolean advance = true;
// 所有桶是否都已迁移完成。
boolean finishing = false;
for (int i = 0, bound = 0;;) { // 无限循环
Node<K,V> f; int fh;
// 此循环的作用是确定当前线程要迁移的桶的范围或通过更新i的值确定当前范围内下一个要处理的节点
while (advance) {
int nextIndex, nextBound;
// i为当前正在处理的Node数组下标,每次处理一个Node节点就会自减1
if (--i >= bound || finishing) // 检查结束条件
advance = false;
else if ((nextIndex = transferIndex) <= 0) { // 迁移总进度<=0,表示所有桶都已迁移完成
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 当前线程迁移桶的范围
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
/**
* 1. 全部桶迁移完成
* 2. 下面"i = n"后,再次进入循环时要做的边界检查
*/
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 所有线程迁移完成
nextTable = null;
// 替换table
table = nextTab;
// sizeCtl为新容量的0.75倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 参与扩容线程数-1。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 根据前面addCount()或tryPresize()中的sizeCtl+2这里就有-2,相等时说明没有线程在参与扩容了
return;
finishing = advance = true;
i = n; // 赋值i为n,让其进入上面if进行是否结束的检查,并完成扩容后续操作
}
}
else if ((f = tabAt(tab, i)) == null) // 原数组i位置无结点
advance = casTabAt(tab, i, null, fwd); // 插入fwd结点
else if ((fh = f.hash) == MOVED) // 实际是检查上一步为null时CAS是否成功
advance = true; // 之后在上面的while中变更i后继续
else { // 转移该桶
synchronized (f) { // 加锁同步
if (tabAt(tab, i) == f) {
// 低位结点,高位结点
Node<K,V> ln, hn;
if (fh >= 0) { // 链表结点
// 由于n是2的幂次方(所有二进制位中只有一个1)
// 如n=16(0001 0000),第4位为1,那么hash&n后的值第4位只能为0或1。所以可以根据hash&n的结果将所有结点分为两部分
int runBit = fh & n;
Node<K,V> lastRun = f;
// 找出最后一段完整的fh&n不变的链表,这样最后这一段链表就不用重新创建新结点了
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) { // runBit=0,表示此Node为低位Node
ln = lastRun;
hn = null;
}
else { // 此Node为高位Node
hn = lastRun;
ln = null;
}
// 迁移lastRun之前的结点
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 低位链表放在i处
setTabAt(nextTab, i, ln);
// 高位链表放在i+n处
setTabAt(nextTab, i + n, hn);
// 在原table中设置ForwardingNode节点以提示该桶扩容完成
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { // 红黑树结点
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 扩容后结点数量太少降为链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

代码有点长,在这里小结一下,这个方法主要分成两个部分:

  • while循环:确定当前线程要迁移的桶的范围以及通过更新i的值确定当前范围内下一个要处理的结点
  • 其他代码:转移桶中结点

get()

这里我们主要关注扩容时,get()怎么做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0) // 在迁移或都是TreeBin
// 调用节点对象的find方法查找值
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

内部类ForwardingNode中的find():
这里的查找,是去新Node数组nextTable中查找的,过程与HashMap相似,不多赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}

容器结点数量

统计容器大小其实是用了两种思路:

  1. CAS方式直接递增:在线程竞争不大的时候,直接使用CAS操作递增baseCount值即可,这里说的竞争不大指的是CAS操作不会失败的情况
  2. 分而治之桶计数:若出现了CAS操作失败的情况,则证明此时有线程竞争了,计数方式从CAS方式转变为分而治之的桶计数方式

countCell

这里解释一下什么是计数桶:

CHM02
CHM02

这样减少了线程的冲突,查询总数的时候sum = countCell[0] + countCell[1] + countCell[2] + countCell[3]

在设计中,使用了分而治之的思想,将每一个计数都分散到各个countCell对象里面(下面称之为桶),使竞争最小化,又使用了CAS操作,就算有竞争,也可以对失败了的线程进行其他的处理。乐观锁的实现方式与悲观锁不同之处就在于乐观锁可以对竞争失败了的线程进行其他策略的处理,而悲观锁只能等待锁释放,所以这里使用CAS操作对竞争失败的线程做了其他处理,很巧妙的运用了CAS乐观锁。

代码实现

CounterCell:

1
2
3
4
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}

接着补上addCount()我们在上面跳过的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
/**
* 进入此语句块有两种可能
* 1.counterCells被初始化完成了,不为null
* 2.CAS操作递增baseCount值失败了,说明有竞争
*/
CounterCell a; long v; int m;
// 标志是否存在竞争
boolean uncontended = true;
/**
* 条件:
* 1.计数桶是否还没初始化,若as == null,进入语句块
* 2.计数桶是否为空,若桶为空进入语句块
* 3.用一个线程变量随机数,与上(桶大小-1),若桶的这个位置为空,进入语句块
* 4.到这里说明桶已经初始化了,且随机的这个位置不为空,尝试CAS操作使桶加1,失败设置uncontended值并进入语句块
*/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 初始化或扩容counterCell[]
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}

// ...
}

初始化CounterCell[]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private final void fullAddCount(long x, boolean wasUncontended) {
int h; // 线程随机变量
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
// 此时计数桶为null,不走这部分if块
if ((as = counterCells) != null && (n = as.length) > 0) {
// ...
}

// 进入此语句块进行计数桶的初始化
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { // CAS设置cellsBusy=1,表示现在计数桶Busy中
boolean init = false;
try {
if (counterCells == as) { // 再次确认计数桶为空
// 初始化一个长度为2的计数桶
CounterCell[] rs = new CounterCell[2];
// h为一个随机数,与上1则代表结果为0、1中随机的一个
// 也就是在0、1下标中随便选一个计数桶,x=1,放入1的值代表增加1个容量
rs[h & 1] = new CounterCell(x);
// 将初始化好的计数桶赋值给ConcurrentHashMap
counterCells = rs;
init = true;
}
} finally {
// 最后将busy标识设置为0,表示不busy了
cellsBusy = 0;
}
if (init)
break;
}
// 若有线程同时来初始化计数桶,则没有抢到busy资格的线程就先来CAS递增baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
}
}

扩容CounterCell[]

从上面的分析中我们知道,计数桶初始化之后长度为2,在竞争大的时候肯定是不够用的,所以一定有计数桶的扩容操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
// 计数桶初始化好了,进入该if块
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) { // 从计数桶数组随机选一个计数桶,若为null表示该桶位还没线程递增过
if (cellsBusy == 0) { // 计数桶数组busy状态是否被标识
// 创建一个计数桶
CounterCell r = new CounterCell(x);
// 标志计数桶数组busy状态
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 将刚刚创建的计数桶赋值给对应位置
rs[j] = r;
created = true;
}
} finally {
// 标识不busy了
cellsBusy = 0;
}
if (created)
break;
continue;
}
}
collide = false;
}
else if (!wasUncontended)
wasUncontended = true;
// 走到这里代表计数桶不为null,尝试递增计数桶
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false;
// 若CAS操作失败了,到了这里,会先进入一次下面的if块,然后再走一次刚刚的for循环
// 若是第二次for循环,collide=true,则不会走进去
else if (!collide)
collide = true;
// 计数桶扩容,一个线程若走了两次for循环,也就是进行了多次CAS操作递增计数桶失败了
// 则进行计数桶扩容,CAS标示计数桶busy中
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
// 确认计数桶还是同一个
if (counterCells == as) {
// 将长度扩大到2倍
CounterCell[] rs = new CounterCell[n << 1];
// 遍历旧计数桶,将引用直接搬过来
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 赋值
counterCells = rs;
}
} finally {
// 取消busy状态
cellsBusy = 0;
}
collide = false;
continue;
}
// 重新设置线程随机数
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 初始化计数桶...
}
// 初始化计数桶没抢到计数桶数组busy资格才能走到这
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
}
}

小结一下:

  • 在CAS操作递增计数桶失败了3次之后,会进行扩容计数桶操作,注意此时同时进行了两次随机定位计数桶来进行CAS递增的,所以此时可以保证大概率是因为计数桶不够用了,才会进行计数桶扩容
  • 计数桶长度增加到两倍长度,数据直接遍历迁移过来,由于计数桶不像HashMap数据结构那么复杂,有hash算法的影响,加上计数桶只是存放一个long类型的计数值而已,所以直接赋值引用即可

参考:
https://blog.csdn.net/qq_41737716/article/details/90549847
https://blog.csdn.net/tp7309/article/details/76532366
https://www.jianshu.com/p/81d848ea6c1a