JUC之FutureTask源码深度剖析 ✨ 每日积累
API解释如下可取消的异步计算。该类提供了一个Future的基本实现,具有启动和取消计算的方法,查询计算是否完整,并检索计算结果。结果只能在计算完成后才能检索;如果计算尚未完成,则get方法将阻止。一旦计算完成,则无法重新启动或取消计算(除非使用runAndReset()调用计算)。A FutureTask可用于包装Callable或Runnable对象。因为FutureTask实现Runnabl
API解释如下
可取消的异步计算。该类提供了一个
Future的基本实现 ,具有启动和取消计算的方法,查询计算是否完整,并检索计算结果。结果只能在计算完成后才能检索;如果计算尚未完成,则get方法将阻止。一旦计算完成,则无法重新启动或取消计算(除非使用runAndReset()调用计算 )。AFutureTask可用于包装Callable或Runnable对象。 因为FutureTask实现Runnable,一个FutureTask可以提交到一个Executor执行。 除了作为独立类之外,此类还提供了protected功能,在创建自定义任务类时可能很有用。
boolean |
cancel(boolean mayInterruptIfRunning) 尝试取消执行此任务。 |
|---|---|
protected void |
done() 此任务转换到状态 isDone (无论是正常还是通过取消)调用的受保护方法。 |
V |
get() 等待计算完成,然后检索其结果。 |
V |
get(long timeout, TimeUnit unit) 如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。 |
boolean |
isCancelled() 如果此任务在正常完成之前取消,则返回 true 。 |
boolean |
isDone() 返回 true如果任务已完成。 |
void |
run() 将此未来设置为其计算结果,除非已被取消。 |
protected boolean |
runAndReset() 执行计算而不设置其结果,然后将此将来重置为初始状态,如果计算遇到异常或被取消,则不执行此操作。 |
protected void |
set(V v) 将此未来的结果设置为给定值,除非此未来已被设置或已被取消。 |
protected void |
setException(Throwable t) 导致这个未来报告一个ExecutionException与给定的可抛弃的原因,除非这个未来已经被设置或被取消。 |
FutureTask状态流转图

FetureTask的get()和cancel()的执行示意图

