Java非阻塞异步编程——CompletableFuture

04/12 16:40
阅读数 573

前言

对于Node开发者来说,非阻塞异步编程是他们引以为傲的地方。而在JDK8中,也引入了非阻塞异步编程的概念。所谓非阻塞异步编程,就是一种不需要等待返回结果的多线程的回调方法的封装。使用非阻塞异步编程,可以很大程度上解决高并发场景的各种问题,提高程序的运行效率。

为什么要使用非阻塞异步编程

在jdk8之前,我们使用java的多线程编程,一般是通过Runnable中的run方法进行的。这种方法有个明显的缺点:没有返回值。这时候,大家会使用Callable+Future的方式去实现,代码如下。

public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> stringFuture = executor.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            Thread.sleep(2000);
            return "async thread";
        }
    });
    Thread.sleep(1000);
    System.out.println("main thread");
    System.out.println(stringFuture.get());
}

这无疑是对高并发访问的一种缓冲方法。这种方式有一个致命的缺点就是阻塞式调用,当调用了get方法之后,会有大量的时间耗费在等待返回值之中。

不管怎么看,这种做法貌似都不太妥当,至少在代码美观性上就看起来很蛋疼。而且某些场景无法使用,比如:

  • 多个异步线程执行时间可能不一致,我们的主线程不能一直等着。
  • 两个异步任务之间相互独立,但是第二个依赖第一个的执行结果

在这种场景下,CompletableFuture的优势就展现出来了 。同时,CompletableFuture的封装中使用了函数式编程,这让我们的代码显得更加简洁、优雅。

不了解函数式编程的朋友,可以参考我之前的博客。JDK8新特性

CompletableFuture使用详解

runAsync和supplyAsync方法

CompletableFuture提供了四个静态方法来创建一个异步操作。

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

  • runAsync方法不支持返回值。
  • supplyAsync可以支持返回值。

代码示例

    /**
     * 无返回值
     *
     * @throws Exception
     */
    @Test
    public void testRunAsync() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (Exception ignored) {
            }
            System.out.println("run end ...");
        });
        future.get();
    }

    /**
     * 有返回值
     *
     * @throws Exception
     */
    @Test
    public void testSupplyAsync() throws Exception {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("run end...");
            return System.currentTimeMillis();
        });
        Long time = future.get();
        System.out.println(time);
    }

计算结果完成时的回调方法

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的操作。

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

这里需要说的一点是,whenComplete和whenCompleteAsync的区别。

  • whenComplete:使用执行当前任务的线程继续执行whenComplete的任务。
  • whenCompleteAsync:使用新的线程执行任务。
  • exceptionally:执行出现异常时,走这个方法。

代码示例

    /**
     * 当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。
     * whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
     * whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
     * exceptionally:执行出现异常时,走这个方法
     *
     * @throws Exception
     */
    @Test
    public void testWhenComplete() throws Exception {
        CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("运行结束");
        }).whenComplete((t, action) -> {
            System.out.println("执行完成");
        }).exceptionally(t -> {
            System.out.println("出现异常:" + t.getMessage());
            return null;
        });
        TimeUnit.SECONDS.sleep(2);
    }

thenApply

当一个线程依赖另一个线程时,可以使用thenApply方法把这两个线程串行化,第二个任务依赖第一个任务的返回值。

代码示例

    /**
     * 当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
     * 第二个任务依赖第一个任务的结果
     *
     * @throws Exception
     */
    @Test
    public void testThenApply() throws Exception {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
            long result = new Random().nextInt();
            System.out.println("result:" + result);
            return result;
        }).thenApply(t -> {
            long result = t * 5;
            System.out.println("result2:" + result);
            return result;
        });
        Long result = future.get();
        System.out.println(result);
    }

handle

handle是执行任务完成时对结果的处理。与thenApply方法处理方式基本一致,

不同的是,handle是在任务完成后执行,不管这个任务是否出现了异常,而thenApply只可以执行正常的任务,任务出现了异常则不执行。

代码示例

    /**
     * handle 是执行任务完成时对结果的处理。
     * handle 方法和 thenApply 方法处理方式基本一样。
     * 不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。
     * thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
     *
     * @throws Exception
     */
    @Test
    public void testHandle() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 0;
            return i;
        }).handle((p, t) -> {
            int result = -1;
            if (t == null) {
                result = p * 2;
            } else {
                System.out.println(t.getMessage());
            }
            return result;
        });
        System.out.println(future.get());
    }

thenAccept

thenAccept用于接收任务的处理结果,并消费处理,无返回结果。

