在Java中使用线程池的主要原因主要归结为以下几个方面:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
}
创建线程池需要一些参数,其中包括:
在线程池的源码中,会通过一个AtomicInteger类型的变量ctl,来表示线程池的状态和当前线程池中的工作线程数量。
一个Integer占4个字节,也就是32个bit,线程池有5个状态:
线程池的工作线程数量被存储在整数状态变量的低29位上。这是因为,整数状态的高3位被用来表示线程池的状态(RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED),所以剩下的29位用来表示工作线程的数量。
private static int workerCountOf(int c) { return c & CAPACITY; }
其中CAPACITY=(1 << 29) - 1;,CAPACITY用32位表示:
00011111 11111111 11111111 11111111
如果线程池的数量是5,并且是RUNNING状态,它的32位表示:
11100000 00000000 00000000 00000101
&的结果为101。这也就是为撒线程池的数量不能超过CAPACITY,因为前三位保存了状态。
线程池使用阻塞队列(BlockingQueue)是因为它需要解决生产者-消费者问题。在线程池的场景中,"生产者"是提交任务的代码,而"消费者"是执行任务的线程。阻塞队列为生产者和消费者之间提供了一个平衡,以防止它们之间的速度差异导致的问题。
线程池中的线程在运行过程中,执行完创建线程时绑定的第一个任务后,就会不断的从队列中获取任务并执行,那么如果队列中没有任务了,线程为了不自然消亡,就会阻塞在获取队列任务时,等着队列中有任务过来就会拿到任务从而去执行任务。
关键代码:
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
这种策略可以更快地处理任务,因为它避免了任务在队列中等待的时间。
然而这种策略可能会导致过多的线程被创建,尤其是在处理大量短期任务的时候。过多的线程可能会导致系统的上下文切换开销增加,从而降低整体的性能。因此,这种策略并不适用于所有场景,需要根据具体的应用需求来选择合适的线程池策略。
public void execute(Runnable command) {
//线程的状态和数量,都保存在ctl
int c = ctl.get();
//工作线程数小于corePoolSize,则添加工作线程,并把command作为该线程要执行的任务
if (workerCountOf(c) < corePoolSize) {
//添加线程
//true表示添加的是核心线程,内部会去判断是否超过corePoolSize,如果超过了则会添加失败。
if (addWorker(command, true))
return;
//有可能其他线程也在添加,导致超过了corePoolSize
c = ctl.get();
}
//判断是否是running状态,如果是添加到阻塞队列中。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//有可能在入队的时候线程池状态发生了改变,就把刚才的线程移除。
if (! isRunning(recheck) && remove(command))
//并执行拒绝策略
reject(command);
//怕刚才加入队列的线程,没有工作线程去执行,如果没有则添加一个非核心线程
//目的就是为了从队列中获取任务并去执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果线程池状态不是RUNNING,或者线程池状态是RUNNING但是队列满了,则去添加一个非核心工作线程
else if (!addWorker(command, false))
reject(command);
}
executor
executor总结:
1、判断当前线程数是否小于核心线程数,如果小于则添加核心线程。
2、如果大于核心线程数,并且是running,则添加到队列中。
3、如果队列满了或者不是running,状态则添加非核心线程,如果添加非核心线程失败则执行拒绝策略。(其实如果不是running在添加非核心线程的时候也会返回false,走拒绝策略)
addWorker方法是是用来添加线程的,core参数表示添加的是核心线程还是非核心线程。
如果core=true:需要判断目前的工作线程数是否超过了corePoolSize。
如果core=false,那就要判断工作线程数是否超过最大线程数。
流程:
首先判断是否是shutdown(除了runnable状态,它不会再去新建线程),如果是shutdown,并且穿的参数runnable是null,并且队列中的任务不是空,那么可以继续往下走(新增一个线程加快处理队列中的任务)。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
1、自旋->判断core,然后cas增加线程数量。
2、自旋成功,通过线程工厂新建一个线程,并将线程添加到workers中,线程执行start,执行runnable。
addWorker
常见的面试题,线程池中的线程是如何保活的,就是这个方法。
allowCoreThreadTimeOut:默认为false,表示是否回收核心线程。
for (;;) {
//线程数量
int wc = workerCountOf(c);
// 是否需要要等待多少秒后把线程销毁
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//因为这里是自旋,所以如果任务队列为空返回null,并且工作线程减1,返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
//如果队列为空等待多少秒
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//直接阻塞获取
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
总结:
1、如果线程数大于核心线程数,任务队列不会空,poll(time)
2、如果线程数大于核心线程数,任务队列为空,线程数减1,并返回null。
3、如果线程数小于核心线程数,则一直阻塞等待获取任务,take()。
这里也有个参数allowCoreThreadTimeOut,默认为false。如果为true,那么即使线程数小于核心线程数,如果队列为空也会线程数减1,并返回null。
在addWorker时候会通过工厂方法进行创建线程,把线程和runnable封装到Worker中。
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
worker实现了runnable接口重写了run方法,当我们执行start时候其实执行的是runWorker。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//getTask是从任务队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
//当我们调用stop方法的时候实际上线程会执行interrupt来停止线程
//如果上一个任务执行了标记,那么现在需要清楚标记。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try
Throwable thrown = null;
try {
task.run();
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//工作线程数减一并且销毁。
processWorkerExit(w, completedAbruptly);
}
}
页面更新:2024-03-23
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号