基本上工作中和Future接口 打交道比较多,比如线程池ThreadPoolExecutor#sumbit方法,返回值就是一个Future(实际上基本上就是一个FutureTask)。ThreadPoolExecutor#sumbit需要传入一个Callable,我们作为调用方法,在其中的call方法编写业务逻辑,然后ThreadPoolExecutor会将其包装为一个FutureTask 提交到线程池中(异步),并立马返回FutureTask,从而让调用方可以取消任务,查看任务是否运行结束,获取异步任务结果 FutureTask就如同一个纽带,连接了任务 和 任务的结果
FutureTask源码注释中写到:
FutureTask是可取消的异步任务。此类提供Future的基本实现,包括启动,取消、查询以查看任务是否完成以及获取任务结果的方法。只有在任务完成后才能检索结果,如果尚未完成,get方法将阻塞。任务完成后,无法重新启动或取消任务(除非使用runAndReset调用任务)。
FutureTask可用于包装Callable或Runnable对象。因为FutureTask实现了Runnable,所以FutureTask可以提交给Executor执行。
Future 表示异步任务的运行结果,它定义了取消、查询以查看任务是否完成以及获取任务结果的方法。只有在任务完成后才能检索结果,如果尚未完成,get方法将阻塞。任务完成后,无法重新启动或取消任务(除非使用runAndReset调用任务)
方法 | 解释 |
boolean cancel(boolean mayInterruptIfRunning) | 尝试取消此任务的执行。如果任务已完成、已被取消或由于某些其他原因无法取消,则此尝试将失败。如果成功且在调用取消时此任务尚未启动,则任务不会运行。如果任务已经开始,则 mayInterruptIfRunning 参数确定是否应该中断执行该任务的线程以尝试停止该任务。此方法返回后,对 isDone 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled 的后续调用将始终返回 true。 |
boolean isCancelled() | 如果此任务在正常完成之前被取消,则返回 true |
boolean isDone() | 如果此任务完成,则返回 true。完成可能是由于正常终止、异常或取消——在这些情况下,此方法都将返回 true。 |
V get() throws InterruptedException, ExecutionException | 如果任务没有执行完,那么一直阻塞直到任务完成,直到任务运行成功,或者运行失败,或者运行任务线程被中断 |
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException | get()的超时等待版本,指定当前线程等待时间,如果超时任然没有执行结束,那么抛出TimeoutException |
"可以运行的未来",即表示一个可以运行的任务(是一个Runnable),也表示异步任务的结果(是一个Future)
小小FutureTask牛在哪儿昵,为什么是doug lea写的,我不行?
为多个线程对FutureTask运行状态的可见性,FutureTask具备属性private volatile int state,使用volatile修饰,可取以下值(Future定义了对应的常量):
常量 | 解释 |
NEW | 任务处于新建状态 |
COMPLETING | 任务正在完成,意味着异步计算结束,但是结果没有写回到 outcome属性上 |
NORMAL | 任务正常结束,说明程序员定义的业务逻辑,没有抛出异常 |
EXCEPTIONAL | 任务运行失败,说明程序员定义的业务逻辑抛出异常 |
CANCELLED | 任务被取消,说明调用方法调用了cancel方法,在任务运行之前取消了任务 |
INTERRUPTING | 任务正在被中断,是一个瞬态,使用cancel(true),调用方正在中断运行任务的线程 |
INTERRUPTED | 任务已经被中断 |
FutureTask使用outcome属性记录任务运行结果,或者运行失败的抛出的异常,在我们定义的业务逻辑(`ThreadPoolExecutor#sumbit传入的Callable)成功运行结束,到运行的结果写回到outcome,并不是一个瞬间动作,在运行结果赋值到outcome的时候,会先将FutureTask状态修改为COMPLETING
这样做的目的是,任务处于COMPLETING,这是一个线程调用get获取任务的时候,不会让调用线程阻塞而是让这个线程yield放弃cpu稍等片刻,任务结果马上写回了。
考虑一个情况,线程A正在执行任务,线程BCD都调用了get,线程E调用了FutureTask#cancel(true),这时候需要中断线程A,并且我们在这个FutureTask的业务逻辑中定义了,如果被中断,那么任务结束,抛出异常等逻辑
这时候FutureTask#run会抛出一个异常(什么异常取决于FutureTask的业务逻辑抛出什么),那么线程BCD调用get造成的阻塞如何处理,需要去唤醒BCD,在修改状态到中断其实是存在短暂的时间的,在这短暂的时间内线程A会yield,放弃CPU,等待线程E中断方法调用结束,然后线程E会唤醒BCD。
任务被包装为Callable对象,任务结果使用outcome属性记录,运行任务的线程使用runner属性记录。
需要注意的是这里的outcome,没有使用volatile,后面的注释写到non-volatile, protected by state reads/writes(不使用volatile修饰,可见性由state属性的写入和读取保证),为什么不需要volatile关键字修饰,我们看源码的时候解释
我们上面说到,调用get方法的时候,如果任务没有结束,那么调用线程,将阻塞到任务结束。这意味着任务结束的时候,调用线程将被唤醒,那么哪些线程需要唤醒?FutureTask使用WaitNode类型的属性记录
通过WaitNode属性记录调用线程,并且使用next属性串联起其他等待的线程
看完这些属性,我们来拜读doug lea大师的代码
可以看到如果传入Runnable,那么将被使用RunnableAdapter包装成Callable,典型的适配器模式,通过RunnableAdapter适配Runnable为Callable
需要注意callable没有使用volatile修饰,doug lea 不担心重排序的问题么
如果执行FutureTask的构造方法的时候,发生重排序,this.callable的赋值重排序到外部获取到构造方法生成的FutureTask的后面,并且立马有另外一个线程调用了FutureTask的任务执行方法,这时候this.callable还来不及赋值,调用执行方法抛出空指针异常。那么为什么不用volatile修饰callable还能保证其可见性昵,能让源码写上// ensure visibility of callable这行注释昵?
在《JUC源码学习笔记4——原子类,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法》的学习笔记中,我们说过volatile变量写具备如下内存屏障
这里的store store屏障防止了this.callable的赋值重排序到this.state = NEW之后,且后续的store屏障会保证当前线程(构造FutureTask的线程)工作内存会立马写回到主内存,并让其他线程关于此FutureTask的缓存无效,从而保证了callable的线程可见性。
我们从run方法看起,run方法中的许多逻辑,牵扯到其他的方法,可能需要总览全局才能彻底理解
public void run() {
//如果不是初始,说明有其他线程启动了,或者说其他线程取消了任务 那么不需要运行
//如果是new 但是cas runner失败了,说明同时有多个线程执行此cas,当前线程没有抢过,那么不能执行此任务,
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable c = callable;
//再次校验下是否为初始状态 如果不是 说明在当前线从第一个if到此存在其他线程取消任务
//任务启动之前可以取消任务的运行
if (c != null && state == NEW) {
V result;
//记录当前任务是否成功执行,如果Callable代码写错了,
//或者说Callable响应中断,执行的途中被中断那么为false
boolean ran;
try {
//业务逻辑执行
result = c.call();
//成功执行
ran = true;
} catch (Throwable ex) {
//这里可能是Callable本身代码逻辑错误异常 也可能是响应中断抛出异常
result = null;
ran = false;
//记录异常
setException(ex);
}
if (ran)
//设置任务正常执行结果
set(result);
}
} finally {
runner = null;
int s = state;
//处理中断
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
这两个方法都差不多,都是上来一个CAS将state从new转变为COMPLETING,然后用outcome记录异常或者记录成功返回值,然后使用UNSAFE.putOrderedInt改变state,如果是出现异常,那么设置状态为EXCEPTIONAL,如果正常结束设置为NORMAL。
private void finishCompletion() {
// 等待的节点
for (WaitNode q; (q = waiters) != null;) {
//将当前futureTask的waiters属性cas为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
//唤醒所有get方法阻塞的线程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//唤醒
LockSupport.unpark(t);
}
WaitNode next = q.next;
//直到一个为null的节点,意味着遍历结束
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
//结束
break;
}
}
//钩子方法 留给我们自己扩展
done();
//将任务置为null
callable = null; // to reduce footprint
}
这里拿到waiters然后进行自旋遍历所有等待的节点线程,然后唤醒它们,有意思的点在UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)为何这里要使用CAS更新waiters为null昵?
因为这里存在线程A执行完FutureTask调用finishCompletion的同时线程B调用get进行等待,调用get方法进行排队(排队时也是CAS设置自己为waiters)这两个CAS必定有一个成功,有一个失败
这一点我们在get方法的源码分析的时候,会深有体会
其次在取消任务的时候也会调用finishCompletion唤醒等待的线程,所有finishCompletion的调用存在线程安全问题,需要使用cas保证线程安全
在run方法的finally块中存在
//运行完设置runner为空
runner = null;
//重新获取状态
int s = state;
//如果是INTERRUPTING 或者INTERRUPTED
if (s >= INTERRUPTING)
//handlePossibleCancellationInterrupt
handlePossibleCancellationInterrupt(s);private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
//如果是打断中 那么等待直到结束打断
while (state == INTERRUPTING)
Thread.yield();
cancel方法可以选择传入true表示,如果任务还在运行那么调用运行任务线程的interrupt方法进行中断,如果是调用cancel的线程还没有完成中断那么当前运行的线程会让步,为什么这么做,我们上面说到过,A线程运行任务,B线程cancel任务,B中断线程A其实是需要时间的,B会先修改任务状态为INTERRUPTING,然后中断线程A,然后修改状态为INTERRUPTED并唤醒等待的线程,从INTERRUPTING - > INTERRUPTED 这段时间,线程A只需要让出cpu等待即可
public V get() throws InterruptedException, ExecutionException {
int s = state;
//任务为NEW 和 COMPLETING 那么调用那么会调用awaitDone
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//此方法如果发现FuturetTask调用异常那么抛出异常
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
//如果状态小于等于COMPLETING ( NEW 和 COMPLETING) 那么会调用awaitDone
//如果awaitDone结束的时候返回的状态还是 NEW or COMPLETING 抛出超时异常
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
//此方法如果发现FuturetTask调用异常那么抛出异常
return report(s);
}
二者最终都调用了awaitDone(是否超时等待,等待时长)
这意味着 状态为new,任务都运行完业务逻辑,或者状态为COMPLETING,业务逻辑运行完了但是outcome正在赋值为执行结果,对outcome赋值后,会修改状态为NORMAL(任务正常完成),或者EXCEPTIONAL(任务执行抛出异常),所有get方法只有状态小于等于COMPLETING,才会调用awaitDone挂起线程进行等待
调用此方法的前提是,任务逻辑没有执行完,或者逻辑执行完但是结果还没有赋值给outcome,那么这两种情况doug lea如何处理
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//等待结束时间,如果非超时等待,那么为0
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//当前线程如果进入等待任务完成队列,此变量记录等待节点
WaitNode q = null;
//是否入队(等待任务完成队列)Waiters 使用next组成的队列
boolean queued = false;
for (;;) {
//如果等待的过程中被中断,
//那么把自己从等待waiters中删除
//并且抛出中断异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
//读取state,volatile保证可见性
int s = state;
//如果当前大于COMPLETING 说明任务执行完成 outcome已经赋值了,
//或者取消了,或者由于取消而被中断 直接返回当前状态,不需要再等了
if (s > COMPLETING) {
//节点线程置为null,后续执行任务线程唤醒等待线程的时候不会唤醒到此线程
if (q != null)
q.thread = null;
return s;
}
//如果任务正在完成,进行线程让步
//后续FutureTask执行的线程写回outcome改变状态为NORMAL或者EXCEPTIONAL是很快的,
//也许修改状态为NORMAL或者EXCEPTIONAL会导致线程A多yield几下(使用的是UNSAFE.putOrderedInt存在线程可见性问题)
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//当前线程的节点
else if (q == null)
q = new WaitNode();
//如果没有入队(等待任务完成的队列)那么入队(等待任务完成的队列)
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//如果是超时等待
else if (timed) {
nanos = deadline - System.nanoTime();
//等待超时 把自己从等待任务完成队列中移除
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//等待指定时间
LockSupport.parkNanos(this, nanos);
}
else
//无限期等待
LockSupport.park(this);
}
}
此方法分支很多,我们慢慢看
private V report(int s) throws ExecutionException {
Object x = outcome;
//如果任务正常结束
if (s == NORMAL)
//强转
return (V)x;
//如果任务取消了 或者由于取消被中断了,抛出取消异常
if (s >= CANCELLED)
throw new CancellationException();
//反之抛出ExecutionException 包装 原始的异常
throw new ExecutionException((Throwable)x);
}
只有任务正常执行的时候,才会返回结果,如果被取消那么抛出取消异常。
取消有一个比较有趣的点,如果取消在任务开始之前,那么说明取消成功,后续任务完成调用set或者setException应该是什么都不做。如果取消在任务执行之后,那么取消的这个动作应该失败,下面我们看下doug lea如果处理这个细节。
//mayInterruptIfRunning 表示需要中断任务执行线程
public boolean cancel(boolean mayInterruptIfRunning) {
//任务不是初始,或者CAS修改状态从new 到INTERRUPTING 或者CANCELLED 失败
//直接返回false
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
//如果需要中断
if (mayInterruptIfRunning) {
try {
Thread t = runner;
//执行中断
if (t != null)
t.interrupt();
} finally { // final state
//修改状态为INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//唤醒所有等待任务执行的线程
finishCompletion();
}
return true;
}
这里为INTERRUPTING ,可能是取消任务的线程还没来得及执行UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED),也可能是可见性导致运行任务的线程没有读取到最新的state,handlePossibleCancellationInterrupt会让运行任务的线程等待
页面更新:2024-05-14
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号