【原创】Java并发编程系列36 | FutureTask

2020/09/17 13:00
阅读数 36

  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为星标”!这样才不会错过每日进阶架构文章呀。

  

  2020年Java原创面试题库连载中

  (共18篇)

  【032期】JavaEE面试题(四)Spring(2)

  

  线程池源码中出现了很多Callable、Future、FutureTask等以前没介绍过的接口,尤其是线程池提交任务时总是把任务封装成FutureTask,今天就来为大家解惑:

  Runnable、Callable、Future、FutureTask

  FutureTask类结构

  FutureTask状态

  执行任务 run()方法

  获取任务返回值 get()方法

  取消任务 cancel()方法

  1. Runnable、Callable、Future、FutureTask 1.1 Runnable

  Runnable接口只有一个run方法,而run方法的返回值是void,所以线程执行完之后没有返回值。

  public interface Runnable {
public abstract void run();
}


1.2 Callable

  在很多场景下,我们通过线程来异步执行任务之后,希望获取到任务的执行结果。比如RPC框架中,需要异步获取任务返回值。这种情况下,Runnable无法获取返回值就无法满足需求了,因此Callable就出现了。

  Callable也是一个接口,也只有一个call()方法,不同的是Callable的call()方法有是有返回值的,返回值的类型是一个泛型,泛型由创建Callable对象时指定。

  public interface Callable {
V call() throws Exception;
}


1.3 Future

  要想获得Callable的返回值就需要用到Future接口。Futrue可以监视和控制Callable任务的执行情况,如对执行结果进行取消、查询是否完成、获取结果等。

  如:当一个任务通过线程池的submit()方法提交到线程池后,线程池会返回一个Future类型的对象,我们可以通过Future对象来获取任务在线程池中的状态。

  public interface Future {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}







  cancel方法:用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。
mayInterruptIfRunning参数用来表示是否需要中断线程,如果传true,表示需要中断线程,那么就会将任务的状态设置为INTERRUPTING;如果为false,那么就会将任务的状态设置为CANCELLED(关于任务的状态INTERRUPTING和CANCELLED后面会说明)

  isCancelled方法:表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true

  isDone方法:表示任务是否已经完成,若任务完成,则返回true

  get()方法:用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回。

  get(long timeout, TimeUnit unit)方法:获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

  举例:Future获取Callable任务的返回值

  public class FutureExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newCachedThreadPool();
Future future = threadPool.submit(new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "结果";
}
});
System.out.println("Callable返回值=" + future.get());
}
}












  输出结果:

  Callable返回值=结果
1.4 FutureTask

  FutureTask是Runnable和Future的实现类,既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

  当线程池调用submit()方法来向线程池中提交任务时,无论提交的是Runnable类型的任务,还是提交的是Callable类型的任务,最终都是将任务封装成一个FutureTask对象,我们可以通过这个FutureTask对象来获取任务在线程池中的状态。

  public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
// 调用newTaskFor()将Callable任务封装成一个FutureTask
RunnableFuture ftask = newTaskFor(task);
// 执行任务
execute(ftask);
return ftask;
}






  // newTaskFor
protected RunnableFuture newTaskFor(Callable callable) {
// 直接new一个FutureTask对象
return new FutureTask(callable);
}




2. FutureTask类结构public class FutureTask implements RunnableFuture {
/** state变量用来保存任务的状态 */
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;








  /** 提交的任务,Runnable类型的任务会通过Executors.callable()来转变为Callable */
private Callable callable;
/** 用来保存Callable的call()方法的返回值 */
private Object outcome;
/** 执行Callable任务的线程 **/
private volatile Thread runner;
/**
* 任务未完成时,调用get方法获取结果的线程会阻塞等待
* waiters用于保存这些线程
*/
private volatile WaitNode waiters;









  static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
}





3. FutureTask状态

  FutureTask任务的状态如下:

  // 任务的初始状态,当新建一个FutureTask任务时,state值默认为NEW
