如果你只知道线程池的几个参数,面试被问到再深一点的内容就回答不上来了,那么本系列文章带你熟悉线程池源码,更深入的了解线程池。
ThreadPoolExecutor将分几篇文章做解析,本文是第三篇,第一篇请见面试被问到线程池的状态是怎么计算的,如何回答?,关注一波,不错过后续内容,下面是核心知识点。本篇文章讲解execute()流程,addWorker()分析。
线程池创建后,我们会调用execute()方法来添加任务,另外还有一个submit(),其实底层也是execute()方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
那么execute()方法都做了什么,来看源码分析:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取ctl的值
int c = ctl.get();
// 当前线程数小于核心线程池数量,此次提交任务,直接创建一个新的worker
// 相对应线程池多了一个新的工作线程worker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//执行到这说明工作线程大于等于核心线程
// 当前线程池处于running状态,尝试将task放入到workQueue中
if (isRunning(c) && workQueue.offer(command)) {
// 获取当前ctl
int recheck = ctl.get();
// !isRunning()线程不在Running状态,代表当你提交到任务队列后,线程池状态被外部线程
// 修改,例如调用了shutDown(),shutDownNow()
// remove成功,提交之后,线程池中的线程还没消费,调用拒绝策略,将本次提交任务决绝
if (! isRunning(recheck) && remove(command))
reject(command);
// 有几种情况会走到这里?
// 1.当前线程池是RUNNING 状态
// 2.线程池状态是非 RUNNING 状态,但是 remove 提交的任务失败
// 担心当前线程池是 RUNNING 状态,但是线程池中的存活数量是0,这个时候,会很尴尬
//,任务没线程去跑了
// 这里其实是一个担保机制,保证线程池在RUNNING 状态下,最起码得有一个线程在工作
else if (workerCountOf(recheck) == 0)
//添加非核心线程
addWorker(null, false);
}
//执行到这两种情况,队列满了,线程池非Running状态
else if (!addWorker(command, false))
//拒绝此次提交
reject(command);
}
execute()方法代码行数不多,但是里面的逻辑却是挺多的,简要概括下就是:
1.当前工作线程小于核心线程数,添加核心工作线程
2.当前工作线程大于等于核心线程数,且线程池为running状态,尝试添加任务到队列
3.如果线程池状态为running且队列已经满了,则添加非核心线程
4.如果线程池状态非Running状态或者队列已经满了,则执行拒接策略
execute()方法中的addWorker()方法是怎么添加工作线程的呢,请看下面分析:
// 返回值说明:
// true:表示worker 创建成功,且线程启动成功
// false:表示创建失败
private boolean addWorker(Runnable firstTask, boolean core) {
// 自旋操作:判断当前线程池状态是否允许创建线程
retry:
for (;;) {
// 获取当前ctl
int c = ctl.get();
// 获取当前线程池运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 条件一:rs >= SHUTDOWN 成立:说明当前线程池状态不是RUNNING 状态
// 条件二:前置条件:当前线程池状态不是RUNNING 状态 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
// rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
// 表示:当前线程池状态是SHUTDOWN状态 && 当前提交的任务是null addWorker这个方法可能不是execute去调用的 && 当前任务队列不是空
// 排除掉这种情况,当前线程池状态是SHUTDOWN状态,但是队列里面还有任务尚未处理,这个时候是允许添加worker的,但是不允许再次提交task
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
// 什么情况下会返回false?
// 线程池状态 rs >= SHUTDOWN
// rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask != null
return false;
// 上面逻辑,就是判断当前线程池状态是否允许添加worker
// 内部自旋操作:获取创建线程令牌的过程
for (;;) {
// 获取当前线程池中的线程数量
int wc = workerCountOf(c);
// 条件一:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多的数字
// 条件二:wc >= (core ? corePoolSize : maximumPoolSize)
// core == true,判断当前线程数量是否 >= corePoolSize,会拿核心线程数量做限制
// core == false,判断当前线程数量是否 >= maximumPoolSize,会拿最大线程数量做限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 执行到这里,说明当前已经无法添加线程了,已经达到指定限制了
return false;
// 条件成立:说明记录线程数量已经加1成功,相当于申请到了一块令牌
// 条件失败:说明可能有其他线程,修改过ctl这个值了,CAS 发生了冲突
// 可能发生过什么冲突?
// 1.其他线程execute() 申请过令牌了,在这之前,改变了ctl的值,期望值c与内存中的ctl的值不符合,导致CAS 失败
// 2.外部线程可能调用过 shutdown() 或者 shutdownNow() 导致线程池状态发生了变化了,ctl高三位表示线程池状态
// 线程池状态改变后,CAS 操作也会失败
if (compareAndIncrementWorkerCount(c))
// 进入到这里,一定是CAS 失败,申请到令牌了,跳出外部自旋操作
break retry;
// CAS 失败,没有成功的申请到令牌
// 获取最新的 ctl 值
c = ctl.get(); // Re-read ctl
// 判断当前线程池状态是否发生过变化,如果外部在这之前调用过shutdown 或者shurdownNow会导致线程池状态发生变化
if (runStateOf(c) != rs)
// 线程池状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态,是否允许创建线程
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// CAS 成功,跳出外部自旋操作来到了这里
// workerStarted:表示当前创建的worker是否已经启动 false:未启动 true:已经启动
boolean workerStarted = false;
// workerAdded:表示创建的worker是否添加到线程池中 false:未添加 true:已经添加
boolean workerAdded = false;
// w:表示后面创建worker 的一个引用
Worker w = null;
try {
// 创建worker
w = new Worker(firstTask);
// 将新创建的worker 节点的线程赋值给t
final Thread t = w.thread;
// 为什么这里还要做 t != null 这个判断?
// 为了防止ThreadFactory 实现类有bug,因为ThreadFactory 是一个接口,谁都可以实现
// 防止程序员自己实现的ThreadFactory 实现类有bug,导致创建出来的Thread为null
if (t != null) {
// 将全局锁的引用保存到mainLock变量中
final ReentrantLock mainLock = this.mainLock;
// 持有全局锁,可能会阻塞,直到获取成功为止,同一时刻操作线程池内部相关的操作都必须持有锁
mainLock.lock();
// 从这里加锁之后,其他线程是无法修改线程池状态的
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取最新的线程池运行状态
int rs = runStateOf(ctl.get());
// 条件一:rs < SHUTDOWN 成立:当前线程池处于RUNNING 状态,最正常的状态
// 条件二: 前置条件:当前线程池状态不是RUNNING 状态
// (rs == SHUTDOWN && firstTask == null):当前状态为SHUTDOWN 状态且firstTask为null
// 其实判断的就是SHUTDOWN 状态下的特殊情况,只不过这里不再判断队列是否为空了
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// t.isAlive() 当前线程start 后,线程isAlive 会返回true
// 防止程序员在ThreadFactory实现类创建线程返回给外部之前,将线程给start了
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将咱们创建的worker 添加到线程池中
workers.add(w);
// 获取最新当前线程池的线程数量
int s = workers.size();
// 条件成立:说明当前线程数量是一个新高,更新lagestPoolSize线程池中的线程最大数量
if (s > largestPoolSize)
largestPoolSize = s;
// 表示线程已经加入到线程池中了
workerAdded = true;
}
} finally {
// 释放线程池全局锁
mainLock.unlock();
}
// 条件成立:说明当前添加worker成功
// 条件失败:说明线程池在lock之前,线程池状态发生了变化导致添加失败
if (workerAdded) {
// 成功后则将创建的worker启动
t.start();
// 启动标记设置为true
workerStarted = true;
}
}
} finally {
// 条件成立:说明添加当前线程到线程池失败或者启动线程失败,需要做清理工作
// 1.释放令牌
// 2.将当前worker 清理出workers集合
if (! workerStarted)
addWorkerFailed(w);
}
// 返回新创建的线程是否启动
return workerStarted;
}
addWorker()方法比较复杂需要慢慢消化,我们简单总结下:
1.首先addWorker会自旋进行线程池状态的校验。
2.再一层内部自旋操作,获取创建线程令牌的过程,其实就是判断线程数量是否满足要求,能否抢到锁。
3.校验通过后,创建新的工作线程,加锁,再进行状态校验,校验通过后启动工作线程。
4.校验失败,添加工作线程失败,释放令牌,将当前worker 清理出workers集合。
总结一下,本文主要讲解execute()流程,addWorker()流程,是线程池的核心内容,各位需要慢慢消化下,,后续文章会接着线程池的讲解其他内容。
如果你觉得此文对你有一丁点帮助,点个赞,关个注,不失联,还希望补充什么内容,大家在评论区留言,期待后续精彩内容。
微信搜索关注订阅号:马老司,学习更多技术点
页面更新:2024-03-23
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号