线程池源码解读

线程池的在 Java并发中使用最多的一种手段,也是性能和易用性相对来说比较均衡的方式,下面我们就一起探索先线程池的原理。

线程池分配线程流程

对于线程池的使用,在这篇文章中就不过多的赘述,首先我们先看下线程池的分配线程的逻辑。

我们知道,在创建线程池的有 7 个核心的参数:

corePoolSize:核心线程数

maximumPoolSize:最大线程数

keepAliveTime:空闲线程存活时间

TimeUnit: 单位

workQueue:阻塞队列

ThreadFactory: 线程工厂

RejectedExecutionHandler: 拒绝策略

在这 7 个参数中,其中我们最重要的几个参数是 corePoolSize,maximumPoolSize,workQueue ,这三个参数来决定线程池主要的线程数和任务队列长度。

具体的流程图如下(图片来自网上,侵删):

image-20220813132419857

构造函数的理解

构造函数的是我们创建线程池的第一步,可以简单的看下,搞清楚内部的变量是如何赋值的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
// 这个是重点
// Timeout in nanoseconds for idle threads waiting for work. Threads use this timeout when there are more than corePoolSize present or if allowCoreThreadTimeOut. Otherwise they wait forever for new work.
// 等待工作的线程池的超时 NS 时间,当线程多于核心线程数据数时候或者 allowCoreThreadTimeOut==true,现成会用此次的线程超时时间。 否则 他们会永远等待
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

几个特殊变量的含义

在阅读代码时候,会有几个变量的障碍,因为设计的过于巧妙,所以看起来稍微有点晦涩。在下面的代码里,已经注释了相关代码的含义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// ctl= 11100000 00000000 00000000 00000000|0= RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Interger.SIZE 值是 32 这里的值是 32-3=29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 实际是 1<<29-1
// 00100000 00000000 00000000 00000000 - 1
// 00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// -1 << 29
// 11111111 11111111 11111111 11111111 << 29
// 11100000 00000000 00000000 00000000
// 取前 3 位 111
private static final int RUNNING = -1 << COUNT_BITS;
// 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 010
private static final int STOP = 1 << COUNT_BITS;
// 100
private static final int TIDYING = 2 << COUNT_BITS;
// 110
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
// CAPACITY = 00011111 11111111 11111111 11111111
// ~CAPACITY= 11100000 00000000 00000000 00000000
// c & ~CAPACITY
// 因为~CAPACITY 头三位为 111 ,所以&运算都是它本身
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 同理 CAPACITY 也都是本身
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

核心分配线程逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 如果小于核心线程数,直接addWork,第二个参数 true 代表是核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.
// 如果当前线程池是 RUNNING 状态,且能够 offer 进队列则进行 recheck(为什么要进行 recheck)
// 如果 recheck 线程池不是 RUNNING 状态,且能移除当前 command对象成功,则直接 reject
// - 为了防止 add任务后,线程池调用了 shutdown 方法。
// 否则 判断当前数量为 0 直接 addWorker 一个空的任务。
// isRuning c<SHUTDOWN 只有 runing
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果出现刚入队列,线程池就被 shutdown了,任务就会被移除。
if (! isRunning(recheck) && remove(command))
reject(command);
//这里是什么场景?核心线程数为0了?
// 这就是前面说的我们可以设置核心线程数完成任务后就被销毁,那么核心线程数就为0了,
//那么刚刚队列中的任务怎么执行呢,就需要使用使用创建非核心线程数来执行任务了(可以忽略,因为不会这么设置)
//addWorker 会同时创建任务和线程,这个 addWorker(null,false) ,代表只开线程处理任务,不添加新任务。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 如果添加任务失败,则直接 reject
else if (!addWorker(command, false))
reject(command);
}

addWork 方法

这个是线程池添加任务的核心线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 1. rs >= SHUTDOWN 代表不是 running 状态
// 转换 rs >= SHUTDOWN && ( rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
// 表示当前线程池处于SHUTDOWN 状态后,新增的任务不为空的,直接返回,代表添加任务失败
// 表示当前线程池处于SHUTDOWN 状态后,核心线程数为0,新增非核心线程数来处理任务,但是队列为空,直接返回,代表添加任务失败.

// 这里返回 false 有可能会触发 reject 方法
if (rs >= SHUTDOWN &&
! ( rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()) )
return false;


for (;;) {
// 此处保证,如果是核心线程和非核心线程都会返回 false ,但是如果是核心线程则不需要校验返回值
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 更新线程池的数量 ,更新成功完后会跳出 retry
if (compareAndIncrementWorkerCount(c))
break retry;
// 这里是 CAS 更新失败逻辑
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建一个 Workder 对象,Worker 继承自 Runabler 对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//这里为什么需要一个 CAS 锁?
//1. 避免 HashSet 不安全 ,锁为什么不加到 workers.add(w); 这里?
//2.
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//再次校验线程状态
// 这里为什么再次校验,防止调用了 addWorker 未完成,就直接调用了 shutdown()
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程刚分配就已经在运行了,说明是不合法的流程
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// largestPoolSize 标记当前线程池最大的线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程,调用 worker 中的 run 方法
t.start();
workerStarted = true;
}
}
} finally {
// 哪些场景会添加失败?
//1. 上面 recheck 的时候。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

Woker 源码解析

worker 继承自 AbstractQueuedSynchronizerRunnable,本质还是一个线程对象

1
2
3
4
5
6
7
8
9
10
11
12
13
final class Worker extends AbstractQueuedSynchronizer implements Runnable  

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

runWorker代码

这个是线程调用了 start 方法,start 方法会调用 run 方法,run 方法会调用 task 中的 run 方法,进而间接的开线程调用了业务方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 这里为什么要先释放锁??
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果获取任务不为空,getTask()方法
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&&!wt.isInterrupted())
wt.interrupt();
try {
// 线程池 目前没有用,WorkerPoolExecutor会记录当前哪些 worker 列表
beforeExecute(wt, task);
Throwable thrown = null;
try {
//调用 run 方法,只是方法的调用,不是线程的启动
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 执行完成 ,任务赋值为空,下次循环就直接从队列中拿任务了
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//异常跳出 while 循环,处理动作
processWorkerExit(w, completedAbruptly);
}
}

getTask() 获取任务方法

这个getTask()方法是获取任务的方法,也是线程池线程能够复用的逻辑,在一个 while 循环中,一直拉取队列任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 如果线程池不是 RUNNING 且 状态> STOP 或者 队列为空
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

//如果允许核心线程数超时,或者是非核心线程 timed 才为 true.
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

//如果 (线程数大于最大线程 或者 已经超时)并且(线程数>1||队列为空)
//如果允许核心线程超时且已经超时且队列中任务为空 则直接减少线程数据退出死循环返回空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 释放一个线程
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 调用对应的 aqs 方法,拉取对应的 task
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

processWorkerExit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果出现异常,则将线程池中线程数量-1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//完成数++,workers 移除
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
//如果线程池是RUNNING SHUTDOWN .
if (runStateLessThan(c, STOP)) {
//非用户任务异常,也就是手动执行的中断操作
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果队列中还有待执行的任务,那么必须要保证线程池中至少有一个线程,否则就创新一个新的非核心线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 开线程拉取任务处理
addWorker(null, false);
}
}

整体的流程图

(图来自网上,侵删)

img

作者

付威的网络博客

发布于

2022-08-12

更新于

2022-08-12

许可协议

评论