线程池工作原理

为什么要有线程池

在Java中使用线程池的主要原因主要归结为以下几个方面:

  1. 资源的有效使用: 创建和销毁线程是一项昂贵的操作,它涉及到与操作系统交互以获取必要的资源(例如内核线程,内存等)。使用线程池,可以在一定程度上复用这些资源,大大减少了创建和销毁线程的开销。
  2. 限制线程数量: 如果对每个并发任务都创建一个新线程,可能会导致系统中线程数量过多。每个线程都需要占用一定的内存,过多的线程可能会导致系统资源(例如内存和CPU)过度消耗,甚至可能导致系统崩溃。线程池可以限制系统中活动线程的数量,有效防止这种情况的发生。
  3. 内核线程管理: 在底层操作系统中,每个Java线程实际上都是一个内核线程或者轻量级进程。操作系统调度这些内核线程需要时间,特别是当线程数量非常多的时候。线程池通过限制活动线程的数量,可以减少操作系统的负载,提高系统的响应速度。
  4. 提高响应速度: 线程池中的线程是预先创建的,当有新的任务到来时,无需等待新线程的创建,可以直接使用线程池中的空闲线程来处理任务,提高了系统的响应速度。

线程池的创建和参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue) {
}

创建线程池需要一些参数,其中包括:

线程池的状态

在线程池的源码中,会通过一个AtomicInteger类型的变量ctl,来表示线程池的状态和当前线程池中的工作线程数量。

一个Integer占4个字节,也就是32个bit,线程池有5个状态:

  1. Running(运行状态): 可以接收新任务,也可以处理队列中的任务。
  2. Shutdown(关闭状态): 不接收新任务,但可以处理队列中的任务。
  3. Stop(停止状态): 不接收新任务,也不处理队列中的任务,同时会中断正在处理的任务。
  4. Tidying(整理状态): 所有任务都已终止,工作线程数为零,线程转向TIDYING状态并将运行terminated()钩子方法。
  5. Terminated(终止状态): terminated()方法已经完成。

用32位表示

  1. RUNNING: -1 << 29,其二进制表示是:11100000 00000000 00000000 00000000
  2. SHUTDOWN: 0,其二进制表示是:00000000 00000000 00000000 00000000
  3. STOP: 1 << 29,其二进制表示是:00100000 00000000 00000000 00000000
  4. TIDYING: 2 << 29,其二进制表示是:01000000 00000000 00000000 00000000
  5. TERMINATED: 3 << 29,其二进制表示是:01100000 00000000 00000000 00000000

线程池的工作线程数量被存储在整数状态变量的低29位上。这是因为,整数状态的高3位被用来表示线程池的状态(RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED),所以剩下的29位用来表示工作线程的数量。

获取线程池的数量

private static int workerCountOf(int c)  { return c & CAPACITY; }

其中CAPACITY=(1 << 29) - 1;,CAPACITY用32位表示:

00011111 11111111 11111111 11111111

如果线程池的数量是5,并且是RUNNING状态,它的32位表示:

11100000 00000000 00000000 00000101

&的结果为101。这也就是为撒线程池的数量不能超过CAPACITY,因为前三位保存了状态。

线程池为什么一定得是阻塞队列、线程是怎么保活的?

线程池使用阻塞队列(BlockingQueue)是因为它需要解决生产者-消费者问题。在线程池的场景中,"生产者"是提交任务的代码,而"消费者"是执行任务的线程。阻塞队列为生产者和消费者之间提供了一个平衡,以防止它们之间的速度差异导致的问题。

线程池中的线程在运行过程中,执行完创建线程时绑定的第一个任务后,就会不断的从队列中获取任务并执行,那么如果队列中没有任务了,线程为了不自然消亡,就会阻塞在获取队列任务时,等着队列中有任务过来就会拿到任务从而去执行任务。

关键代码:

Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();

Tomcat是自定义线程池的入队逻辑?

  1. 如果当前运行的线程数量小于核心线程数(corePoolSize),线程池会创建一个新的线程来执行这个任务,即使有空闲线程也是这样。
  2. 如果当前运行的线程数量达到或者超过核心线程数,但是小于最大线程数(maximumPoolSize),线程池会尝试将任务加入队列。但如果此时正在处理的任务数小于当前的线程数,这个入队操作会失败(offer方法返回false)。这时,线程池会选择创建新的线程来执行任务,而不是使用空闲的线程。
  3. 如果当前运行的线程数量达到最大线程数,线程池会尝试将任务加入队列。此时不论offer方法是否成功,线程池都不会创建新的线程。如果offer方法失败,表示队列已满,那么线程池会拒绝这个任务。

