面试被问到线程池是怎样实现线程复用的,如何回答?

如果你只知道线程池的几个参数,面试被问到再深一点的内容就回答不上来了,那么本系列文章带你熟悉线程池源码,更深入的了解线程池。

ThreadPoolExecutor将分几篇文章做解析,本文是第二篇,第一篇请见Java线程池只知道几个参数?来看看源码你就清楚了(一),关注一波,不错过后续内容,下面是核心知识点。本篇文章讲解Worker类、runWorker()、getTask()

一、Worker类详解

1.Worker类图

Worker是ThreadPoolExecutor的内部类,根据类图可以看出Worker继承了
AbstractQueuedSynchronizer类,实现了Runnable接口,也就是说,他本身就相当于一个同步队列,结合他的成员变量 thread 和 firstTask,可以知道他实际上就是我们线程池中所说的“线程”。除了父类 AQS 本身提供的独占锁以外,Worker 还提供了一些检查任务线程运行状态以及中断线程相关的方法。

此外,线程池中还有一个工作队列 workers,用于保存当前全部的 Worker:

private final HashSet workers = new HashSet();

2.Worker类源码

本质上,Worker类既是一个同步组件,也是一个执行任务的线程。所以如果有人问你线程池的线程是怎么包装的,至少我们现在知道有个Worker来包装而不是简单的Thread。下面分析下Worker源码:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        // 工作线程
        final Thread thread;
    
        // 初始任务,只在worker第一次执行任务的时候执行,之后都是从workQueue中获取任务执行
        Runnable firstTask;
    
        // 线程执行过的任务数
        volatile long completedTasks;

       //Worker类的构造方法,初始化任务并调用线程工厂创建执行任务的线程
        Worker(Runnable firstTask) {
            setState(-1);// 调用runWorker()前禁止中断
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

       //重写Runnable接口的run()方法
        public void run() {
          //调用ThreadPoolExecutor类的runWorker(Worker)方法
            runWorker(this);
        }
    	... ...
    }

在Worker类的构造方法中,首先将同步状态state设置为-1,为了防止runWorker方法运行之前被中断。这是因为如果其他线程调用线程池的shutdownNow()方法时,如果Worker类中的state状态的值大于0,则会中断线程,如果state状态的值为-1,则不会中断线程。

Worker实现了Runable接口,在调用start()方法后,实际执行的是run方法,里面又调用了 ThreadPoolExecutor的runWorker()方法。下面我们来看看这个runWorker方法,知道线程池的线程是如何运行的。

二、runWorker()方法分析

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 新创建Worker时默认state为-1,AQS的unlock方法会将其改为0,此后允许使用interruptIfStarted()方法进行中断
    
    // 完成任务以后是否需要移除当前Worker,即当前任务是否意外退出
    boolean completedAbruptly = true;
    
    try {
        // 循环获取任务,后面解析getTask方法
        while (task != null || (task = getTask()) != null) {
            // 加锁,防止 shundown 时中断正在运行的任务
            w.lock();
            // 如果线程池状态为 STOP 或更后面的状态,中断线程任务
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 钩子方法,默认空实现,可以自己重写
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 钩子方法
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 任务执行完毕,完成数量+1
                w.completedTasks++;
                w.unlock();
            }
        }
        
        completedAbruptly = false;
    } finally {
        // 根据completedAbruptly决定是否要移除意外退出的Worker,并补充新的Worker
        // 也就是说,如果上述过程顺利完成,工作线程没有挂掉,就不删除,下次继续用,
       // 否则就在workers中remove掉,然后再调用addWorker()方法添加。
      //另外如果获取不到任务,getTask返回空,也会清楚该worker,起到回收非核心线程的目的
        processWorkerExit(w, completedAbruptly);
    }
}

概括一下runWorker方法的核心逻辑:

循环调用getTask方法,获取要执行的线程,加锁然后执行任务,如果执行完任务流程,并且没有发生异常导致 Worker 挂掉,就直接复用 Worker(在获取任务的方法 getTask()中循环等待任务),如果执行完任务流程后发现发生异常导致 Worker 挂掉,就从工作队列中移除当前 Worker,并且补充一个新的;

当然getTask方法就是从队列中获取的,来看下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 如果线程池关闭了,且队列里的任务都完成了,或者线程池进入了比 STOP更大的状态
       //,就不表示没有新任务
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // 获取当前工作线程数
        int wc = workerCountOf(c);

        // 时候设置了核心线程超时(默认false)或当前线程数大于核心线程数,及存在非核心线程,
       // 即判断当前当前是否需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

       //如果线程池中的线程数量大于corePoolSize
				//获取大于corePoolSize或者是否正在等待执行任务并且轮询超时
				//并且当前线程池中的线程数量大于1或者任务队列为空
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //成功减少线程池中的工作线程数量
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 阻塞获取任务
            Runnable r = timed ?
                // 阻塞 keepaliveTime 以获取任务,如果在 keepaliveTime 时间内没有获取到任务,则返回 null.
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 如果获取不到任务,说明非核心线程超时了,下一轮判断确认是否退出循环。
           //退出循环后,runWorker方法执行完毕,会被processWorkerExit回收
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

概括一下getTask方法:

判断线程池是否关闭,工作队列是否为空,如果是说明没任务了,直接返回null,否则接着往下判断,判断当前是否存在非核心线程,如果是说明需要进行超时处理,获取任务,如果不需要超时处理,则直接从任务队列获取任务,否则根据 keepaliveTime 阻塞一段时间后获取任务,如果获取不到,说明非核心线程超时,返回 null 交给 runWorker()中的processWorkerExit()方法去删除。

换句话说,runWorker()方法一旦执行完毕(非核心线程),必然会删除当前的 Worker,而通过 getTask()拿任务的 Worker(核心线程),在线程池正常运行的状态下,核心线程只会一直在 for 循环中等待直到拿到任务,而非核心线程超时以后拿不到任务就会返回一个 null,然后回到 runWorker()中走完processWorkerExit()方法被删除。

三、Worker类总结

看了runWorker和getTask方法后,不知道你对这个Worker有没有进一步理解,那么这个Worker充当了什么作用呢,总结一下:

1.封装线程,将线程进行包装,与线程池的状态做关联。

2.通过worker达到线程复用的目的,while循环从队列中获取任务,达到核心线程一直运行,非核心线程运行有过期时间的目的,并且worker数量具有自我恢复能力(其实是重新创建worker),保证线程数量。

这篇就到这,后续文章会继续分析线程池的其他内容。

如果你觉得此文对你有一丁点帮助,点个赞,关个注,不失联,还希望补充什么内容,大家在评论区留言,期待后续精彩内容。

微信搜索关注订阅号:马老司,学习更多技术点

展开阅读全文

更新时间:2024-08-22

标签:线程   目的   队列   源码   数量   核心   状态   方法   内容   工作

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top