一、引言
对于一般的多任务执行,ThreadPoolExecutor可以满足大部分需求。但是有时候我们需要定时或者延迟地去执行一个任务,这个时候ThreadPoolExecutor已经不能满足我们的需求了,所以Java提供了ScheduledThreadPoolExecutor来执行定时或延迟任务。
二、ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以其新增和回收线程逻辑,执行任务方式都沿用了ThreadPoolExecutor的逻辑。ScheduledThreadPoolExecutor之所以能够执行定时任务和延迟任务,主要是其自定义实现了一个DelayQueue并封装了一个ScheduledFutureTask(extend FutureTask)。
1.构造函数
??ScheduledThreadPoolExecutor本质是一个ThreadPoolExecutor,其构造函数直接t通过super来完成对象的初始化。默认ScheduledThreadPoolExecutor的maximumPoolSize为Integer.MAX_VALUE,keepAliveTime=0,任务队列为DelayedWorkQueue。但是由于DelayedWorkQueue是无界队列,所以设置maximumPoolSize是无效的。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
2.ScheduledFutureTask
??ScheduledFutureTask是ScheduledThreadPoolExecutor对任务的封装,其中包含了该任务的类型(period)、下次需要执行的时间(time)以及在任务队列中的位置(heapInex)
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** Sequence number to break ties FIFO */
//入队列的序号
private final long sequenceNumber;
/** The time the task is enabled to execute in nanoTime units */
//任务执行的时间
private long time;
/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
//任务类型
//正数:按固定频率执行
//0:非重复执行的任务
// 负数:按固定延迟执行
private final long period;
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
/**
* Index into delay queue, to support faster cancellation.
*/
// 在任务队列数组中的索引
int heapIndex;
3.DelayedWorkQueue
??DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。
排序规则:
- 执行时间距离当前时间越近,越靠前
- 如果执行时间相同,则先执行插入时间靠前的任务。
4.新增/获取任务
??DelayedWorkQueue通过put或者add来新增一条任务,但其底层都是调用offer来新增任务的。对于获取任务,我们知道在ThreadPoolExecutor中线程根据getTask来获取任务队列中的任务,而在getTask中任务队列通过poll或者take函数来获取任务队列中的任务,由于ScheduleThreadPoolExecutor继承自ThreadPoolExecutor,因此其底层获取任务方式相同,只需要DelayedWorkQueue提供take及pool方法即可。
在分析offer、take及poll之前,我们先看下siftUp及siftDown函数。
DelayWorkQueue底层是用最小堆数据结构实现的,需要最先执行的任务在堆的顶部,因此在每次插入或者删除任务时需要调整二叉树节点的顺序,但不同于最小堆的地方在于DelayWorkQueue不关心兄弟节点之间的顺序,只要父节点的任务先于子节点执行即可。
在一个最小堆的队列中,假如索引从0开始,子节点索引值为k,父节点索引值为p,则存在如下规律:
- 一个节点的左子节点的索引为:k = p * 2 + 1;
- 一个节点的右子节点的索引为:k = (p + 1) * 2;
- 一个节点的父节点的索引为:p = (k - 1) / 2。
siftUp函数在新增一个任务时调用,通过循环对比父子节点任务执行的先后顺序来调整新任务在堆中的位置。
/**
* Sifts element added at bottom up to its heap-ordered spot.
* Call only when holding lock.
*/
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1; //查找到父节点
RunnableScheduledFuture<?> e = queue[parent]; //获取父节点任务
if (key.compareTo(e) >= 0) //如果父节点先于该任务执行,则跳出循环
break;
queue[k] = e; //与父节点交换位置
setIndex(e, k);
k = parent; //重新向上追溯父节点
}
queue[k] = key;
setIndex(key, k);
}
siftDown函数是将一个任务从k节点一层一层地最小堆的底层沉淀,能够保证执行完后最小堆中的父节点任务先于子节点执行。
/**
* Sifts element added at top down to its heap-ordered spot.
* Call only when holding lock.
*/
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
5.take
take函数主要是获取任务队列最小堆中的第一个任务,其使用了leader-follower模式,关于leader-follower模式可以参考这篇博客。
leader-follower模式中,所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。
- 获取任务队列堆顶元素,如果为null则进入wail状态,等待offer的signal唤醒
- 如果堆顶任务执行时间小于当前时间,则返回堆顶任务
- 如果leader为空,则将当前线程设置为leader,并等待至堆顶任务执行时间
- 如果leader已存在,则进入wait状态,等待被唤醒。
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) //队列中没有任务,需要等待offer函数唤醒
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) //到任务的执行时间,执行该任务
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null) //当leader线程不为null时说明有leader线程在等待第一个任务,其他线程进入wait状态
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread; //设置为leader线程
try {
//等待delay时间,
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null; //将leader设置为null并在下一个循环中获取任务
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal(); //唤醒其他线程
lock.unlock();
}
}
6.poll
poll的功能和take相似,入参多了一个timeout,如果在timeout时间内获取不到任务则直接返回null
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
if (nanos <= 0)
return null;
else
//等待timeout
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
if (nanos <= 0)
return null; //未获得任务,则返回空
first = null; // don't retain ref while waiting
//超时时间<延迟时间或者其他线程正在执行任务,则进入等待状态
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//timeLeft = delay-实际等待时间
long timeLeft = available.awaitNanos(delay);
//计算剩余超时时长
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
7.offer
??offer是DelayQueue底层往任务列表中新增一个任务的函数
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow(); //按1.5倍增长
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal(); //唤醒take或者poll中阻塞的一个线程
}
} finally {
lock.unlock();
}
return true;
}
8.新增任务
??ScheduledThreadPoolExecutor支持三种新增任务的方式,新增普通延迟任务,新增固定频率执行任务,新增固定频率执行的延迟任务。
- 通过schedule函数直接新增一条延迟任务
- 通过scheduleAtFixedRate新增一条按固定频率执行的任务
- 通过scheduleWithFixedDelay新增一条固定频率执行的延迟任务
ScheduledThreadPoolExecutor是如何实现定时任务和延迟任务的呢?由上面可知ScheduledThreadPoolExecutor重新封装了task也就是ScheduledFutureTask,而定时和延迟任务的执行就在ScheduledFutureTask的run中完成的。任务下次执行时间:
- 非周期循环任务,无下次执行时间
- 定时周期任务:上次执行时间+延迟时间
-
延迟周期任务:当前时间+延迟时间
public void run() {
//判断是否是定时任务
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//正常任务,直接执行
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
//非定时任务,执行完后设置下次执行的时间
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
//循环任务直接在本次执行时间上加上时间间隔
if (p > 0)
time += p;
else
//延迟定时任务则将当前时间+延迟时间作为下次执行的时间
time = triggerTime(-p);
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}