代码示例
import com.sun.org.apache.xpath.internal.operations.String;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask<>(() -> {
System.out.println("-----");
return 1024;
});
new Thread(futureTask).start();
Object o = futureTask.get();
}
}
(FutureTask.java)中的方法
run()
//Thread.start()最后会执行的地方
public void run() {
//非NEW状态线程或者通过CAS设置当前线程运行当前任务失败(表示被其他线程抢占了当前任务)直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
//执行到这里,当前线程为NEW状态,且抢占TASK任务成功,进入任务执行的流程
try {
//自己封装的Callable或者装饰后的Runnable
Callable<V> c = callable;
//防止空指针 或者 当前线程被中断的情况(小概率)
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用我们实现的call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
get()
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
/**
*线程状态几种类型
*/
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
//Callable或者Runnable使用装饰者模式伪装成Callable
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
//执行线程的引用
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
/**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
//正常执行结束,outcome保存执行的结果
//非正常outcome结束保存抛出的异常
Object x = outcome;
if (s == NORMAL)
//返回运算结果
return (V)x;
//调用cancel()被取消的状态
if (s >= CANCELLED)
throw new CancellationException();
//如果走到这一步说明实现有问题,需要check自己实现的代码。。。
throw new ExecutionException((Throwable)x);
}
public V get() throws InterruptedException, ExecutionException {
//获取当前线程状态
int s = state;
//当前线程状态为未执行、正在执行、正在完成状态时,调用get()的外部线程被阻塞(这里指的线程不是执行当前任务的线程)
if (s <= COMPLETING)
//返回当前线程状态,如果此方法中抛出异常会往上抛出
s = awaitDone(false, 0L);
return report(s);
}
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//FutureTask.get()这里的调用时不带超时属性的
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//当前线程会被封装为WaitNode对象
WaitNode q = null;
//判断当前线程的 WaitNode有没有在队列中
boolean queued = false;
//自旋操作
for (;;) {
//被中断的线程进入,被中断的线程线程调用interrupted(),返回true,之后将Thread的中断标记设置为false
if (Thread.interrupted()) {
//当前线程移出队列操作
removeWaiter(q);
throw new InterruptedException();
}
//被挂起的线程使用unpark(thread)唤醒或者新生线程走下面逻辑
//获取任务最新状态
int s = state;
if (s > COMPLETING) {
//已经创建WaitNode对象,q.thread = null;(help gc)
if (q != null)
q.thread = null;
//返回当前状态
return s;
}
//当前任务正在完成任务中,让当前线程释放1cpu,继续下一次抢占cpu
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//自旋第一步:当前线程还未创建WaitNode对象,为其创建
else if (q == null)
q = new WaitNode();
//相同线程再次自旋之后,已转变为WaitNode对象,放入队列中
else if (!queued)
//通过cas方式设置waiters指向当前线程node,设置成功返回true
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
//将当前node节点的next指向等待队列的头节点
//waiters一直指向队列的头节点
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
//FutureTask.get()调用会走这里,当前get()操作的线程被park挂起,线程状态变为WAITING状态
//等待被中断或者被唤醒,继续自旋
else
LockSupport.park(this);
}
}
/**
* Tries to unlink a timed-out or interrupted wait node to avoid
* accumulating garbage. Internal nodes are simply unspliced
* without CAS since it is harmless if they are traversed anyway
* by releasers. To avoid effects of unsplicing from already
* removed nodes, the list is retraversed in case of an apparent
* race. This is slow when there are a lot of nodes, but we don't
* expect lists to be long enough to outweigh higher-overhead
* schemes.
*/
//当有很多节点时,这是很慢的,但我们不希望列表足够长,超过更高的开销方案。
private void removeWaiter(WaitNode node) {
if (node != null) {
//node.thread = null的node会出队
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
// (pred)前一个结点,(q)当前节点,(s)当前节点下个节点
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
//非头节点进入
else if (pred != null) {
//【1】-》【2】-》【3】-》【4】
//例如当前节点为3,将【2】-》【4】,【3】出队
pred.next = s;
//判断前一个节点是否出队
if (pred.thread == null) // check for race
continue retry;
}
//当前节点为头节点的处理,通过cas指向头节点的下一个节点
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
//waiters一直指向队列的头节点
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
cancel()
public boolean cancel(boolean mayInterruptIfRunning) {
//state == NEW表示任务运行中或者处于线程池任务工作队列中
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
//任务运行中或者处于线程池任务工作队列中且通过CAS设置当前线程状态成功执行下边逻辑
try { // in case call to interrupt throws exception
//mayInterruptIfRunning为true执行逻辑
if (mayInterruptIfRunning) {
try {
Thread t = runner;
//当前线程很有可能为null,表明目前还没有线程获取到这个TASK任务
if (t != null)
//调用中断
t.interrupt();
} finally { // final state
//通过CAS操作将线程状态改为中断完成
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//唤醒当前所有阻塞的外部调用get()的线程
finishCompletion();
}
return true;
}
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
//q指向waiters节点,waiters表示当前链表头节点
for (WaitNode q; (q = waiters) != null;) {
//使用cas设置waiters为null为了防止外部线程的调用cancel()【小概率事件】
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
//获取当前WaitNode节点中的thread
Thread t = q.thread;
if (t != null) {
q.thread = null; // help gc
//唤醒当前节点对应的线程
LockSupport.unpark(t);
}
//当前节点的下一个节点
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//自己可以扩展的方法
done();
callable = null; // to reduce footprint
}
小结
当外部线程调用get()之后,当前线程状态为正在执行、未执行或者正在完成的状态时,外部调用get()的线程被阻塞;之后进入一个自旋操作中,为其创建WaitNode对象,并在下一次自选成功之后通过cas操作将队列头节点设置为当前节点node对象,当前节点node对象的下一个节点指向原队列waiters(原队列的头节点),之后会被UNSAFE执行park挂起当前线程,等待被中断或者被唤醒,继续进入自旋。之后当线程被中断之后重新进入自旋操作,此时的线程节点会被执行出队操作。当被挂起的线程被唤醒之后也继续进入自旋操作中,将之前 创建的WaitNode对象中的thread属性设为null(help gc),之后返回当前线程状态。如果当先线程状态为完成中时,会在自旋中释放cpu资源,等待下一次cpu的抢占。
当外部线程调用cancel(),任务正在执行中或者任务在线程池中的工作队列中且通过cas将当前线程状态置为调用者自己传递的参数成功后,会调用线程的interrupt(),在finally 块中通过cas操作将线程状态改为已中断。之后唤醒外部调用get()被阻塞的线程。
外部线程调用Thread.start()或者线程池调度,最后执行到FutureTask.run(),只有当前线程为NEW状态并且通过cas抢占到当前TASK的运行锁之后,执行用户实现的call()
更多推荐

所有评论(0)