声明:如果本文有错误,希望指出。
LinkedBlockingDeque
则是一个由链表组成的双向阻塞队列。可以从对头、对尾两端插入和移除元素,同样意味着 LinkedBlockingDeque
支持FIFO、FILO两种操作方式。LinkedBlockingDeque
是可选容量的,在初始化时可以设置容量防止其过度膨胀,如果不设置,默认容量大小为Integer.MAX_VALUE。
LinkedBlockingQueue 是一个由链表组成的,只能从一端出一端入,支持 FIFO,并通过 ReentrantLock 和 两个Condition实现。
LinkedBlockingDeque 结构定义 通过上面的Lock可以看出,LinkedBlockingDeque底层实现机制与LinkedBlockingQueue一样,依然是通过互斥锁ReentrantLock 来实现,notEmpty 、notFull 两个Condition做协调生产者、消费者问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class LinkedBlockingDeque <E > extends AbstractQueue <E > implements BlockingDeque <E >, java .io .Serializable { static final class Node <E > { E item; Node<E> prev; Node<E> next; Node(E x) { item = x; } } transient Node<E> first; transient Node<E> last; private transient int count; private final int capacity; final ReentrantLock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition();
基本API LinkedBlockingDeque 的add、put、offer、take、peek、poll系列方法都是通过调用XXXFirst,XXXLast方法。因此只需要了解putFirst、putLast、pollFirst、pollLast。
putFirst putFirst(E e) :将指定的元素插入此双端队列的开头,必要时将一直等待可用空间。 先获取锁,然后调用linkFirst方法入列,最后释放锁。如果队列是满的则在notFull上面等待。linkFirst设置Node为对头。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public void putFirst (E e) throws InterruptedException { if (e == null ) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this .lock; lock.lock(); try { while (!linkFirst(node)) notFull.await(); } finally { lock.unlock(); } } ``` linkFirst主要是设置node节点队列的列头节点,成功返回true ,如果队列满了返回false 。整个过程还是比较简单的。 ```java private boolean linkFirst (Node<E> node) { if (count >= capacity) return false ; Node<E> f = first; node.next = f; first = node; if (last == null ) last = node; else f.prev = node; ++count; notEmpty.signal(); return true ; }
putLast putLast(E e) :将指定的元素插入此双端队列的末尾,必要时将一直等待可用空间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void putLast (E e) throws InterruptedException { if (e == null ) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this .lock; lock.lock(); try { while (!linkLast(node)) notFull.await(); } finally { lock.unlock(); } } private boolean linkLast (Node<E> node) { if (count >= capacity) return false ; Node<E> l = last; node.prev = l; last = node; if (first == null ) first = node; else l.next = node; ++count; notEmpty.signal(); return true ; }
pollFirst pollFirst():获取并移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public E pollFirst () { final ReentrantLock lock = this .lock; lock.lock(); try { return unlinkFirst(); } finally { lock.unlock(); } } private E unlinkFirst () { Node<E> f = first; if (f == null ) return null ; Node<E> n = f.next; E item = f.item; f.item = null ; f.next = f; first = n; if (n == null ) last = null ; else n.prev = null ; --count; notFull.signal(); return item; }
pollLast pollLast():获取并移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public E pollLast () { final ReentrantLock lock = this .lock; lock.lock(); try { return unlinkLast(); } finally { lock.unlock(); } } private E unlinkLast () { Node<E> l = last; if (l == null ) return null ; Node<E> p = l.prev; E item = l.item; l.item = null ; l.prev = l; last = p; if (p == null ) first = null ; else p.next = null ; --count; notFull.signal(); return item; }
LinkedBlockingQueue 结构 这是一个只能一端出一端入的单向队列结构,具有 FIFO 特性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class LinkedBlockingQueue <E > extends AbstractQueue <E > implements BlockingQueue <E >, java .io .Serializable { static class Node <E > { E item; Node<E> next; Node(E x) { item = x; } } private final int capacity; private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
put操作 put操作是向队列尾部插入一个元素,具体源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException(); int c = -1 ; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); } private void enqueue (Node<E> node) { last = last.next = node; } private void signalNotEmpty () { final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
take操作 take操作是从对了中弹出一个元素:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public E take () throws InterruptedException { E x; int c = -1 ; final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } private E dequeue () { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null ; return x; }