浅析JDK1.8 ConcurrentLinkedQueue源码。
写在开篇
ConcurrentLinkedQueue是一个基于链表的无界非阻塞队列,并且是线程安全的,它采用的是先进先出的规则,当我们增加一个元素时,它会添加到队列的末尾,当我们取一个元素时,它会返回一个队列头部的元素。
类的继承关系
ConcurrentLinkedQueue继承于AbstractQueue。实现了Queue(规定了Queue的操作规范)、Serializable(可序列化)这两个接口。1
2public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
存储结点–Node
1 | private static class Node<E> { |
成员变量
ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。1
2
3private transient volatile Node<E> head;
private transient volatile Node<E> tail;
构造方法
1 | /** |
Queue接口的实现
add()
1 | public boolean add(E e) { |
接下来看一下add()中的核心方法offer()。这个方法没有任何锁操作,线程安全完全由CAS操作和队列的算法来保证,整个方法的核心是for循环,这个循环没有出口,直到尝试成功。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p是最后一个结点
if (p.casNext(null, newNode)) {
// 每两次更新一下tail
if (p != t)
casTail(t, newNode);
return true;
}
// CAS竞争失败,再次尝试
}
else if (p == q) // 遇到哨兵结点,都从head开始遍历
// 但如果tail被修改,则使用tail(因为可能被修改正确了)
p = (t != (t = tail)) ? t : head;
else
// 取下一个结点或者最后一个结点
p = (p != t && t != (t = tail)) ? t : q;
}
}
看到这里,或许大家会有两个疑问:
- 为什么会出现p == q(即遇到哨兵结点)的情况?
p = (t != (t = tail)) ? t : head;
这行代码是什么情况?
关于第一个问题,先简单解释一下:所谓哨兵结点,就是next指向自己的结点。这种结点在队列中存在的意义不大,主要表示删除的结点,或者空结点。当遇到哨兵结点时,由于无法通过next取得后续的结点,因此很可能直接返回head,期望从链表头部开始遍历,进一步找到链表末尾。一旦在执行过程中发生tail被其他线程修改的情况,则进行一次“打赌”,使用新的tail作为链表末尾(这样就避免了重新查找tail的开销)。
第二个问题:这句代码虽然只有短短的一行,但是包含的信息比较多。首先“!=”并不是原子操作,它是可以被中断的。也就是说,在执行“!=”时,程序会先取得t的值,再执行t=tail,并取得新的t的值,然后比较这两个值是否相等。在单线程时,t!=t这种语句显然不会成立。但是在并发环境中,有可能在获得左边的t值后,右边的t值被其他线程修改。这样,t!=t就可能成立了,这里就是这种情况。如果在比较的过程中,tail被其他线程修改,当它再次赋值给t时,就会导致等式左边的t和右边的t不同。如果两个t不相同,表示tail在中途被其他线程篡改。这时,我们就可以用新的tail作为链表的结尾,也就是这里等式右边的t。但如果没有被修改,则返回head,要求从头部开始,重新查找尾部。
poll()
1 | public E poll() { |
在这个地方我们停下来分析一下,哨兵结点是如何产生的。
1 | ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>(); |
对于上面例子的poll(),由于队列中只有一个元素,根据前文的描述,此时tail并没有更新,而是指向head相同的位置。而此时,head本身的item域为null,其next为列表的第一个元素。故在第一个循环中,代码直接走到最后的else,将p赋值为q,而q就是p.next,也就是当前列表中的第一个元素。接着,在第2轮循环中,p.item显然不为null。因此,代码应该可以顺利进入第一个if块(如果CAS操作成功)。进入第一个if块,也意味着p的item域被设置为null。同时,此时p和h是不相等的。故执行了updataHead()
,其实现如下:1
2
3
4final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
可以看到,在updateHead()中将p作为新的链表头部(通过casHead()实现),而原有的head就被设置为哨兵了(通过lazySetNext()实现)。
这样一个哨兵结点就产生了,而由于此时原有的head头部和tail实际上就是同一个元素。因此,再次用offer()插入元素时,就会遇到这个tail,也就是哨兵。这就是offer()中else if (p == q)
这行代码的意义。
peek()
获取链表的首部元素。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
从源码中可以看到,peek操作会改变head指向,执行peek()方法后head会指向第一个具有非空元素的节点。
remove()
1 | // 删除的元素不能为null |
size()
1 | public int size() { |
size()用来获取当前队列的元素个数,但在并发环境中,其结果可能不精确,因为整个过程都没有加锁,所以从调用size()到返回结果期间有可能增删元素,导致统计的元素个数不精确。
为什么每两次更新一下tail
为什么不让tail结点永远成为队列的尾结点,实现代码会更少且逻辑也会更加清晰?
这是因为,如果让tail永远成为队列的尾结点,则每次都需要使用循环CAS更新tail结点,如果能减少更新tail结点的次数,入队性能岂不更高?所以说并不是每次入队都需要更新尾结点,只有tail结点和尾结点不相等的情况下才更新,减少更新,提高效率。
总结
通过以上的分析,大家应该可以明显感觉到,不使用锁而单纯使用CAS操作会要求在应用层面保证线程安全,并处理一些可能存在不一致的问题,大大增加了程序设计和实现的难度。它带来的好处就是使性能飞速提升,因此,在有些场合也是值得的。
参考: