Android小知识-定时任务ScheduledThreadPoolExecutor

本平台的文章更新会有延迟,大家可以关注微信公众号-顾林海,包括年底前会更新kotlin由浅入深系列教程,目前计划在微信公众号进行首发,如果大家想获取最新教程,请关注微信公众号,谢谢!

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,而ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务,ScheduledThreadPoolExecutor是一个实现定时任务的类,可以在给定的延迟后运行命令,或者定期执行命令。

ScheduledThreadPoolExecutor定义了四个构造函数,这四个构造函数如下:

/**
* @param corePoolSize 核心线程池的大小
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
            DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
            new DelayedWorkQueue());
}

/**
* @param corePoolSize  核心线程池的大小
* @param threadFactory 用于设置创建线程的工厂
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
            DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
            new DelayedWorkQueue(), threadFactory);
}

/**
* @param corePoolSize 核心线程池的大小
* @param handler      饱和策略
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE,
            DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
            new DelayedWorkQueue(), handler);
}

/**
* @param corePoolSize  核心线程池的大小
* @param threadFactory 用于设置创建线程的工厂
* @param handler       饱和策略
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE,
            DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
            new DelayedWorkQueue(), threadFactory, handler);
}

通过源码可以发现,ScheduledThreadPoolExecutor的构造器都是调用父类的构造器也就是ThreadPoolExecutor的构造器,以此来创建一个线程池。

ThreadPoolExecutor的构造器如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}


public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

创建一个线程池时需要输入几个参数,如下:

  • corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其它空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建,会把到达的任务放到缓存队列当中。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程,或调用线程池的prestartCoreThread()方法,线程池会提前创建一个线程。

  • maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。

  • KeepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存货的时间。如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提供线程的利用率。

  • unit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS)、千分之一毫秒和纳秒(NANOSECONDS、千分之一微妙)。

  • workQueue(任务队列):用于保持等待执行的任务的阻塞队列,可以选择以下几个阻塞队列。
    (1)ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
    (2)LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。
    (3)SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。
    (4)PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

  • ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK1.5中Java线程池框架提供了4种策略(也可通过实现RejectedExecutionHandler接口自定义策略)。
    (1)AbortPolicy:直接抛出异常。
    (2)CallerRunsPolicy:只用调用者所在线程来运行任务。
    (3)DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    (4)DiscardPolicy:处理,丢弃掉。

在ScheduledThreadPoolExecutor构造器中使用了工作队列java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,DelayedWorkQueue是一个无界的BlockingQueue,
用于放置实现了Delayed接口的对象,其中的对象只能在其到期才能从队列中取走。

由于ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,因此它也实现了ThreadPoolExecutor的方法,如下:

    public void execute(Runnable command) {
        ...
    }

    public Future<?> submit(Runnable task) {
        ...
    }

    public <T> Future<T> submit(Runnable task, T result) {
        ...
    }

    public <T> Future<T> submit(Callable<T> task) {
        ...
    }

同时它也有自己的定时执行任务的方法:

 /**
  * 延迟delay时间后开始执行task,无法获取task的执行结果。
  */
 public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
     ...
 }

 /**
  * 延迟delay时间后开始执行callable,它接收的是一个Callable实例,
  * 此方法会返回一个ScheduleFuture对象,通过ScheduleFuture我们
  * 可以取消一个未执行的task,也可以获得这个task的执行结果。
  */
 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
    ...
 }

 /**
  * 延迟initialDelay时间后开始执行command,并且按照period时间周期性
  * 重复调用,当任务执行时间大于间隔时间时,之后的任务都会延迟,此时与
  * Timer中的schedule方法类似。
  */
 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
    ...
 }


/**
 *延迟initialDelay时间后开始执行command,并且按照period时间周期性重复
 *调用,这里的间隔时间delay是等上一个任务完全执行完毕才开始计算。
 */
 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
    ...
 }

ScheduledThreadPoolExecutor把待调度的任务放到一个DelayedWorkQueue ,并且DelayedWorkQueue 是一个无界队列,ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中没有什么意义。整个ScheduledThreadPoolExecutor的运行可以分为两大部分。

1、当调用ScheduledThreadPoolExecutor的上面4个方法时,会向ScheduledThreadPoolExecutor的DelayedWorkQueue 添加一个实现了RunnableScheduleFuture接口的ScheduledFutureTask,如下ScheduledThreadPoolExecutor其中的一个schedule方法。

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit),
                                   sequencer.getAndIncrement()));
    delayedExecute(t);
    return t;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);//向ScheduledThreadPoolExecutor的DelayedWorkQueue添加一个实现了RunnableScheduleFuture接口的ScheduledFutureTask
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

2、线程池中的线程从DelayedWorkQueue 中获取ScheduledFutureTask,然后执行。

ScheduledFutureTask是ScheduledThreadPoolExecutor的内部类并继承自FutureTask,包含3个成员变量。

//ong型成员变量sequenceNumber,表示这个任务被添加到
//ScheduledThreadPoolExecutor中的序号。
private final long sequenceNumber;

//long型成员变量time,表示这个任务将要被执行的具体时间。
private volatile long time;

//long型成员变量period,表示任务执行的间隔周期。
private final long period;


ScheduledFutureTask内部实现了compareTo()方法,用于对task的排序

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

排序时,time小的排在前面,如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面。

DelayedWorkQueue 内部使用了二叉堆算法,DelayedWorkQueue 中的元素第一个元素永远是 延迟时间最小的那个元素。当执行 schedule 方法是。如果不是重复的任务,那任务从 DelayedWorkQueue 取出之后执行完了就结束了。如果是重复的任务,那在执行结束前会重置执行时间并将自己重新加入到 DelayedWorkQueue 中

总结来说,ScheduledThreadPoolExecutor是一个实现ScheduledExecutorService的可以调度任务的执行框架;DelayedWorkQueue是一个数组实现的阻塞队列,根据任务所提供的时间参数来调整位置,实际上就是个小根堆(优先队列);ScheduledFutureTask包含任务单元,存有时间、周期、外部任务、堆下标等调度过程中必须用到的参数,被工作线程执行。ScheduledThreadPoolExecutor与Timer都是用作定时任务,它们直接的差异是Timer使用的是绝对时间,系统时间的改变会对Timer产生一定的影响;而ScheduledThreadPoolExecutor使用的是相对时间,不会导致这个问题。Timer使用的是单线程来处理任务,长时间运行的任务会导致其他任务的延迟处理;而ScheduledThreadPoolExecutor可以自定义线程数量。并且Timer没有对运行时异常进行处理,一旦某个任务触发运行时异常,会导致整个Timer崩溃;而ScheduledThreadPoolExecutor对运行时异常做了捕获(通过afterExecute()回调方法中进行处理),所以更安全。


838794-506ddad529df4cd4.webp.jpg
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容