# 线程池

作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)


# 一、为什么使用线程池?

线程池主要解决两个问题:

  1. 提高性能,减少资源浪费
    • 如果每次执行异步任务都新建一个线程,线程的创建与销毁会带来频繁的开销。
    • 使用线程池可以复用已有线程,减少资源浪费,提高系统吞吐量。
  2. 控制线程资源,方便管理 线程池可以限制线程数量,防止系统因为过多线程而崩溃;还能动态调整线程数,提升灵活性。同时,线程池还会统计任务执行情况,方便监控和调优。

此外,Java 提供了丰富的线程池实现和工厂方法,比如:

  • newCachedThreadPool():线程数最多可达 Integer.MAX_VALUE,空闲线程会自动回收,适合执行大量短期异步任务。
  • newFixedThreadPool(int n):固定大小线程池,适合负载稳定的场景。
  • newSingleThreadExecutor():单线程顺序执行任务,适合需要顺序处理的场景。

# 二、创建方式

推荐:使用 ThreadPoolExecutor 构造器自定义线程池(更灵活)

new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
                       keepAliveTime, TimeUnit.SECONDS,
                       workQueue, threadFactory, handler);
1
2
3

不推荐:使用 Executors 工具类(隐藏风险)

Executors.newCachedThreadPool();     // 可无限扩容,可能 OOM
Executors.newFixedThreadPool(int n); // 固定线程数
Executors.newSingleThreadExecutor(); // 单线程,顺序执行
1
2
3

# 三、核心结构

线程池的核心类是 ThreadPoolExecutor,它实现了 ExecutorService 接口,采用模板方法模式

Executor
  ↑
ExecutorService
  ↑
AbstractExecutorService
  ↑
ThreadPoolExecutor
1
2
3
4
5
6
7

# 状态管理(ctl 控制变量)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
1

ctl 是个高性能原子整型,用来存储

  • 高 3 位:线程池运行状态(如 RUNNING、SHUTDOWN、TERMINATED)
  • 低 29 位:当前线程数
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// 线程池运行状态(高 3 位)
private static final int RUNNING    = -1 << COUNT_BITS; // 接收新任务 + 处理队列
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 不接收新任务,处理队列
private static final int STOP       =  1 << COUNT_BITS; // 不接收 + 不处理,打断线程
private static final int TIDYING    =  2 << COUNT_BITS; // 线程池结束前过渡状态
private static final int TERMINATED =  3 << COUNT_BITS; // 完全终止
1
2
3
4
5
6
7
8
9

# 关键成员变量

private final BlockingQueue<Runnable> workQueue;        // 等待队列
private volatile long keepAliveTime;                    // 非核心线程存活时间
private volatile boolean allowCoreThreadTimeOut;        // 是否允许核心线程超时
private volatile int corePoolSize;                      // 核心线程数
private volatile int maximumPoolSize;                   // 最大线程数
private volatile ThreadFactory threadFactory;           // 线程工厂
private volatile RejectedExecutionHandler handler;      // 拒绝策略
1
2
3
4
5
6
7

# Worker 内部工作原理

  • Worker 是线程池中真正执行任务的线程,继承了 AQSRunnable
  • 它实现了简单的不可重入独占锁,避免线程启动前被中断。
class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;          // 真正执行任务的线程
    Runnable firstTask;           // 第一个要执行的任务
}
1
2
3
4

状态说明:

  • state = -1:线程尚未启动,避免 runWorker 被打断;
  • state = 0:未加锁;
  • state = 1:已加锁。

# mainLock 与 termination

private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
1
2
  • mainLock 用于控制线程池状态修改的并发访问;
  • termination 是条件变量,在 awaitTermination() 阻塞线程,在池完全终止后由 signalAll() 唤醒。

# 四、源码分析

# execute(Runnable command):提交任务的入口

execute() 方法是线程池对外暴露的任务提交接口。其核心思想是 “生产者-消费者模型”

  • 用户提交任务 → 生产者
  • 线程执行任务或从阻塞队列中取任务 → 消费者
public void execute(Runnable command) {
    // 如果任务为null, 抛出NPE
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    
    // 当前线程数小于核心线程数,尝试创建核心线程处理任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get(); // 更新状态
    }
    
    // 如果线程池在运行状态,且工作队列未满,则尝试将任务加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        
        int recheck = ctl.get(); 
        // 二次状态检查, 防止在加入队列过程中线程池状态变更
        // 当前线程池非 running 状态 移除任务并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);

        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // 如果线程池没有线程, 防止任务饥饿,补一个线程
    }
    // 如果队列满,会尝试新增线程, 因为核心线程数可能<最大线程数, 如果新增失败 则执行决绝策略
    else if (!addWorker(command, false))
        reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# addWorker(Runnable firstTask, boolean core):创建工作线程

