声明:如果本文有错误,希望指出。
对于实现线程安全队列,现在有两种方式:1、使用阻塞算法;2、使用非阻塞算法。使用阻塞算法的队列是锁的应用。非阻塞则是CAS算法的应用。
简介
CoucurrentLinkedQueue是一个基于链接节点的无界线程安全队列,采用 FIFO 对节点排序。CoucurrentLinkedQueue是一个非阻塞队列。
CoucurrentLinkedQueue规定了如下几个不变性:
- 在入队的最后一个元素的next为null
- 队列中所有未删除的节点的item都不能为null且都能从head节点遍历到
- 对于要删除的节点,不是直接将其设置为null,而是先将其item域设置为null(迭代器会跳过item为null的节点)
- 允许head和tail更新滞后。这是什么意思呢?意思就说是head、tail不总是指向第一个元素和最后一个元素。
CoucurrentLinkedQueue结构
CoucurrentLinkedQueue
继承 AbstractQueue
。CoucurrentLinkedQueue 是由head节点和tair节点组成,每个节点(Node)又节点元素(item)和主项下一个节点的(next)组成。默认情况下head节点存储的元素为空,tair节点等于head节点。
CoucurrentLinkedQueue主要方法:
- add/offer:add是调用offer方法的,指定元素插入到队列尾部
- poll:拉取头部元素
- remove:删除元素
Node
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| private static class Node<E> { volatile E item; volatile Node<E> next; Node(E item) { UNSAFE.putObject(this, itemOffset, item); }
boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); }
void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }
private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset;
static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
|
poll(出队)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { if (p != h) updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }
|
其中 updateHead()
,该方法用于CAS更新head节点,如下:
1 2 3 4
| final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
|
offer(入队)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { if (p.casNext(null, newNode)) { if (p != t) casTail(t, newNode); return true; } } else if (p == q) p = (t != (t = tail)) ? t : head; else p = (p != t && t != (t = tail)) ? t : q; } }
|
- 第一步添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。
- 第二步添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。
- 第三步添加元素3,设置tail节点的next节点为元素3节点。
- 第四步添加元素4,设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。
remove
Reference