这种策略可以更快地处理任务,因为它避免了任务在队列中等待的时间。

然而这种策略可能会导致过多的线程被创建,尤其是在处理大量短期任务的时候。过多的线程可能会导致系统的上下文切换开销增加,从而降低整体的性能。因此,这种策略并不适用于所有场景,需要根据具体的应用需求来选择合适的线程池策略。

线程池的执行流程

public void execute(Runnable command) {
    //线程的状态和数量,都保存在ctl
    int c = ctl.get();
    //工作线程数小于corePoolSize,则添加工作线程,并把command作为该线程要执行的任务
    if (workerCountOf(c) < corePoolSize) {
    //添加线程
    //true表示添加的是核心线程,内部会去判断是否超过corePoolSize,如果超过了则会添加失败。
        if (addWorker(command, true))
            return;
    //有可能其他线程也在添加,导致超过了corePoolSize
        c = ctl.get();
    }
    //判断是否是running状态,如果是添加到阻塞队列中。
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
     //有可能在入队的时候线程池状态发生了改变,就把刚才的线程移除。
        if (! isRunning(recheck) && remove(command))
     //并执行拒绝策略
            reject(command);
      //怕刚才加入队列的线程,没有工作线程去执行,如果没有则添加一个非核心线程
      //目的就是为了从队列中获取任务并去执行
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //如果线程池状态不是RUNNING,或者线程池状态是RUNNING但是队列满了,则去添加一个非核心工作线程
    else if (!addWorker(command, false))
        reject(command);
}



executor

executor总结:

1、判断当前线程数是否小于核心线程数,如果小于则添加核心线程。

2、如果大于核心线程数,并且是running,则添加到队列中。

3、如果队列满了或者不是running,状态则添加非核心线程,如果添加非核心线程失败则执行拒绝策略。(其实如果不是running在添加非核心线程的时候也会返回false,走拒绝策略)

addWorker方法

addWorker方法是是用来添加线程的,core参数表示添加的是核心线程还是非核心线程。

如果core=true:需要判断目前的工作线程数是否超过了corePoolSize。

如果core=false,那就要判断工作线程数是否超过最大线程数。

流程:

首先判断是否是shutdown(除了runnable状态,它不会再去新建线程),如果是shutdown,并且穿的参数runnable是null,并且队列中的任务不是空,那么可以继续往下走(新增一个线程加快处理队列中的任务)。

if (rs >= SHUTDOWN &&
    ! (rs == SHUTDOWN &&
       firstTask == null &&
       ! workQueue.isEmpty()))
 return false;

1、自旋->判断core,然后cas增加线程数量。

2、自旋成功,通过线程工厂新建一个线程,并将线程添加到workers中,线程执行start,执行runnable。

addWorker

getTask获取任务

常见的面试题,线程池中的线程是如何保活的,就是这个方法。

allowCoreThreadTimeOut:默认为false,表示是否回收核心线程。

for (;;) {
 //线程数量
    int wc = workerCountOf(c);
    

    // 是否需要要等待多少秒后把线程销毁
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
    //因为这里是自旋,所以如果任务队列为空返回null,并且工作线程减1,返回null
    if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
 }

    try {
        Runnable r = timed ?
            //如果队列为空等待多少秒
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
         //直接阻塞获取
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;
    } catch (InterruptedException retry) {
        timedOut = false;
    }
}

总结:

1、如果线程数大于核心线程数,任务队列不会空,poll(time)

2、如果线程数大于核心线程数,任务队列为空,线程数减1,并返回null。

3、如果线程数小于核心线程数,则一直阻塞等待获取任务,take()。

这里也有个参数allowCoreThreadTimeOut,默认为false。如果为true,那么即使线程数小于核心线程数,如果队列为空也会线程数减1,并返回null。

runWorker方法

在addWorker时候会通过工厂方法进行创建线程,把线程和runnable封装到Worker中。

Worker(Runnable firstTask) {
    setState(-1);
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

worker实现了runnable接口重写了run方法,当我们执行start时候其实执行的是runWorker。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //getTask是从任务队列中获取任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //当我们调用stop方法的时候实际上线程会执行interrupt来停止线程
                //如果上一个任务执行了标记,那么现在需要清楚标记。
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try 
                    Throwable thrown = null;
                    try {
                        task.run();
                    }  catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
      //工作线程数减一并且销毁。
            processWorkerExit(w, completedAbruptly);
        }
    }
展开阅读全文

页面更新:2024-03-23

标签:线程   队列   内核   数量   操作系统   状态   核心   参数   方法   系统

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top