ScheduledThreadPoolExecutor源码分析

一、引言

对于一般的多任务执行,ThreadPoolExecutor可以满足大部分需求。但是有时候我们需要定时或者延迟地去执行一个任务,这个时候ThreadPoolExecutor已经不能满足我们的需求了,所以Java提供了ScheduledThreadPoolExecutor来执行定时或延迟任务。

二、ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以其新增和回收线程逻辑,执行任务方式都沿用了ThreadPoolExecutor的逻辑。ScheduledThreadPoolExecutor之所以能够执行定时任务和延迟任务,主要是其自定义实现了一个DelayQueue并封装了一个ScheduledFutureTask(extend FutureTask)。
1.构造函数
??ScheduledThreadPoolExecutor本质是一个ThreadPoolExecutor,其构造函数直接t通过super来完成对象的初始化。默认ScheduledThreadPoolExecutor的maximumPoolSize为Integer.MAX_VALUE,keepAliveTime=0,任务队列为DelayedWorkQueue。但是由于DelayedWorkQueue是无界队列,所以设置maximumPoolSize是无效的。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

2.ScheduledFutureTask
??ScheduledFutureTask是ScheduledThreadPoolExecutor对任务的封装,其中包含了该任务的类型(period)、下次需要执行的时间(time)以及在任务队列中的位置(heapInex)

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

    /** Sequence number to break ties FIFO */
    //入队列的序号
    private final long sequenceNumber;

    /** The time the task is enabled to execute in nanoTime units */
    //任务执行的时间
    private long time;

    /**
     * Period in nanoseconds for repeating tasks.  A positive
     * value indicates fixed-rate execution.  A negative value
     * indicates fixed-delay execution.  A value of 0 indicates a
     * non-repeating task.
     */
    //任务类型
    //正数:按固定频率执行
    //0:非重复执行的任务
    // 负数:按固定延迟执行
    private final long period;

    /** The actual task to be re-enqueued by reExecutePeriodic */
    RunnableScheduledFuture<V> outerTask = this;

    /**
     * Index into delay queue, to support faster cancellation.
     */
    // 在任务队列数组中的索引
    int heapIndex;

3.DelayedWorkQueue
??DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。
排序规则:

  • 执行时间距离当前时间越近,越靠前
  • 如果执行时间相同,则先执行插入时间靠前的任务。

4.新增/获取任务
??DelayedWorkQueue通过put或者add来新增一条任务,但其底层都是调用offer来新增任务的。对于获取任务,我们知道在ThreadPoolExecutor中线程根据getTask来获取任务队列中的任务,而在getTask中任务队列通过poll或者take函数来获取任务队列中的任务,由于ScheduleThreadPoolExecutor继承自ThreadPoolExecutor,因此其底层获取任务方式相同,只需要DelayedWorkQueue提供take及pool方法即可。

在分析offer、take及poll之前,我们先看下siftUp及siftDown函数。

DelayWorkQueue底层是用最小堆数据结构实现的,需要最先执行的任务在堆的顶部,因此在每次插入或者删除任务时需要调整二叉树节点的顺序,但不同于最小堆的地方在于DelayWorkQueue不关心兄弟节点之间的顺序,只要父节点的任务先于子节点执行即可。

在一个最小堆的队列中,假如索引从0开始,子节点索引值为k,父节点索引值为p,则存在如下规律:

  1. 一个节点的左子节点的索引为:k = p * 2 + 1;
  2. 一个节点的右子节点的索引为:k = (p + 1) * 2;
  3. 一个节点的父节点的索引为:p = (k - 1) / 2。

siftUp函数在新增一个任务时调用,通过循环对比父子节点任务执行的先后顺序来调整新任务在堆中的位置。

/**
 * Sifts element added at bottom up to its heap-ordered spot.
 * Call only when holding lock.
 */
