浅析JDK1.8 ConcurrentLinkedQueue源码。

写在开篇

ConcurrentLinkedQueue是一个基于链表的无界非阻塞队列,并且是线程安全的,它采用的是先进先出的规则,当我们增加一个元素时,它会添加到队列的末尾,当我们取一个元素时,它会返回一个队列头部的元素。

类的继承关系

ConcurrentLinkedQueue继承于AbstractQueue。实现了Queue(规定了Queue的操作规范)、Serializable(可序列化)这两个接口。

1
2
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {

存储结点–Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static class Node<E> {
volatile E item; // 目标元素
volatile Node<E> next; // 当前Node下一个元素

Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}

/**
* 表示设置当前Node的item值
* 需要两个参数:期望值(cmp)和目标值(val)
* 当当前值等于cmp时,就会将目标设置为val
*/
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}

/**
* casNext和casItem类似,但它用于设置next字段
*/
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
}

成员变量

ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。

1
2
3
private transient volatile Node<E> head;

private transient volatile Node<E> tail;

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 默认构造方法
*/
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}

/**
* 以一个实现了Collection接口的集合类,来构造ConcurrentLinkedQueue
*/
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}

Queue接口的实现

add()

1
2
3
public boolean add(E e) {
return offer(e);
}

接下来看一下add()中的核心方法offer()。这个方法没有任何锁操作,线程安全完全由CAS操作和队列的算法来保证,整个方法的核心是for循环,这个循环没有出口,直到尝试成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p是最后一个结点
if (p.casNext(null, newNode)) {
// 每两次更新一下tail
if (p != t)
casTail(t, newNode);
return true;
}
// CAS竞争失败,再次尝试
}
else if (p == q) // 遇到哨兵结点,都从head开始遍历
// 但如果tail被修改,则使用tail(因为可能被修改正确了)
p = (t != (t = tail)) ? t : head;
else
// 取下一个结点或者最后一个结点
p = (p != t && t != (t = tail)) ? t : q;
}
}

看到这里,或许大家会有两个疑问:

  1. 为什么会出现p == q(即遇到哨兵结点)的情况?
  2. p = (t != (t = tail)) ? t : head;这行代码是什么情况?

关于第一个问题,先简单解释一下:所谓哨兵结点,就是next指向自己的结点。这种结点在队列中存在的意义不大,主要表示删除的结点,或者空结点。当遇到哨兵结点时,由于无法通过next取得后续的结点,因此很可能直接返回head,期望从链表头部开始遍历,进一步找到链表末尾。一旦在执行过程中发生tail被其他线程修改的情况,则进行一次“打赌”,使用新的tail作为链表末尾(这样就避免了重新查找tail的开销)。

第二个问题:这句代码虽然只有短短的一行,但是包含的信息比较多。首先“!=”并不是原子操作,它是可以被中断的。也就是说,在执行“!=”时,程序会先取得t的值,再执行t=tail,并取得新的t的值,然后比较这两个值是否相等。在单线程时,t!=t这种语句显然不会成立。但是在并发环境中,有可能在获得左边的t值后,右边的t值被其他线程修改。这样,t!=t就可能成立了,这里就是这种情况。如果在比较的过程中,tail被其他线程修改,当它再次赋值给t时,就会导致等式左边的t和右边的t不同。如果两个t不相同,表示tail在中途被其他线程篡改。这时,我们就可以用新的tail作为链表的结尾,也就是这里等式右边的t。但如果没有被修改,则返回head,要求从头部开始,重新查找尾部。

poll()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public E poll() {
restartFromHead:
for (;;) {
// p节点表示首节点,即需要出队的节点
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 如果p节点的元素不为null,则通过CAS来设置p节点引用的元素为null,如果成功则返回p节点的元素
if (item != null && p.casItem(item, null)) {
// 如果p != h,则更新head
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。
// 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// p == q,则使用新的head重新开始
else if (p == q)
continue restartFromHead;
// 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
else
p = q;
}
}
}

在这个地方我们停下来分析一下,哨兵结点是如何产生的。

1
2
3
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
q.add("1");
q.poll();

对于上面例子的poll(),由于队列中只有一个元素,根据前文的描述,此时tail并没有更新,而是指向head相同的位置。而此时,head本身的item域为null,其next为列表的第一个元素。故在第一个循环中,代码直接走到最后的else,将p赋值为q,而q就是p.next,也就是当前列表中的第一个元素。接着,在第2轮循环中,p.item显然不为null。因此,代码应该可以顺利进入第一个if块(如果CAS操作成功)。进入第一个if块,也意味着p的item域被设置为null。同时,此时p和h是不相等的。故执行了updataHead(),其实现如下:

1
2
3
4
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}

可以看到,在updateHead()中将p作为新的链表头部(通过casHead()实现),而原有的head就被设置为哨兵了(通过lazySetNext()实现)。

这样一个哨兵结点就产生了,而由于此时原有的head头部和tail实际上就是同一个元素。因此,再次用offer()插入元素时,就会遇到这个tail,也就是哨兵。这就是offer()中else if (p == q)这行代码的意义。

peek()

获取链表的首部元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

从源码中可以看到,peek操作会改变head指向,执行peek()方法后head会指向第一个具有非空元素的节点。

remove()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 删除的元素不能为null
public boolean remove(Object o) {
if (o != null) {
Node<E> next, pred = null;
for (Node<E> p = first(); p != null; pred = p, p = next) {
boolean removed = false;
E item = p.item;
if (item != null) {
// 若不匹配,则获取next节点继续匹配
if (!o.equals(item)) {
next = succ(p);
continue;
}
// 若匹配,则通过CAS操作将对应节点元素置为null
removed = p.casItem(item, null);
}

// 获取删除节点的后继节点
next = succ(p);
// 将被删除的节点移除队列
if (pred != null && next != null) // unlink
pred.casNext(p, next);
if (removed)
return true;
}
}
return false;
}

size()

1
2
3
4
5
6
7
8
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
if (++count == Integer.MAX_VALUE)
break;
return count;
}

size()用来获取当前队列的元素个数,但在并发环境中,其结果可能不精确,因为整个过程都没有加锁,所以从调用size()到返回结果期间有可能增删元素,导致统计的元素个数不精确。

为什么每两次更新一下tail

为什么不让tail结点永远成为队列的尾结点,实现代码会更少且逻辑也会更加清晰?

这是因为,如果让tail永远成为队列的尾结点,则每次都需要使用循环CAS更新tail结点,如果能减少更新tail结点的次数,入队性能岂不更高?所以说并不是每次入队都需要更新尾结点,只有tail结点和尾结点不相等的情况下才更新,减少更新,提高效率。

总结

通过以上的分析,大家应该可以明显感觉到,不使用锁而单纯使用CAS操作会要求在应用层面保证线程安全,并处理一些可能存在不一致的问题,大大增加了程序设计和实现的难度。它带来的好处就是使性能飞速提升,因此,在有些场合也是值得的。

参考: