文档章节

CompeletableFuture的使用

52iSilence7
 52iSilence7
发布于 2017/08/15 14:22
字数 1681
阅读 29
收藏 0

CompeletableFuture的使用

例子

我们就使用Java8 in action里面的商店的例子来说明。 我们写了一个应用,这个应用需要通过互联网接口从其他的服务商那里取得价格,由于会有好多个服务商,因此我们先将操作封装到Shop类中。

public class Shop {
    Random random = new Random();
    String name;
    public Shop(String name) {
        this.name = name;
    }
    public double getPrice(String product) {
        return caculatePrice(product);
    }

    // price既跟店铺name有关系,也跟product有关系
    public double caculatePrice(String product) {
        delay();
        return random.nextDouble() * name.charAt(0) + product.charAt(1);
    }
    public static void delay() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

我们用 delay 来模拟耗时操作,每次从服务商那边获取价格有一个1s的延迟,可以看到如果串行获取多个服务商的价格的话,延迟会非常严重,对用户来说是不可接受的。

以前的future方式

我们可以将获取价格封装一个异步版本,返回Future,在需要的时候使用get方法来得到返回的价格

public Future<Double>  getPriceAsync(String product) {
    CompletableFuture<Double> future = new CompletableFuture<>();
    new Thread(()->{
        double price = getPrice(product);
        future.complete(price);
    }).start();
    return future;
}

我们来测试一下异步版本的耗时:

public static void singleShop() throws ExecutionException, InterruptedException {
    Shop shop = new Shop("");
    long current = System.currentTimeMillis();
    Future<Double> future = shop.getPriceAsync("abc");
    long returned = System.currentTimeMillis();
    System.out.println("返回使用了:" + (returned - current) + "msecs");

    double price = future.get();
    long caculated = System.currentTimeMillis();
    System.out.println("price is " + price);
    System.out.println("计算使用时间:" + (caculated - current) + "msecs");
}

测试结果:

返回使用了:75msecs
price is 140.00108871644375
计算使用时间:1077msecs

可以看到方法返回的速度是很快的,在返回后与得到值之间有很长的间隔,我们可以利用这段时间来做点别的。

CompletableFuture方式

Java8提供了CompletableFuture,里面有supplyAsync方法可以让我们直接提交一个任务,返回Future 可以看到代码精简到了一行。

public Future<Double> getPriceAsyncElegently(String product) {
    return CompletableFuture.supplyAsync(() -> getPrice(product));
}

看到这你可能会说了,不就是把操作封装了一下嘛,我自己也可以写一个方法,然后一行返回,别急,我们接着来看CompletableFuture提供给我们的其他功能,简直不要太顺手。

与Stream结合使用

上面说了会从很多的服务商那边获取价格,上面只是获取了一家,但假如是10家呢?我们就需要写10遍了,太繁琐,我们使用Stream来实现一下。

先声明一下店铺,我直接复制了多个店铺:

List<Shop> shopList = Arrays.asList(new Shop("a"),
     new Shop("b"),
     new Shop("b"),
     new Shop("b"),
     new Shop("b"),
     new Shop("b"),
     new Shop("b"),
     new Shop("b"),
     new Shop("b"),
     new Shop("c"));

用CompletableFuture跟Stream结合来计算价格

public static List<Double> manyShopsFuture(String product) {
    List<CompletableFuture<Double>> stream = shopList.stream()
        .map(s -> CompletableFuture.supplyAsync(() -> s.getPrice(product)))
        .collect(Collectors.toList());

    return stream.stream().map(CompletableFuture::join).collect(Collectors.toList());
}

在这里我们使用了2个stream来操作,因为如果把join操作写到第一个stream中的话,实际上操作已经变成了线性的了,所以这里我们先获取future,再统一join等待结果返回。

不过还记得么,Stream类也提供了并行流,实现起来好像更加简单:

public static List<Double> manyShopsParallel(String product) {
    return shopList.parallelStream().map(shop -> shop.getPrice(product)).collect(Collectors.toList());
}

我们测试一下,比较下两者的运行效率:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    String product = "abc";
    long current = System.currentTimeMillis();
    manyShopsParallel(product);
    long future = System.currentTimeMillis();
    System.out.println("manyShopsParallel cost:" + (future - current));
    manyShopsFuture(product);
    long stream = System.currentTimeMillis();
    System.out.println("manyShopsFuture cost:" + (stream - future));
}

执行结果

manyShopsParallel cost:3153

manyShopsFuture cost:4002

可以看到使用ParallelStream更高效一些,写了这么多,效率却不如默认的好,那如何提高我们自己的程序的运行效率呢?

提供自己的线程池

其实CompletableFuture跟parallelStream一样,都是使用的ForkJoinPool中的默认线程池,线程数量默认为机器的内核数Runtime.getRuntime().availableProcessors(),对于我们这样的等待时间长,IO密集型的应用来说,CPU是大大的浪费了的,parallelStream是无法定制线程池的,但是CompletableFuture我们却可以自行提供,以便根据自己的应用情况作出调整。

《Java并发编程实战》中给过一个计算线程池线程数的公式,为:

Nthreads = Ncpu * Ucpu * (1 + W/C)

其中:

NCPU是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到

UCPU是期望的CPU利用率(该值应该介于0和1之间)

W/C是等待时间与计算时间的比率

大家可以计算一下自己的,我这里Ncpu=2,Ucpu=100%,W/C = 1/0.01 = 100 ,因此取线程数=200来构造线程池 如下:

static Executor executor = Executors.newFixedThreadPool(200, new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        return thread;
    }
});
public static List<Double> manyShopsFuture(String product) {
     List<CompletableFuture<Double>> stream = shopList.stream()
         .map(s -> CompletableFuture.supplyAsync(() -> s.getPrice(product),executor))
         .collect(Collectors.toList());

     return stream.stream().map(CompletableFuture::join).collect(Collectors.toList());
}

再次执行一下看计算时间:

manyShopsParallel cost:3250

manyShopsFuture cost:1006

Future方式可以说是完全并行了,而parallelStream由于使用默认线程池,并不能一次性全部将任务执行,需要更长的执行时间。

CompletableFuture组合异步任务

假设我们在获取价格之后,还需要查询服务商的折扣服务才能计算最终展示的价格,这个延迟也会比较大,我们如何来组合这两个异步任务呢?CompletableFuture提供了一系列的then方法,我们这里使用两种来演示一下,一个是thenApply,一个是thenCompose, thenApply是对结果进行处理,thenCompose是组合一个新的任务

先定义一下Discount

public class Discount {

    public static Double applyDiscount(Double price) {
        Double discount = getDiscount();
        return price * discount;
    }

    public static Double getDiscount() {
        Shop.delay();
        return 0.5;
    }
}

然后看一下任务组合调用:

public static List<Double> manyShopsApplyWithDiscount(String product) {
    List<CompletableFuture<Double>> stream = shopList.stream()
        .map(s -> CompletableFuture.supplyAsync(() -> s.getPrice(product),executor))
        .map(future -> future.thenApply(Discount::applyDiscount))
        .collect(Collectors.toList());

    return stream.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
public static List<Double> manyShopsComposeWithDiscount(String product) {
    List<CompletableFuture<Double>> stream = shopList.stream()
        .map(s -> CompletableFuture.supplyAsync(() -> s.getPrice(product),executor))
        .map(future -> future.thenCompose(price ->
            CompletableFuture.supplyAsync(()-> Discount.applyDiscount(price),executor)))
        .collect(Collectors.toList());

    return stream.stream().map(CompletableFuture::join).collect(Collectors.toList());
}

thenCompose中我们通过supplyAsync 再次提交了一次异步任务,而在thenApply中我们直接在原流水线上进行数据处理,不过不会阻塞流水线,也是提交了一个任务,不过是同步执行。这两个方法在我看来就是处理参数的不同而已,不用太过纠结。

测试一下性能:

manyShopsComposeWithDiscount cost:2126

manyShopsApplyWithDiscount cost:2019

thenApply方法由于减少了线程切换执行时间相对较短,也提醒我们在编程过程中注意这方面的开销。

最后

CompletableFuture还提供了很多其他的API可供我们使用,比如说thenCombine可以结合两个没有先后关系的异步任务,但是提供回调来处理两个任务的结果,等着大家去发现使用。

© 著作权归作者所有

52iSilence7

52iSilence7

粉丝 6
博文 103
码字总数 83225
作品 0
海淀
高级程序员
私信 提问

暂无文章

Taro 兼容 h5 踩坑指南

最近一周在做 Taro 适配 h5 端,过程中改改补补,好不酸爽。 本文记录📝遇到的问题,希望为有相同需求的哥们👬节约点时间。 Taro 版本:1.3.9。 解决跨域问题 h5 发请求会报跨域问题,需...

dkvirus
58分钟前
4
0
Spring boot 静态资源访问

0. 两个配置 spring.mvc.static-path-patternspring.resources.static-locations 1. application中需要先行的两个配置项 1.1 spring.mvc.static-path-pattern 这个配置项是告诉springboo......

moon888
今天
3
0
hash slot(虚拟桶)

在分布式集群中,如何保证相同请求落到相同的机器上,并且后面的集群机器可以尽可能的均分请求,并且当扩容或down机的情况下能对原有集群影响最小。 round robin算法:是把数据mod后直接映射...

李朝强
今天
4
0
Kafka 原理和实战

本文首发于 vivo互联网技术 微信公众号 https://mp.weixin.qq.com/s/bV8AhqAjQp4a_iXRfobkCQ 作者简介:郑志彬,毕业于华南理工大学计算机科学与技术(双语班)。先后从事过电子商务、开放平...

vivo互联网技术
今天
19
0
java数据类型

基本类型: 整型:Byte,short,int,long 浮点型:float,double 字符型:char 布尔型:boolean 引用类型: 类类型: 接口类型: 数组类型: Byte 1字节 八位 -128 -------- 127 short 2字节...

audience_1
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部