0%

Java阻塞队列 —— DelayQueue

声明:如果本文有错误,希望指出。

DelayQueue是一个支持延时获取元素的无界阻塞队列。、队列元素会按照最终执行时间在队列中进行排序。当队列中的元素到达延迟时间时才会被取出。
主要作用:

  • 缓存:清掉缓存中超时的缓存数据
  • 任务超时处理

DelayQueue实现的关键主要有如下几个:

  • 可重入锁ReentrantLock
  • 用于阻塞和通知的Condition对象
  • 根据Delay时间排序的优先级队列:PriorityQueue
  • 用于优化阻塞通知的线程元素leader

Delayed

Delayed接口是用来标记那些应该在给定延迟时间之后执行的对象,它定义了一个long getDelay(TimeUnit unit)方法,该方法返回与此对象相关的的剩余时间。同时实现该接口的对象必须定义一个compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。
先看下实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class DelayTask implements Delayed {
//定义该元素及其属性
public String name;
public Long delayTime;
public TimeUnit delayTimeUnit;
public Long executeTime;//ms

DelayTask(String name, long delayTime, TimeUnit delayTimeUnit) {
this.name = name;
this.delayTime = delayTime;
this.delayTimeUnit = delayTimeUnit;
this.executeTime = System.currentTimeMillis() + delayTimeUnit.toMillis(delayTime);
}
//getDelay方法的作用即是计算当前时间到执行时间之间还有多长时间
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
//compareTo方法的作用即是判断队列中元素的顺序谁前谁后。当前元素比队列元素后执行时,返回一个正数,比它先执行时返回一个负数,否则返回0
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
}
return 0;
}

}

内部结构

1
2
3
4
5
6
7
8
9
10
11
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
/** 可重入锁 */
private final transient ReentrantLock lock = new ReentrantLock();
/** 支持优先级的BlockingQueue */
private final PriorityQueue<E> q = new PriorityQueue<E>();
/** 用于优化阻塞 */
private Thread leader = null;
/** Condition */
private final Condition available = lock.newCondition();
}

offer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 向 PriorityQueue中插入元素
q.offer(e);
// 如果当前元素的对首元素(优先级最高),leader设置为空,唤醒所有等待线程
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

take()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 对首元素
E first = q.peek();
if (first == null)// 对首为空,阻塞,等待off()操作唤醒
available.await();
else {
// 获取对首元素的超时时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)// <=0 表示已过期,出对,return
return q.poll();
first = null; // don't retain ref while waiting
// leader != null 证明有其他线程在操作,阻塞
if (leader != null)
available.await();
else {
// 否则将leader 设置为当前线程,独占
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 超时阻塞
available.awaitNanos(delay);
} finally {
// 释放leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 唤醒阻塞线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

总结

DelayQueue 的入队、出对过程和其他的阻塞队列没有很大区别,无非是在出对的时候增加了一个到期时间的判断。同时通过leader来减少不必要阻塞。

客官,赏一杯coffee嘛~~~~