0%

Java阻塞队列 —— LinkedBlockingDeque 和 LinkedBlockingQueue

声明:如果本文有错误,希望指出。

LinkedBlockingDeque 则是一个由链表组成的双向阻塞队列。可以从对头、对尾两端插入和移除元素,同样意味着 LinkedBlockingDeque 支持FIFO、FILO两种操作方式。LinkedBlockingDeque 是可选容量的,在初始化时可以设置容量防止其过度膨胀,如果不设置,默认容量大小为Integer.MAX_VALUE。

LinkedBlockingQueue 是一个由链表组成的,只能从一端出一端入,支持 FIFO,并通过 ReentrantLock 和 两个Condition实现。

LinkedBlockingDeque

结构定义

通过上面的Lock可以看出,LinkedBlockingDeque底层实现机制与LinkedBlockingQueue一样,依然是通过互斥锁ReentrantLock 来实现,notEmpty 、notFull 两个Condition做协调生产者、消费者问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}
transient Node<E> first;// 双向链表的表头
transient Node<E> last;// 双向链表的表尾
private transient int count;
private final int capacity;//容量
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

基本API

LinkedBlockingDeque 的add、put、offer、take、peek、poll系列方法都是通过调用XXXFirst,XXXLast方法。因此只需要了解putFirst、putLast、pollFirst、pollLast。

putFirst

putFirst(E e) :将指定的元素插入此双端队列的开头,必要时将一直等待可用空间。
先获取锁,然后调用linkFirst方法入列,最后释放锁。如果队列是满的则在notFull上面等待。linkFirst设置Node为对头。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
    public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);//生成一个新节点
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))//调用linkFirst添加节点
notFull.await();
} finally {
lock.unlock();
}
}
```
linkFirst主要是设置node节点队列的列头节点,成功返回true,如果队列满了返回false。整个过程还是比较简单的。
```java
private boolean linkFirst(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
Node<E> f = first;//首节点
node.next = f;// 新节点的next指向原first
first = node;// 设置node为新的first
if (last == null)
// 没有尾节点,设置node为尾节点
last = node;
else
// 有尾节点,那就将之前first的pre指向新增node
f.prev = node;
++count;
notEmpty.signal();
return true;
}

putLast

putLast(E e) :将指定的元素插入此双端队列的末尾,必要时将一直等待可用空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//调用linkLast将节点Node链接到队列尾部
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
private boolean linkLast(Node<E> node) {
if (count >= capacity)
return false;
Node<E> l = last;//获取尾节点
node.prev = l; // 将Node的前驱指向原本的last
last = node;// 将node设置为last
if (first == null)
// 首节点为null,则设置node为first
first = node;
else
//非null,说明之前的last有值,就将之前的last的next指向node
l.next = node;
++count;
notEmpty.signal();
return true;
}

pollFirst

pollFirst():获取并移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkFirst();
} finally {
lock.unlock();
}
}
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
notFull.signal();
return item;
}

pollLast

pollLast():获取并移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast();
} finally {
lock.unlock();
}
}
private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
Node<E> l = last;
if (l == null)
return null;
Node<E> p = l.prev;
E item = l.item;
l.item = null;
l.prev = l; // help GC
last = p;
if (p == null)
first = null;
else
p.next = null;
--count;
notFull.signal();
return item;
}

LinkedBlockingQueue

结构

这是一个只能一端出一端入的单向队列结构,具有 FIFO 特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 主要 node节点
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}

/** 边界限制,否则是Integer.MAX_VALUE */
private final int capacity;

/** 用AtomicInteger 来记录数量 */
private final AtomicInteger count = new AtomicInteger();

/**链表头结点*/
transient Node<E> head;

/**链表last节点*/
private transient Node<E> last;

/**take 锁*/
private final ReentrantLock takeLock = new ReentrantLock();

/** 等待take的节点序列 */
private final Condition notEmpty = takeLock.newCondition();

/** put锁*/
private final ReentrantLock putLock = new ReentrantLock();

/**put等待队列*/
private final Condition notFull = putLock.newCondition();

put操作

put操作是向队列尾部插入一个元素,具体源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void put(E e) throws InterruptedException {
//插入的元素不能为null
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;//获取put锁
final AtomicInteger count = this.count;//获取count
putLock.lockInterruptibly();
try {
//如果队列满了,使用notFull阻塞
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();//CAS增加count
//如果队列有空间了,notFull唤醒
if (c + 1 < capacity)
notFull.signal();
} finally {//释放锁
putLock.unlock();
}
//如果队列size为0,也要
if (c == 0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
//入队操作,对尾插入元素
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();//加锁
try {
notEmpty.signal();//用于signal,notEmpty
} finally {
takeLock.unlock();
}
}

take操作

take操作是从对了中弹出一个元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public E take() throws InterruptedException {
E x;
int c = -1;//设定一个记录变量
final AtomicInteger count = this.count;//获取count
final ReentrantLock takeLock = this.takeLock;//获取take锁
takeLock.lockInterruptibly();//加锁
try {
//如果队列中没有元素,就阻塞性等待
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
//队列还有元素,唤醒队列
if (c > 1)
notEmpty.signal();
} finally {//释放锁
takeLock.unlock();
}
if (c == capacity)
signalNotFull();//解锁
return x;
}
private E dequeue() {
//
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC 指向自己,帮助GC回收
head = first;
E x = first.item;//从队列头部弹出元素
first.item = null;//将head.item设为null
return x;
}
客官,赏一杯coffee嘛~~~~