# 线程池
作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)
# 一、为什么使用线程池?
线程池主要解决两个问题:
- 提高性能,减少资源浪费
- 如果每次执行异步任务都新建一个线程,线程的创建与销毁会带来频繁的开销。
- 使用线程池可以复用已有线程,减少资源浪费,提高系统吞吐量。
- 控制线程资源,方便管理 线程池可以限制线程数量,防止系统因为过多线程而崩溃;还能动态调整线程数,提升灵活性。同时,线程池还会统计任务执行情况,方便监控和调优。
此外,Java 提供了丰富的线程池实现和工厂方法,比如:
newCachedThreadPool():线程数最多可达Integer.MAX_VALUE,空闲线程会自动回收,适合执行大量短期异步任务。newFixedThreadPool(int n):固定大小线程池,适合负载稳定的场景。newSingleThreadExecutor():单线程顺序执行任务,适合需要顺序处理的场景。
# 二、创建方式
推荐:使用 ThreadPoolExecutor 构造器自定义线程池(更灵活)
new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, TimeUnit.SECONDS,
workQueue, threadFactory, handler);
1
2
3
2
3
不推荐:使用 Executors 工具类(隐藏风险)
Executors.newCachedThreadPool(); // 可无限扩容,可能 OOM
Executors.newFixedThreadPool(int n); // 固定线程数
Executors.newSingleThreadExecutor(); // 单线程,顺序执行
1
2
3
2
3
# 三、核心结构
线程池的核心类是 ThreadPoolExecutor,它实现了 ExecutorService 接口,采用模板方法模式:
Executor
↑
ExecutorService
↑
AbstractExecutorService
↑
ThreadPoolExecutor
1
2
3
4
5
6
7
2
3
4
5
6
7
# 状态管理(ctl 控制变量)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
1
ctl 是个高性能原子整型,用来存储
- 高 3 位:线程池运行状态(如 RUNNING、SHUTDOWN、TERMINATED)
- 低 29 位:当前线程数
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 线程池运行状态(高 3 位)
private static final int RUNNING = -1 << COUNT_BITS; // 接收新任务 + 处理队列
private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接收新任务,处理队列
private static final int STOP = 1 << COUNT_BITS; // 不接收 + 不处理,打断线程
private static final int TIDYING = 2 << COUNT_BITS; // 线程池结束前过渡状态
private static final int TERMINATED = 3 << COUNT_BITS; // 完全终止
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 关键成员变量
private final BlockingQueue<Runnable> workQueue; // 等待队列
private volatile long keepAliveTime; // 非核心线程存活时间
private volatile boolean allowCoreThreadTimeOut; // 是否允许核心线程超时
private volatile int corePoolSize; // 核心线程数
private volatile int maximumPoolSize; // 最大线程数
private volatile ThreadFactory threadFactory; // 线程工厂
private volatile RejectedExecutionHandler handler; // 拒绝策略
1
2
3
4
5
6
7
2
3
4
5
6
7
# Worker 内部工作原理
Worker是线程池中真正执行任务的线程,继承了AQS和Runnable。- 它实现了简单的不可重入独占锁,避免线程启动前被中断。
class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; // 真正执行任务的线程
Runnable firstTask; // 第一个要执行的任务
}
1
2
3
4
2
3
4
状态说明:
state = -1:线程尚未启动,避免 runWorker 被打断;state = 0:未加锁;state = 1:已加锁。
# mainLock 与 termination
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
1
2
2
mainLock用于控制线程池状态修改的并发访问;termination是条件变量,在awaitTermination()阻塞线程,在池完全终止后由signalAll()唤醒。
# 四、源码分析
# execute(Runnable command):提交任务的入口
execute() 方法是线程池对外暴露的任务提交接口。其核心思想是 “生产者-消费者模型”:
- 用户提交任务 → 生产者
- 线程执行任务或从阻塞队列中取任务 → 消费者
public void execute(Runnable command) {
// 如果任务为null, 抛出NPE
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 当前线程数小于核心线程数,尝试创建核心线程处理任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get(); // 更新状态
}
// 如果线程池在运行状态,且工作队列未满,则尝试将任务加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 二次状态检查, 防止在加入队列过程中线程池状态变更
// 当前线程池非 running 状态 移除任务并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 如果线程池没有线程, 防止任务饥饿,补一个线程
}
// 如果队列满,会尝试新增线程, 因为核心线程数可能<最大线程数, 如果新增失败 则执行决绝策略
else if (!addWorker(command, false))
reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# addWorker(Runnable firstTask, boolean core):创建工作线程
该方法用于 添加新的工作线程,第二个参数 core 表示是否为核心线程。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// 检测线程池状态, 如果状态 >= SHUTDOWN,且不满足允许创建线程的条件,直接失败
// 线程池任务不能为空
// 工作队列不能为null
if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP)
|| firstTask != null || workQueue.isEmpty()))
return false;
for (;;) {
// 线程数 < (是否是核心线程 ? 核心线程数 : 线程池最大线程数)
if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// CAS增加线程个数,保证同时只有一个线程成功
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果CAS失败, 检测线程池状态, 未发生变化则重复执行CAS, 发生变化则从外层循环开始执行
c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
// CAS 成功, 线程数+1
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加独占锁, 为了实现workers同步, 可能有多个线程调用了execute方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新检查线程池状态, 避免在获取锁前调用shutdown接口
int c = ctl.get();
// RUNNING 状态 或者 SHUTDOWN 状态 + firstTask 为 null(表示是从队列取任务的线程)
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
// 如果线程已启动则抛异常(按理说不应发生) 正在创建阶段的工作线程
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加到工作线程集合
workers.add(w);
int s = workers.size();
// 记录线程池曾经达到的最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果添加成功,则启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果启动失败, 进行清理
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
解析:
- 外层循环负责检查线程池状态和容量
- 内层通过 CAS 原子操作修改线程数
- 成功后创建 Worker,并加锁保护共享集合
workers的修改 - 最后启动线程,或在失败时进行清理
# Worker 执行逻辑分析
Worker 是线程池中的实际工作线程,其实现了 Runnable 接口:
Worker(Runnable firstTask) {
setState(-1); // 当工作线程在执行时, 禁止中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this); // 执行任务主逻辑
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# runWorker(Worker w):工作线程执行核心
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); // 获取当前线程
Runnable task = w.firstTask; // 获取 Worker 初始化时传入的首个任务
w.firstTask = null; // 首个任务只用一次,置为 null 释放引用
w.unlock(); // 允许其他线程获取 Worker 的锁(防止竞争)
boolean completedAbruptly = true; // 是否异常退出,默认 true
try {
// 主任务循环:只要任务存在或从队列中取到新任务,就持续执行
while (task != null || (task = getTask()) != null) {
w.lock(); // 获取 Worker 内部的独占锁,保护执行任务过程中的状态
// 如果线程池状态是 STOP,或当前线程被中断 && 状态 >= STOP,但还没打断线程,则主动中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); // 避免线程在 STOP 状态还继续执行任务
try {
beforeExecute(wt, task); // 钩子方法,任务执行前的扩展点
try {
task.run(); // 执行任务
afterExecute(task, null); // 钩子方法,任务执行后的扩展点(无异常)
} catch (Throwable ex) {
afterExecute(task, ex); // 钩子方法,有异常的情况
throw ex;
}
} finally {
task = null; // 当前任务引用置空,准备进入下一个循环
w.completedTasks++; // 当前 Worker 完成的任务数 +1
w.unlock(); // 释放 Worker 锁,其他线程可操作
}
}
completedAbruptly = false; // 能正常退出循环,说明没有异常
} finally {
// 无论是否异常退出,都要做资源清理、线程退出处理
processWorkerExit(w, completedAbruptly);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
初始化线程Worker --> 获取firstTask --> 解锁Worker(允许抢锁) -->
--> 循环:
有任务可执行?
--> 获取任务(含首个和队列中)
--> 加锁 --> 判断是否需中断 --> beforeExecute -->
--> 执行任务 --> afterExecute -->
--> 解锁 --> 重复
否 --> 退出循环 --> processWorkerExit()
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# shutdown():优雅关闭线程池
该方法会 停止接收新任务,但会 继续执行队列中已有任务。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限检查, 判断是否有权限 关闭线程 中断线程
checkShutdownAccess();
// 设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回(SHUTDOWN状态表示不再接受新任务)
advanceRunState(SHUTDOWN);
// 设置中断标志, 只中断空闲线程
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试将状态变为TERMINATED, 循环判断所有的线程是否都执行完 执行完则更改线程池状态
tryTerminate();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# shutdownNow():立即关闭线程池
这个方法执行后, 线程池就不会再接受新的任务了,工作队列的任务全都丢弃, 执行中的线程也会被中断。
# awaitTermination(long timeout, TimeUnit unit):阻塞等待终止
该方法用于 阻塞主线程,直到线程池状态变为 TERMINATED 或超时。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
// 1. 将 timeout 转换成纳秒单位
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 2. 加锁,确保线程安全
try {
// 3. 只要当前线程池状态还没到 TERMINATED,就一直等
while (runStateLessThan(ctl.get(), TERMINATED)) {
// 超时了还没终止,则返回 false
if (nanos <= 0L)
return false;
// 4. 调用条件变量 termination 的 awaitNanos 进入等待,同时更新剩余等待时间
nanos = termination.awaitNanos(nanos);
}
// 5. 跳出 while 说明线程池已终止,返回 true
return true;
} finally {
mainLock.unlock(); // 6. 解锁
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 五、总结
线程池巧妙地使用一个 Integer 类型的原子变量来记录线程池状态和线程池中的线程个数。通过线程池状态来控制任务的执行,每个 Worker 线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。