CompletableFuture 的同步与异步

原创
2018/11/04 01:40
阅读数 3.1K

CompletableFuture 类声明了 CompletionStage 接口,CompletionStage 接口实际上提供了同步或异步运行计算的舞台。

所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要获取线程的计算结果。

CompletableFuture 简单的异步运算场景

CompletableFuture 提供了如下的异步方法,

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

supplyAsync 返回带有任务结果的CompletableFuture,而runAsync返回CompletableFuture<Void>。

Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池。

注意:ForkJoinPool.commonPool() 是 Daemon Thread(守护线程) 

只要当前JVM实例中尚存在任何一个非守护线程(用户线程)没有结束,守护线程就全部工作;

只有当用户线程结束时,JVM推出,守护线程随着JVM一同结束工作。

@Test
public void test() throws InterruptedException {
    CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
        System.out.println("runAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    System.out.println("done=" + cf.isDone());
    TimeUnit.SECONDS.sleep(4);
    System.out.println("done=" + cf.isDone());
}

输出,

done=false
runAsync=ForkJoinPool.commonPool-worker-1|true
done=true

在这段代码中,runAsync 是异步执行的 ,通过 Thread.currentThread().isDaemon() 打印的结果就可以知道是Daemon线程异步执行的。

 

CompletableFuture 同步执行示例

CompletableFuture中不带Async的同步方法如下,

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public CompletableFuture<Void> thenRun(Runnable action);

这些方法都是同步执行的

@Test
public void testSync11() {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApply(s -> {
        randomSleep();
        System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    // gotNow 如果成功就返回结果
    System.out.println(cf.getNow(null));
    // 一直等待成功,然后返回结果
    System.out.println(cf.join());
}

输出如下,

thenApply=main|false
MESSAGE
MESSAGE

首先通过 completedFuture 方法获取一个结果已经完成的Future,然后执行同步方法thenApply,由main线程执行,会阻塞当前的main线程 ,最后getNow方法获取到结果。

 

CompletableFuture 异步执行示例

CompletableFuture中异步执行的方法都是带Async 结尾的,可以制定执行异步任务的线程池,也可以不指定,如果不指定,默认使用ForkJoinPool.commonPool() 线程池。

public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);

以下使用的两个方法都是异步执行任务的方法

@Test
public void testAsync1() {
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        randomSleep();
        System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return "message";
    }).thenApplyAsync(s -> {
        randomSleep();
        System.out.println("thenApplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    // gotNow 如果成功就返回结果
    System.out.println(cf.getNow(null));
    // 一直等待成功,然后返回结果
    System.out.println(cf.join());
}

输出如下,

null
supplyAsync=ForkJoinPool.commonPool-worker-1|true
thenApplyAsync=ForkJoinPool.commonPool-worker-1|true
MESSAGE

当执行 cf.gotNow 方法的时候,异步任务还没有执行完成,所以返回 null 。执行 cf.join 方法,阻塞一直等到异步任务结果返回。

 

thenApply 是由哪个线程执行的

thenApply 不带async结尾,是一个同步方法,但可能还是由执行任务的线程池来执行,或者是当前main线程来执行。

@Test
public void testAsync125() {
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        //没有sleep
        System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return "message";
    }).thenApply(s -> {
        // thenApplyAsync=main|false 使用调用者线程来进行处理
        System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    // gotNow 如果成功就返回结果
    System.out.println(cf.getNow(null));
    // 一直等待成功,然后返回结果
    System.out.println(cf.join());
}

@Test
public void testAsync126() throws InterruptedException {
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        randomSleep();
        System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return "message";
    });

    TimeUnit.SECONDS.sleep(2);

    // 使用调用者线程 当前线程main 来进行处理thenApply 转换操作
    cf = cf.thenApply(s -> {
        System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    // gotNow 如果成功就返回结果
    System.out.println(cf.getNow(null));
    // 一直等待成功,然后返回结果
    System.out.println(cf.join());
}

@Test
public void testAsync124() {
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        randomSleep();
        System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return "message";
    }).thenApply(s -> {
        System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    // gotNow 如果成功就返回结果
    System.out.println(cf.getNow(null));
    // 一直等待成功,然后返回结果
    System.out.println(cf.join());
}

输出如下,

supplyAsync=ForkJoinPool.commonPool-worker-1|true
thenApplyAsync=main|false
MESSAGE
MESSAGE

//////
supplyAsync=ForkJoinPool.commonPool-worker-1|true
thenApply=main|false
MESSAGE
MESSAGE

//////
null
supplyAsync=ForkJoinPool.commonPool-worker-1|true
thenApply=ForkJoinPool.commonPool-worker-1|true
MESSAGE

在testAsync125方法中,thenApply 回调方法是由当前main线程执行的;

在testAsync126方法中,thenApply 回调方法是由当前main线程执行的;

在testAsync124方法中,thenApply 方法是由执行任务的线程池的线程来执行的,thenApply 虽然是一个同步方法,但其调用是通过 ForkJoinPool.commonPool 线程池异步执行的。

所以要注意的是 如果在thenApply 方法中执行比较耗时的操作,会阻塞调用者线程或者主线程。

 

CompletableFuture allOf 方法同步执行效果

When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.

The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:

CompletableFuture<String> future1
    = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello";
});

CompletableFuture<String> future2
    = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Beautiful";
});

CompletableFuture<String> future3
    = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(4);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "World";
});

System.out.println("f1=" + future1.isDone());
System.out.println("f2=" + future2.isDone());
System.out.println("f3=" + future3.isDone());
CompletableFuture<Void> combinedFuture
    = CompletableFuture.allOf(future1, future2, future3);

System.out.println("========");
System.out.println("f1=" + future1.isDone());
System.out.println("f2=" + future2.isDone());
System.out.println("f3=" + future3.isDone());

// 等待所有的future 执行完成
combinedFuture.join();

System.out.println("========");
System.out.println("f1=" + future1.isDone());
System.out.println("f2=" + future2.isDone());
System.out.println("f3=" + future3.isDone());
f1=false
f2=false
f3=false
========
f1=false
f2=false
f3=false
========
f1=true
f2=true
f3=true

通过 combinedFuture.join()  方法等待所有的异步任务执行完成。当其所有的CompletableFuture均完成结果时,combinedFuture就会处于完成状态

Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture<Void>. The limitation of this method is that it does not return the combined results of all Futures. Instead you have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:

String combined = Stream.of(future1, future2, future3)
    .map(CompletableFuture::join)
    .collect(Collectors.joining(" "));

System.out.println(combined);

更简化后完整连贯的代码,

@Test
public void testAllOf2() {
    CompletableFuture<String> future1
        = CompletableFuture.supplyAsync(() -> "Hello");

    CompletableFuture<String> future2
        = CompletableFuture.supplyAsync(() -> "Beautiful");

    CompletableFuture<String> future3
        = CompletableFuture.supplyAsync(() -> "World");

    CompletableFuture.allOf(future1, future2, future3)
        .thenApply((v) -> Stream.of(future1, future2, future3)
            .map(CompletableFuture::join)
            .collect(Collectors.joining(" ")))
        .thenAccept(System.out::println);

}

========END========

展开阅读全文
打赏
0
3 收藏
分享
加载中
更多评论
打赏
0 评论
3 收藏
0
分享
OSCHINA
登录后可查看更多优质内容
返回顶部
顶部