private static final int NEW = 0;
// 任务处于完成中,也就是正在执行还未设置返回值
private static final int COMPLETING = 1;
// 任务正常被执行完成,并将任务的返回值赋值给outcome属性之后
private static final int NORMAL = 2;
// 任务出了异常,并将异常对象赋值给outcome属性之后
private static final int EXCEPTIONAL = 3;
// 调用cancle(false),任务被取消了
private static final int CANCELLED = 4;
// 调用cancle(true),任务中断,但是在线程中断之前
private static final int INTERRUPTING = 5;
// 调用cancle(true),任务中断,但是在线程中断之后
private static final int INTERRUPTED = 6;













  状态变化如下图:

  4. 执行任务run()

  执行future.callable.call(),执行任务;

  执行成功,设置结果outcome;

  逐个唤醒waiters中的线程去获取执行结果。

  public void run() {
/*
* 1. 不是NEW状态,不能执行
* 2. 设置runner失败,不能执行
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();// 真正执行任务
ran = true;// 执行成功,设置执行成功标志
} catch (Throwable ex) {
result = null;
ran = false;// 有异常,执行失败
setException(ex);// 设置异常
}
// 如果执行成功,则设置返回结果
if (ran)
set(result);
}
} finally {
runner = null;// 无论是否执行成功,把runner设置为null
int s = state;
// 处理中断
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
































  /**
* 设置执行结果
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 执行完成,设置COMPLETING状态
outcome = v;// 设置执行结果
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 设置完结果,设置NORMAL状态
finishCompletion();// 逐个唤醒waiters中的线程去获取执行结果
}
}









5. 获取任务返回值get()方法

  任务状态为NORMAL,直接返回执行结果;

  任务状态为COMPLETING,线程yield()让出CPU,因为COMPLETING到NORMAL只需要很短的时间,get线程让出CPU的短暂时间,任务状态就是从COMPLETING变成了NORMAL;

  任务状态为NEW,将get线程阻塞,如果设置了超时,阻塞至超时时间;如果没有设置超时,会一直阻塞直到任务完成后唤醒。

  public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果状态处于NEW或者COMPLETING状态,表示任务还没有执行完成,awaitDone()等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);// 下文详解
// 返回结果,下文详解
return report(s);
}






  /**
* 返回执行结果
*/
private V report(int s) throws ExecutionException {
Object x = outcome;
// 任务正常结束时,返回outcome
if (s == NORMAL)
return (V)x;
// 任务被取消了,抛出CancellationException
if (s >= CANCELLED)
throw new CancellationException();
// 这里只能第EXCEPTIONAL状态,表示在执行过程中出现了异常,抛出ExecutionException。
throw new ExecutionException((Throwable)x);
}












  /**
* 处于NEW或者COMPLETING状态时,get线程等待
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// ......
for (;;) {
// ......
// 任务处于COMPLETING中,就让当前线程先暂时放弃CPU的执行权
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// ......
// 如果设置了超时,阻塞至超时时间
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 等待一段时间
LockSupport.parkNanos(this, nanos);
}
else
// 如果没有设置超时,会一直阻塞,直到被中断或者被唤醒
LockSupport.park(this);
}
}


























6. 取消任务 cancel()

  将任务状态设置成INTERRUPTING/INTERRUPTED/CANCELLED状态就表示取消了线程,因为在这些状态下任务的run方法是不能执行的。

  public boolean cancel(boolean mayInterruptIfRunning) {
/*
* 以下情况不能取消任务:
* 1. 当前任务不是NEW状态,已经被执行了,不能取消
* 2. 当前任务还没有执行,state == NEW,但是CAS设置状态失败,不能取消
*/
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 中断
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();// 中断线程
} finally { // final state
// 中断之后,设置INTERRUPTED状态
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();// 唤醒waiters中的线程去获取执行结果
}
return true;
}


























  之前,给大家发过三份Java面试宝典,这次新增了一份,目前总共是四份面试宝典,相信在跳槽前一个月按照面试宝典准备准备,基本没大问题。

  《java面试宝典5.0》(初中级)

  《350道Java面试题:整理自100+公司》(中高级)

  《资深java面试宝典-视频版》(资深)

  《Java[BAT]面试必备》(资深)

  分别适用于初中级,中高级资深级工程师的面试复习。

  内容包含java基础、javaweb、mysql性能优化、JVM、锁、百万并发、消息队列,高性能缓存、反射、Spring全家桶原理、微服务、Zookeeper、数据结构、限流熔断降级等等。

  获取方式:点“在看”,V信关注上述Java最全面试题库号并回复【面试】即可领取,更多精彩陆续奉上。

  看到这里,证明有所收获

展开阅读全文
打赏
1
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
1
分享
返回顶部
顶部