private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        int parent = (k - 1) >>> 1; //查找到父节点
        RunnableScheduledFuture<?> e = queue[parent];   //获取父节点任务
        if (key.compareTo(e) >= 0)  //如果父节点先于该任务执行,则跳出循环
            break;
        queue[k] = e;   //与父节点交换位置
        setIndex(e, k);
        k = parent; //重新向上追溯父节点
    }
    queue[k] = key; 
    setIndex(key, k);
}

siftDown函数是将一个任务从k节点一层一层地最小堆的底层沉淀,能够保证执行完后最小堆中的父节点任务先于子节点执行。

/**
 * Sifts element added at top down to its heap-ordered spot.
 * Call only when holding lock.
 */
private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        int right = child + 1;
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo(c) <= 0)
            break;
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

5.take

take函数主要是获取任务队列最小堆中的第一个任务,其使用了leader-follower模式,关于leader-follower模式可以参考这篇博客。

leader-follower模式中,所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。

  • 获取任务队列堆顶元素,如果为null则进入wail状态,等待offer的signal唤醒
  • 如果堆顶任务执行时间小于当前时间,则返回堆顶任务
  • 如果leader为空,则将当前线程设置为leader,并等待至堆顶任务执行时间
  • 如果leader已存在,则进入wait状态,等待被唤醒。
    public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture<?> first = queue[0];
                if (first == null)  //队列中没有任务,需要等待offer函数唤醒
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0) //到任务的执行时间,执行该任务
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    if (leader != null) //当leader线程不为null时说明有leader线程在等待第一个任务,其他线程进入wait状态
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;    //设置为leader线程
                        try {
                          //等待delay时间,
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;  //将leader设置为null并在下一个循环中获取任务
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal(); //唤醒其他线程
            lock.unlock();
        }
    }

6.poll

poll的功能和take相似,入参多了一个timeout,如果在timeout时间内获取不到任务则直接返回null

public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    //等待timeout
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                if (nanos <= 0)
                    return null;    //未获得任务,则返回空
                first = null; // don't retain ref while waiting
                //超时时间<延迟时间或者其他线程正在执行任务,则进入等待状态
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //timeLeft = delay-实际等待时间
                        long timeLeft = available.awaitNanos(delay);
                      //计算剩余超时时长
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

7.offer
??offer是DelayQueue底层往任务列表中新增一个任务的函数

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow(); //按1.5倍增长
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        if (queue[0] == e) {
            leader = null;
            available.signal(); //唤醒take或者poll中阻塞的一个线程
        }
    } finally {
        lock.unlock();
    }
    return true;
}

8.新增任务
??ScheduledThreadPoolExecutor支持三种新增任务的方式,新增普通延迟任务,新增固定频率执行任务,新增固定频率执行的延迟任务。

  1. 通过schedule函数直接新增一条延迟任务
  2. 通过scheduleAtFixedRate新增一条按固定频率执行的任务
  3. 通过scheduleWithFixedDelay新增一条固定频率执行的延迟任务

ScheduledThreadPoolExecutor是如何实现定时任务和延迟任务的呢?由上面可知ScheduledThreadPoolExecutor重新封装了task也就是ScheduledFutureTask,而定时和延迟任务的执行就在ScheduledFutureTask的run中完成的。任务下次执行时间:

  • 非周期循环任务,无下次执行时间
  • 定时周期任务:上次执行时间+延迟时间
  • 延迟周期任务:当前时间+延迟时间


    image1.png
    public void run() {
        //判断是否是定时任务
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            //正常任务,直接执行
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {
            //非定时任务,执行完后设置下次执行的时间
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }
    
    private void setNextRunTime() {
                long p = period;
                //循环任务直接在本次执行时间上加上时间间隔
                if (p > 0)
                    time += p;
                else
                  //延迟定时任务则将当前时间+延迟时间作为下次执行的时间
                    time = triggerTime(-p);
            }
    
    long triggerTime(long delay) {
            return now() +
                ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
        }
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容