# ScheduledThreadPoolExecutor

作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)


Executors工具类除了提供返回线程池实例的功能外, 还支持ScheduledThreadPoolExecutor, 这是一个可以在指定一定延迟时间后或者定时进行任务调度执行的线程池。

# 一、核心结构

ScheduledThreadPoolExecutor 是 Java 提供的定时任务线程池,继承自 ThreadPoolExecutor,实现了 ScheduledExecutorService 接口,支持延迟执行与周期性执行任务。

# 类继承结构(基于模板方法)

Executor
   ↑
ExecutorService
   ↑
   |<---------------------------- ScheduledExecutorService
   |                                     ↑
AbstractExecutorService           (实现) |
   ↑                                     |
ThreadPoolExecutor                     	  |	
   ↑                                     |
ScheduledThreadPoolExecutor ------------┘

1
2
3
4
5
6
7
8
9
10
11
12

特点:继承自 ThreadPoolExecutor,添加了定时调度能力,其关键在于使用了支持延迟调度的队列 DelayedWorkQueue

# 核心调度队列

DelayedWorkQueue

private static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable>
1
  • 本质是一个**基于最小堆(小顶堆)**实现的无界阻塞队列。
  • 队列中的任务必须实现 Delayed 接口(如 ScheduledFutureTask)。
  • 使用 System.nanoTime() 作为统一时间基准,保证延迟计算的精确性。

核心字段:

private RunnableScheduledFuture<?>[] queue;
private final ReentrantLock lock = new ReentrantLock();
private int size;
1
2
3

特点:只有队首任务到达执行时间,才能从队列中取出。

# 调度任务封装类:

ScheduledFutureTask

class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>
1
  • 封装了实际的执行任务和调度参数;
  • 实现了 Comparable 接口,以支持堆排序;
  • 可设置为一次性、固定频率或固定延迟的重复任务。

关键字段:

private long time;               // 下一次触发时间(纳秒)
private long period;             // 重复任务的周期
private final boolean isPeriodic; // 是否是周期任务
1
2
3

该类继承自FutureTask, FutureTask封装了任务状态

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
1
2
3
4
5
6
7
8

# 线程池运行流程

schedule() / scheduleAtFixedRate() / scheduleWithFixedDelay()
   ↓
封装成 ScheduledFutureTask
   ↓
加入 DelayedWorkQueue
   ↓
Worker 线程从队列中取出到期任务
   ↓
通过线程池中的 execute 执行任务
1
2
3
4
5
6
7
8
9

特点:线程池不会不断轮询队列,而是通过 DelayQueue 的阻塞特性,当到达指定延迟时自动唤醒线程执行。

# 执行控制逻辑

  • 执行线程通过 DelayedWorkQueue.take() 获取任务;
  • 任务到期前会被阻塞;
  • 任务执行完毕后若是周期任务,会重新计算下次执行时间并重新入队。

# 重复调度逻辑(周期任务)

if (isPeriodic()) {
    time += period;
    delayedWorkQueue.add(this); // 重新入队
}
1
2
3
4
  • 对于固定频率任务:period > 0
  • 对于固定延迟任务:period < 0(任务执行完后再计算下次时间)

# 二、源码解析

# schedule(Runnable command, long delay,TimeUnit unit)

该方法的作用是提交一个延迟执行的任务,任务从提交时间算起延迟单位为 unit 的 delay 时间后开始执行。

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {

    // 如果任务为空 或者 时间单位为空 不执行
    if (command == null || unit == null)
        throw new NullPointerException();
    
    // 装饰器对任务进行包装
    RunnableScheduledFuture<Void> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit), sequencer.getAndIncrement()));
    
    // 添加任务到延迟队列  
    delayedExecute(t);
    return t;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

首先会将任务通过构造器的方式构造成ScheduledFutureTask

ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber) {
    // 调用父类构造器 
    super(r, result);
    this.time = triggerTime;
    // period为0,说明为一次性任务
    this.period = 0;
    this.sequenceNumber = sequenceNumber;
}

