声明:如果本文有错误,希望指出。
DelayQueue是一个支持延时获取元素的无界阻塞队列。、队列元素会按照最终执行时间在队列中进行排序。当队列中的元素到达延迟时间时才会被取出。
主要作用:
DelayQueue实现的关键主要有如下几个:
- 可重入锁ReentrantLock
- 用于阻塞和通知的Condition对象
- 根据Delay时间排序的优先级队列:PriorityQueue
- 用于优化阻塞通知的线程元素leader
Delayed
Delayed接口是用来标记那些应该在给定延迟时间之后执行的对象,它定义了一个long getDelay(TimeUnit unit)方法,该方法返回与此对象相关的的剩余时间。同时实现该接口的对象必须定义一个compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。
先看下实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| class DelayTask implements Delayed { public String name; public Long delayTime; public TimeUnit delayTimeUnit; public Long executeTime;
DelayTask(String name, long delayTime, TimeUnit delayTimeUnit) { this.name = name; this.delayTime = delayTime; this.delayTimeUnit = delayTimeUnit; this.executeTime = System.currentTimeMillis() + delayTimeUnit.toMillis(delayTime); } @Override public long getDelay(TimeUnit unit) { return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) { return 1; } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) { return -1; } return 0; }
}
|
内部结构
1 2 3 4 5 6 7 8 9 10 11
| public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); private Thread leader = null; private final Condition available = lock.newCondition(); }
|
offer()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
|
take()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
|
总结
DelayQueue
的入队、出对过程和其他的阻塞队列没有很大区别,无非是在出对的时候增加了一个到期时间的判断。同时通过leader来减少不必要阻塞。