面试被问到线程池的执行流程,如何回答?

如果你只知道线程池的几个参数,面试被问到再深一点的内容就回答不上来了,那么本系列文章带你熟悉线程池源码,更深入的了解线程池。

ThreadPoolExecutor将分几篇文章做解析,本文是第三篇,第一篇请见面试被问到线程池的状态是怎么计算的,如何回答?,关注一波,不错过后续内容,下面是核心知识点。本篇文章讲解execute()流程,addWorker()分析。

一、execute()方法执行流程

线程池创建后,我们会调用execute()方法来添加任务,另外还有一个submit(),其实底层也是execute()方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

那么execute()方法都做了什么,来看源码分析:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
   // 获取ctl的值
    int c = ctl.get();
  // 当前线程数小于核心线程池数量,此次提交任务,直接创建一个新的worker
   // 相对应线程池多了一个新的工作线程worker
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
  //执行到这说明工作线程大于等于核心线程
  
  // 当前线程池处于running状态,尝试将task放入到workQueue中
    if (isRunning(c) && workQueue.offer(command)) {
      // 获取当前ctl
        int recheck = ctl.get();
       // !isRunning()线程不在Running状态,代表当你提交到任务队列后,线程池状态被外部线程
      // 修改,例如调用了shutDown(),shutDownNow()
       // remove成功,提交之后,线程池中的线程还没消费,调用拒绝策略,将本次提交任务决绝
        if (! isRunning(recheck) && remove(command))
            reject(command);
      
       // 有几种情况会走到这里?
         // 1.当前线程池是RUNNING 状态
         // 2.线程池状态是非 RUNNING 状态,但是 remove 提交的任务失败
         // 担心当前线程池是 RUNNING 状态,但是线程池中的存活数量是0,这个时候,会很尴尬
       //,任务没线程去跑了
         // 这里其实是一个担保机制,保证线程池在RUNNING 状态下,最起码得有一个线程在工作
        else if (workerCountOf(recheck) == 0)
          //添加非核心线程
            addWorker(null, false);
    }
  
  //执行到这两种情况,队列满了,线程池非Running状态
    else if (!addWorker(command, false))
      //拒绝此次提交
        reject(command);
}

execute()方法代码行数不多,但是里面的逻辑却是挺多的,简要概括下就是:

1.当前工作线程小于核心线程数,添加核心工作线程

2.当前工作线程大于等于核心线程数,且线程池为running状态,尝试添加任务到队列

3.如果线程池状态为running且队列已经满了,则添加非核心线程

4.如果线程池状态非Running状态或者队列已经满了,则执行拒接策略

execute()方法中的addWorker()方法是怎么添加工作线程的呢,请看下面分析:

二、addWorker()方法分析:

 // 返回值说明:
 // true:表示worker 创建成功,且线程启动成功
 // false:表示创建失败
 private boolean addWorker(Runnable firstTask, boolean core) {
     // 自旋操作:判断当前线程池状态是否允许创建线程
     retry:
     for (;;) {
         // 获取当前ctl
         int c = ctl.get();
         // 获取当前线程池运行状态
         int rs = runStateOf(c);
 
         // Check if queue empty only if necessary.
         // 条件一:rs >= SHUTDOWN 成立:说明当前线程池状态不是RUNNING 状态
         // 条件二:前置条件:当前线程池状态不是RUNNING 状态 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
         // rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
         // 表示:当前线程池状态是SHUTDOWN状态 && 当前提交的任务是null addWorker这个方法可能不是execute去调用的 && 当前任务队列不是空
         // 排除掉这种情况,当前线程池状态是SHUTDOWN状态,但是队列里面还有任务尚未处理,这个时候是允许添加worker的,但是不允许再次提交task
         if (rs >= SHUTDOWN &&
             ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
             // 什么情况下会返回false?
             // 线程池状态 rs >= SHUTDOWN
             // rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask != null
             return false;
 
         // 上面逻辑,就是判断当前线程池状态是否允许添加worker
 
         // 内部自旋操作:获取创建线程令牌的过程
         for (;;) {
             // 获取当前线程池中的线程数量
             int wc = workerCountOf(c);
             // 条件一:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多的数字
             // 条件二:wc >= (core ? corePoolSize : maximumPoolSize)
             // core == true,判断当前线程数量是否 >= corePoolSize,会拿核心线程数量做限制
             // core == false,判断当前线程数量是否 >= maximumPoolSize,会拿最大线程数量做限制
             if (wc >= CAPACITY ||
                 wc >= (core ? corePoolSize : maximumPoolSize))
                 // 执行到这里,说明当前已经无法添加线程了,已经达到指定限制了
                 return false;
 
             // 条件成立:说明记录线程数量已经加1成功,相当于申请到了一块令牌
             // 条件失败:说明可能有其他线程,修改过ctl这个值了,CAS 发生了冲突
             // 可能发生过什么冲突?
             // 1.其他线程execute() 申请过令牌了,在这之前,改变了ctl的值,期望值c与内存中的ctl的值不符合,导致CAS 失败
             // 2.外部线程可能调用过 shutdown() 或者 shutdownNow() 导致线程池状态发生了变化了,ctl高三位表示线程池状态
             // 线程池状态改变后,CAS 操作也会失败
             if (compareAndIncrementWorkerCount(c))
                 // 进入到这里,一定是CAS 失败,申请到令牌了,跳出外部自旋操作
                 break retry;
             // CAS 失败,没有成功的申请到令牌
             // 获取最新的 ctl 值
             c = ctl.get();  // Re-read ctl
             // 判断当前线程池状态是否发生过变化,如果外部在这之前调用过shutdown 或者shurdownNow会导致线程池状态发生变化
             if (runStateOf(c) != rs)
                 // 线程池状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态,是否允许创建线程
                 continue retry;
             // else CAS failed due to workerCount change; retry inner loop
         }
     }
 
     // CAS 成功,跳出外部自旋操作来到了这里
 
     // workerStarted:表示当前创建的worker是否已经启动 false:未启动 true:已经启动
     boolean workerStarted = false;
     // workerAdded:表示创建的worker是否添加到线程池中 false:未添加 true:已经添加
     boolean workerAdded = false;
     // w:表示后面创建worker 的一个引用
     Worker w = null;
     try {
         // 创建worker
         w = new Worker(firstTask);
         // 将新创建的worker 节点的线程赋值给t
         final Thread t = w.thread;
         // 为什么这里还要做 t != null 这个判断?
         // 为了防止ThreadFactory 实现类有bug,因为ThreadFactory 是一个接口,谁都可以实现
         // 防止程序员自己实现的ThreadFactory 实现类有bug,导致创建出来的Thread为null
         if (t != null) {
             // 将全局锁的引用保存到mainLock变量中
             final ReentrantLock mainLock = this.mainLock;
             // 持有全局锁,可能会阻塞,直到获取成功为止,同一时刻操作线程池内部相关的操作都必须持有锁
             mainLock.lock();
 
             // 从这里加锁之后,其他线程是无法修改线程池状态的
             try {
                 // Recheck while holding lock.
                 // Back out on ThreadFactory failure or if
                 // shut down before lock acquired.
                 // 获取最新的线程池运行状态
                 int rs = runStateOf(ctl.get());
 
                 // 条件一:rs < SHUTDOWN 成立:当前线程池处于RUNNING 状态,最正常的状态
                 // 条件二: 前置条件:当前线程池状态不是RUNNING 状态
                 // (rs == SHUTDOWN && firstTask == null):当前状态为SHUTDOWN 状态且firstTask为null
                 // 其实判断的就是SHUTDOWN 状态下的特殊情况,只不过这里不再判断队列是否为空了
                 if (rs < SHUTDOWN ||
                     (rs == SHUTDOWN && firstTask == null)) {
                     // t.isAlive() 当前线程start 后,线程isAlive 会返回true
                     // 防止程序员在ThreadFactory实现类创建线程返回给外部之前,将线程给start了
                     if (t.isAlive()) // precheck that t is startable
                         throw new IllegalThreadStateException();
                     // 将咱们创建的worker 添加到线程池中
                     workers.add(w);
                     // 获取最新当前线程池的线程数量
                     int s = workers.size();
                     // 条件成立:说明当前线程数量是一个新高,更新lagestPoolSize线程池中的线程最大数量
                     if (s > largestPoolSize)
                         largestPoolSize = s;
                     // 表示线程已经加入到线程池中了
                     workerAdded = true;
                 }
             } finally {
                 // 释放线程池全局锁
                 mainLock.unlock();
             }
             // 条件成立:说明当前添加worker成功
             // 条件失败:说明线程池在lock之前,线程池状态发生了变化导致添加失败
             if (workerAdded) {
                 // 成功后则将创建的worker启动
                 t.start();
                 // 启动标记设置为true
                 workerStarted = true;
             }
         }
     } finally {
         // 条件成立:说明添加当前线程到线程池失败或者启动线程失败,需要做清理工作
         // 1.释放令牌
         // 2.将当前worker 清理出workers集合
         if (! workerStarted)
             addWorkerFailed(w);
     }
     // 返回新创建的线程是否启动
     return workerStarted;
 }

addWorker()方法比较复杂需要慢慢消化,我们简单总结下:

1.首先addWorker会自旋进行线程池状态的校验。

2.再一层内部自旋操作,获取创建线程令牌的过程,其实就是判断线程数量是否满足要求,能否抢到锁。

3.校验通过后,创建新的工作线程,加锁,再进行状态校验,校验通过后启动工作线程。

4.校验失败,添加工作线程失败,释放令牌,将当前worker 清理出workers集合。


总结一下,本文主要讲解execute()流程,addWorker()流程,是线程池的核心内容,各位需要慢慢消化下,,后续文章会接着线程池的讲解其他内容。

如果你觉得此文对你有一丁点帮助,点个赞,关个注,不失联,还希望补充什么内容,大家在评论区留言,期待后续精彩内容。

微信搜索关注订阅号:马老司,学习更多技术点

展开阅读全文

页面更新:2024-03-23

标签:线程   流程   令牌   队列   数量   状态   条件   操作   方法   工作

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top