0%

Java并发容器 —— ConcurrentLinkedQueue

声明:如果本文有错误,希望指出。

对于实现线程安全队列,现在有两种方式:1、使用阻塞算法;2、使用非阻塞算法。使用阻塞算法的队列是锁的应用。非阻塞则是CAS算法的应用。

简介

CoucurrentLinkedQueue是一个基于链接节点的无界线程安全队列,采用 FIFO 对节点排序。CoucurrentLinkedQueue是一个非阻塞队列。
CoucurrentLinkedQueue规定了如下几个不变性:

  • 在入队的最后一个元素的next为null
  • 队列中所有未删除的节点的item都不能为null且都能从head节点遍历到
  • 对于要删除的节点,不是直接将其设置为null,而是先将其item域设置为null(迭代器会跳过item为null的节点)
  • 允许head和tail更新滞后。这是什么意思呢?意思就说是head、tail不总是指向第一个元素和最后一个元素。

CoucurrentLinkedQueue结构

CoucurrentLinkedQueue 继承 AbstractQueue。CoucurrentLinkedQueue 是由head节点和tair节点组成,每个节点(Node)又节点元素(item)和主项下一个节点的(next)组成。默认情况下head节点存储的元素为空,tair节点等于head节点。

CoucurrentLinkedQueue主要方法:

  • add/offer:add是调用offer方法的,指定元素插入到队列尾部
  • poll:拉取头部元素
  • remove:删除元素

Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private static class Node<E> {
/**节点元素*/
volatile E item;
volatile Node<E> next;
/**初始化,获得item和next的偏移量,为后期的CAS做准备*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}

boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}

boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;//偏移量
private static final long nextOffset;//下个元素偏移量

static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

poll(出队)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E poll() {
restartFromHead://没搞懂这个是干啥子的???
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// item 不为null,则将item 设置为null
if (item != null && p.casItem(item, null)) {
if (p != h) // p != head 则更新head
// p.next != null,则将head更新为p.next ,否则更新为p
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// p.next == null 队列为空
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 当一个线程在poll的时候,另一个线程已经把当前的p从队列中删除——将p.next = p,p已经被移除不能继续,需要重新开始
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

其中 updateHead() ,该方法用于CAS更新head节点,如下:

1
2
3
4
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}

offer(入队)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean offer(E e) {
/**检查添加的元素是否是null*/
checkNotNull(e);
/**创建新的节点*/
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
//q==null,表示节点p是最后一个节点了,在尾部添加节点
// 如果插入失败,则表示其他线程已经修改了p的指向
if (q == null) {
// casNext:t节点的next指向当前节点
// casTail:设置tail 尾节点
if (p.casNext(null, newNode)) {
// node 加入节点后会导致tail距离最后一个节点相差大于一个,需要更新tail
if (p != t)
casTail(t, newNode);
return true;
}
}
else if (p == q)
// p == q 代表着该节点已经被删除了
// 由于多线程的原因,我们offer()的时候也会poll(),如果offer()的时候正好该节点已经poll()了
// 那么在poll()方法中的updateHead()方法会将head指向当前的q,而把p.next指向自己,即:p.next == p
// 这样就会导致tail节点滞后head(tail位于head的前面),则需要重新设置p
p = (t != (t = tail)) ? t : head;
// tail并没有指向尾节点
else
// tail已经不是最后一个节点,将p指向最后一个节点
p = (p != t && t != (t = tail)) ? t : q;
}
}

  • 第一步添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。
  • 第二步添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。
  • 第三步添加元素3,设置tail节点的next节点为元素3节点。
  • 第四步添加元素4,设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。

remove

Reference

客官,赏一杯coffee嘛~~~~