该方法用于 添加新的工作线程,第二个参数 core 表示是否为核心线程。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // 检测线程池状态,  如果状态 >= SHUTDOWN,且不满足允许创建线程的条件,直接失败
        // 线程池任务不能为空
        // 工作队列不能为null
        if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP)
                || firstTask != null || workQueue.isEmpty()))
            return false;

        for (;;) {
            // 线程数 < (是否是核心线程 ? 核心线程数 : 线程池最大线程数)
            if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // CAS增加线程个数,保证同时只有一个线程成功
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果CAS失败, 检测线程池状态, 未发生变化则重复执行CAS, 发生变化则从外层循环开始执行
            c = ctl.get();  
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;

        }
    }

    // CAS 成功, 线程数+1
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 加独占锁, 为了实现workers同步, 可能有多个线程调用了execute方法
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
               // 重新检查线程池状态, 避免在获取锁前调用shutdown接口
                int c = ctl.get();
				// RUNNING 状态 或者 SHUTDOWN 状态 + firstTask 为 null(表示是从队列取任务的线程)
                if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
                    // 如果线程已启动则抛异常(按理说不应发生) 正在创建阶段的工作线程
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // 添加到工作线程集合
                    workers.add(w);
                    int s = workers.size();
                     // 记录线程池曾经达到的最大线程数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果添加成功,则启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果启动失败, 进行清理
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

解析:

  • 外层循环负责检查线程池状态和容量
  • 内层通过 CAS 原子操作修改线程数
  • 成功后创建 Worker,并加锁保护共享集合 workers 的修改
  • 最后启动线程,或在失败时进行清理

# Worker 执行逻辑分析

Worker 是线程池中的实际工作线程,其实现了 Runnable 接口:

Worker(Runnable firstTask) {
    setState(-1); // 当工作线程在执行时, 禁止中断
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

public void run() {
    runWorker(this); // 执行任务主逻辑
}
1
2
3
4
5
6
7
8
9

# runWorker(Worker w):工作线程执行核心

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread(); // 获取当前线程
    Runnable task = w.firstTask;       // 获取 Worker 初始化时传入的首个任务
    w.firstTask = null;                // 首个任务只用一次,置为 null 释放引用
    w.unlock();                        // 允许其他线程获取 Worker 的锁(防止竞争)
    
    boolean completedAbruptly = true;  // 是否异常退出,默认 true
    try {
        // 主任务循环:只要任务存在或从队列中取到新任务,就持续执行
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 获取 Worker 内部的独占锁,保护执行任务过程中的状态

            // 如果线程池状态是 STOP,或当前线程被中断 && 状态 >= STOP,但还没打断线程,则主动中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt(); // 避免线程在 STOP 状态还继续执行任务

            try {
                beforeExecute(wt, task);   // 钩子方法,任务执行前的扩展点
                try {
                    task.run();           // 执行任务
                    afterExecute(task, null); // 钩子方法,任务执行后的扩展点(无异常)
                } catch (Throwable ex) {
                    afterExecute(task, ex);   // 钩子方法,有异常的情况
                    throw ex;                
                }
            } finally {
                task = null;           // 当前任务引用置空,准备进入下一个循环
                w.completedTasks++;    // 当前 Worker 完成的任务数 +1
                w.unlock();            // 释放 Worker 锁,其他线程可操作
            }
        }
        completedAbruptly = false;     // 能正常退出循环,说明没有异常
    } finally {
        // 无论是否异常退出,都要做资源清理、线程退出处理
        processWorkerExit(w, completedAbruptly);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
初始化线程Worker --> 获取firstTask --> 解锁Worker(允许抢锁) -->
  --> 循环:
      有任务可执行?
        --> 获取任务(含首个和队列中)
        --> 加锁 --> 判断是否需中断 --> beforeExecute -->
            --> 执行任务 --> afterExecute -->
        --> 解锁 --> 重复
      否 --> 退出循环 --> processWorkerExit()
1
2
3
4
5
6
7
8

# shutdown():优雅关闭线程池

该方法会 停止接收新任务,但会 继续执行队列中已有任务

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 权限检查, 判断是否有权限 关闭线程 中断线程
        checkShutdownAccess();
        // 设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回(SHUTDOWN状态表示不再接受新任务)
        advanceRunState(SHUTDOWN);
        // 设置中断标志, 只中断空闲线程
        interruptIdleWorkers();
        onShutdown(); 
    } finally {
        mainLock.unlock();
    }
    // 尝试将状态变为TERMINATED, 循环判断所有的线程是否都执行完 执行完则更改线程池状态
    tryTerminate();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# shutdownNow():立即关闭线程池

这个方法执行后, 线程池就不会再接受新的任务了,工作队列的任务全都丢弃, 执行中的线程也会被中断。

# awaitTermination(long timeout, TimeUnit unit):阻塞等待终止

该方法用于 阻塞主线程,直到线程池状态变为 TERMINATED 或超时。

public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
    // 1. 将 timeout 转换成纳秒单位
    long nanos = unit.toNanos(timeout);

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); // 2. 加锁,确保线程安全

    try {
        // 3. 只要当前线程池状态还没到 TERMINATED,就一直等
        while (runStateLessThan(ctl.get(), TERMINATED)) {
            // 超时了还没终止,则返回 false
            if (nanos <= 0L)
                return false;

            // 4. 调用条件变量 termination 的 awaitNanos 进入等待,同时更新剩余等待时间
            nanos = termination.awaitNanos(nanos);
        }

        // 5. 跳出 while 说明线程池已终止,返回 true
        return true;
    } finally {
        mainLock.unlock(); // 6. 解锁
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 五、总结

线程池巧妙地使用一个 Integer 类型的原子变量来记录线程池状态和线程池中的线程个数。通过线程池状态来控制任务的执行,每个 Worker 线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。