public FutureTask(Runnable runnable, V result) {
    // 将任务转为callable类型
    this.callable = Executors.callable(runnable, result);
    // 设置当前任务状态位NEW
    this.state = NEW;       
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

在ScheduledFutureTask中 使用long getDelay(TimeUnit unit) 计算任务还有多久即将过期

public long getDelay(TimeUnit unit) {
    return unit.convert(time - System.nanoTime(), NANOSECONDS);
}
1
2
3

在元素加入延迟队列后, 会使用compareTo方法去调整队列中的元素, 使即将过期的元素排在队首, 代码实现较简单

public int compareTo(Delayed other) {
    if (other == this) 
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

当任务被装饰为ScheduledFutureTask之后, 会调用delayedExecute()将任务加入延迟队列

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 线程池状态为shutdown, 不再接收新的任务
    if (isShutdown())
        reject(task);
    else {
        // 添加任务到延迟队列
        super.getQueue().add(task);
        // 添加完后再次检测线程池状态
        if (!canRunInCurrentRunState(task) && remove(task))
            task.cancel(false);
        else
            // 确保至少一个线程处理任务
            ensurePrestart();
    }
}
// 判断是否需要创建核心线程 是否有线程在执行任务
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

这是向延迟队列添加任务的全流程, 接下来分析任务如何执行, 也就是Worker线程调用具体任务的run(), 此处任务是ScheduledFutureTask();

public void run() {
    // 判断线程池状态, 如果线程池状态无法运行任务, 则取消
    if (!canRunInCurrentRunState(this))
        cancel(false);
    // 如果该任务是非周期性的, 调用父类run()方法
    else if (!isPeriodic())
        super.run();
    // 如果是周期性任务,并且成功执行并重置了任务状态
    else if (super.runAndReset()) {
        // 设置下一次的运行时间
        setNextRunTime();
        // 重新将该任务提交到线程池中等待下一次执行
        reExecutePeriodic(outerTask);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

schedule API是单次任务API, 会执行第一个else if

# scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

该 API 当任务执行完毕后,让其延迟固定时间后再次运行(fixed-delay 任务)。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0L)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      -unit.toNanos(delay),
                                      sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

该方法和schedule类似, 都是将任务进行转换, 再加入延迟队列, 需要注意的是此处 period 不再为0, 而是

-unit.toNanos(delay) < 0, 即该任务可重复执行, 该period即下次任务延迟时间

当运行该任务时依旧是通过ScheduledFutureTask.run()方法执行任务, 此时执行第三个if else分支, 首先会重置任务状态

protected boolean runAndReset() {
    // 如果任务状态不是 NEW(尚未启动),或者已有线程在执行该任务,则直接返回 false,不执行
    if (state != NEW ||
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return false;

    boolean ran = false; // 标记任务是否正常执行完成
    int s = state;

    try {
        Callable<V> c = callable;
        // 如果任务还没被取消,状态仍是 NEW,且 callable 不为 null,就执行
        if (c != null && s == NEW) {
            try {
                c.call(); // 执行任务,不设置返回结果(区别于普通的 run 方法)
                ran = true;
            } catch (Throwable ex) {
                // 如果执行过程中抛出异常,设置异常信息
                setException(ex);
            }
        }
    } finally {
        // 任务执行完毕,清除 RUNNER 记录的当前线程
        runner = null;

        // 如果任务在执行期间被取消并处于中断流程中,则处理中断
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }

    // 如果任务执行成功,且状态仍是 NEW(表示未被取消/未改变状态),返回 true
    // 否则返回 false(表示已取消或失败)
    return ran && s == NEW;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

该方法和普通run()的区别在于, 不设置任务结果, ran == true 表明任务执行完成, s == NEW 表明任务仍处于新状态, 需要再次执行, 当本轮任务执行完后会设置下次任务执行时间以及处理Periodic标识

public void run() {
    if (!canRunInCurrentRunState(this))
        cancel(false);
    else if (!isPeriodic())
        super.run();
    else if (super.runAndReset()) {
		// 设置下次运行时间
        setNextRunTime();
        // 将任务加入延迟队列 确保有一个线程在执行任务
        reExecutePeriodic(outerTask);
    }
}

// 在构建任务时  period = -unit.toNanos(delay), 设置下次任务执行时间
private void setNextRunTime() {
    long p = period;
    if (p > 0)
        time += p;
    else
        time = triggerTime(-p);
}

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(task)) {
        // 延迟队列增加任务
        super.getQueue().add(task);
        // 线程池是可运行的状态确保有线程在处理任务
        if (canRunInCurrentRunState(task) || !remove(task)) {
            ensurePrestart();
            return;
        }
    }
    task.cancel(false);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

大致流程: 当添加一个任务到延迟队列后,等待 initialDelay 时间,任务就会过期,过期的任务就会被从队列移除,并执行。执行完毕后,会重新设置任务的延迟时间,然后再把任务放入延迟队列,循环往复。

# scheduleAtFixedRate(Runnable command,long initialDelay,longperiod,TimeUnit unit)

该API是以固定频率调用任务, 与2.2实现类似, 自行查看源码

主流框架的spring定时任务设计与该设计类似, 但没有直接采取该API