代码示例

    /**
     * 接收任务的处理结果,并消费处理,无返回结果。
     *
     * @throws Exception
     */
    @Test
    public void testThenAccept() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            return new Random().nextInt();
        }).thenAccept(num -> {
            System.out.println(num);
        });
        System.out.println(future.get());
    }

thenRun

上个任务执行完之后再执行thenRun的任务,二者只存在先后执行顺序的关系,后者并不依赖前者的计算结果,同时,没有返回值。

代码示例

    /**
     * 该方法同 thenAccept 方法类似。不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法。
     * 只是处理玩任务后,执行 thenRun 的后续操作。
     *
     * @throws Exception
     */
    @Test
    public void testThenRun() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            return new Random().nextInt();
        }).thenRun(() -> {
            System.out.println("进入了thenRun");
        });
        System.out.println(future.get());
    }

thenCombine

thenCombine会把两个CompletableFuture的任务都执行完成后,把两个任务的返回值一块交给thenCombine处理(有返回值)。

代码示例

    /**
     * thenCombine 会把 两个 CompletableFuture 的任务都执行完成后
     * 把两个任务的结果一块交给 thenCombine 来处理。
     *
     * @throws Exception
     */
    @Test
    public void testThenCombine() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "hello";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            return "world";
        }), (t1, t2) -> {
            return t1 + " " + t2;
        });
        System.out.println(future.get());
    }

thenAcceptBoth

当两个CompletableFuture都执行完成后,把结果一块交给thenAcceptBoth处理(无返回值)

代码示例

    /**
     * 当两个 CompletableFuture 都执行完成后
     * 把结果一块交给thenAcceptBoth来进行消耗
     *
     * @throws Exception
     */
    @Test
    public void testThenAcceptBoth() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            return "hello";
        }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
            return "world";
        }), (t1, t2) -> {
            System.out.println(t1 + " " + t2);
        });
        System.out.println(future.get());
    }

applyToEither

两个CompletableFuture,谁执行返回的结果快,就用哪个的结果进行下一步操作(有返回值)。

代码示例

    /**
     * 两个CompletableFuture,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作
     *
     * @throws Exception
     */
    @Test
    public void testApplyToEither() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            return "world";
        }), (t) -> {
            return t;
        });
        System.out.println(future.get());
    }

acceptEither

两个CompletableFuture,谁执行返回的结果快,就用哪个的结果进行下一步操作(无返回值)。

代码示例

    /**
     * 两个CompletableFuture,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。
     *
     * @throws Exception
     */
    @Test
    public void testAcceptEither() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            return "hello";
        }).acceptEither(CompletableFuture.supplyAsync(() -> {
            return "world";
        }), t1 -> {
            System.out.println(t1);
        });
        System.out.println(future.get());
    }

runAfterEither

两个CompletableFuture,任何一个完成了都会执行下一步操作

代码示例

    /**
     * 两个CompletableFuture,任何一个完成了都会执行下一步的操作
     *
     * @throws Exception
     */
    @Test
    public void testRunAfterEither() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            return "hello";
        }).runAfterEither(CompletableFuture.supplyAsync(() -> {
            return "world";
        }), () -> {
            System.out.println("执行完了");
        });
        System.out.println(future.get());
    }

runAfterBoth

两个CompletableFuture,都完成了才会执行下一步操作。

代码示例

    /**
     * 两个CompletableFuture,都完成了计算才会执行下一步的操作
     *
     * @throws Exception
     */
    @Test
    public void testRunAfterBoth() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            return "hello";
        }).runAfterBoth(CompletableFuture.supplyAsync(() -> {
            return "world";
        }), () -> {
            System.out.println("执行完了");
        });
        System.out.println(future.get());
    }

thenCompose

thenCompose方法允许对两个CompletableFuture进行流水线操作,当第一个操作完成时,将其结果作为参数传递给第二个操作。

代码示例

    /**
     * thenCompose 方法允许你对两个 CompletableFuture 进行流水线操作,
     * 第一个操作完成时,将其结果作为参数传递给第二个操作。
     * @throws Exception
     */
    @Test
    public void testThenCompose() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int t = new Random().nextInt();
            System.out.println(t);
            return t;
        }).thenCompose(param -> {
            return CompletableFuture.supplyAsync(() -> {
                int t = param * 2;
                System.out.println("t2=" + t);
                return t;
            });
        });
        System.out.println(future.get());
    }

结语

CompletableFuture是jdk8中新增的一个特性,特点是非阻塞异步编程。合理的使用非阻塞异步编程,比如将两步关联不大的操作并行处理,可以优化代码的执行效率。同时,在高并发场景下,CompletableFuture也可以进行有效的性能优化。

如果你觉得该博客对你有帮助,不放动动手指加一下交流群。

发布了35 篇原创文章 · 获赞 45 · 访问量 6161
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部