声明:如果本文有错误,希望指出。
ArrayBlockingQueue
是一个数组实现的有界阻塞队列,采用FIFO算法对队列元素进行排序。
1
| ArrayBlockingQueue queue = new ArrayBlockingQueue(100);
|
定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; transient Itrs itrs = null; }
|
ArrayBlockingQueue继承AbstractQueue,实现BlockingQueue接口。AbstractQueue提供了对queue操作的骨干实现。BlockingQueue继承java.util.Queue为阻塞队列的核心接口,提供了在多线程环境下的出列、入列操作,作为使用者,则不需要关心队列在什么时候阻塞线程,什么时候唤醒线程,所有一切均由BlockingQueue来完成。
ArrayBlockingQueue内部使用可重入锁ReentrantLock + Condition来完成多线程环境的并发操作。
入队
add(E e)
add(E e)调用父类AbstractQueue接口,如果添加成功,返回true,否则抛出 IllegalStateException("Queue full")
异常。
1 2 3 4 5 6 7 8 9
| public boolean add(E e) { return super.add(e); } public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
|
offer(E e)
add方法最后调用的还是offer进行添加元素的。先判断添加的节点是否为null,然后获取lock,如果队列已经满了,返回false,否则调用enqueue添加元素,最后释放lock。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
|
1 2 3 4 5 6 7 8
| private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
|
出队
poll
1 2 3 4 5 6 7 8 9
| public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
|
dequeue
方法主要是从列头(takeIndex 位置)取出元素,同时如果迭代器itrs不为null,则需要维护下该迭代器。最后调用notFull.signal()唤醒入列线程。
1 2 3 4 5 6 7 8 9 10 11 12 13
| private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
|
take
1 2 3 4 5 6 7 8 9 10 11
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
|