使用CompletableFuture提升服务性能

原创
2021/06/15 00:46
阅读数 224

场景

在分布式和微服务开发模式下, 如果某一个服务需要依赖多个下游子服务,  就会造成整体耗时过长, 影响到系统的性能.  

如果采用多线程并行的方式进行子服务调度, 耗时问题就会得到优化.

举例如下图,  一个服务依赖了4个子服务, 其中rpc3又依赖于rpc1和rpc2的返回,  上面是串行模式, 下面是并行模式.

DEMO

串行模式

package main.java;

public class HelloWorld1 {

    //串行调用
    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        String rpc1 = rpc1();
        String rpc2 = rpc2();
        String rpc3 = rpc3(rpc1,rpc2);
        String rpc4 = rpc4();
        System.out.println("最终结果:"+rpc3+rpc4+";总耗时:"+(System.currentTimeMillis()-startTime) +"ms");
    }

    public static String rpc1() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return "rpc1-";
    }

    public static String rpc2() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return "rpc2-";
    }

    public static String rpc3(String rpc1Res,String rpc2Res) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return rpc1Res+rpc2Res+"rpc3-";
    }

    public static String rpc4() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return "rpc4-";
    }

}

最终结果:rpc1-rpc2-rpc3-rpc4-;总耗时:4006ms

 

多线程模式

1, 不使用CompletableFuture

import java.util.concurrent.CountDownLatch;

public class HelloWorld2 {

    //多线程模式,不使用CompletableFuture
    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        //计数器A,用来协助组合rpc1和rpc2调度
        CountDownLatch latchA = new CountDownLatch(2);
        //计数器B,用来协助组合rpc3和rpc4调度
        CountDownLatch latchB = new CountDownLatch(2);

        final String[] rpc1 = {null};
        new Thread(() ->{
            rpc1[0] =rpc1();
            latchA.countDown();
        }).start();

        final String[] rpc2 = {null};
        new Thread(() ->{
            rpc2[0] =rpc2();
            latchA.countDown();
        }).start();

        final String[] rpc4 = {null};
        new Thread(() ->{
            rpc4[0] =rpc4();
            latchB.countDown();
        }).start();

        //阻塞等待rpc1+rpc2执行完,因为rpc3依赖rpc1+rpc2的返回
        latchA.await();
        final String[] rpc3 = {null};
        new Thread(() ->{
            rpc3[0] =rpc3(rpc1[0],rpc2[0]);
            latchB.countDown();
        }).start();

        //阻塞等待rpc3+rpc4执行完
        latchB.await();
        System.out.println("最终结果:"+rpc3[0]+rpc4[0]+";总耗时:"+(System.currentTimeMillis()-startTime) +"ms");
    }

    public static String rpc1() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return "rpc1-";
    }

    public static String rpc2() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return "rpc2-";
    }

    public static String rpc3(String rpc1Res,String rpc2Res) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return rpc1Res+rpc2Res+"rpc3-";
    }

    public static String rpc4() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return "rpc4-";
    }

}

最终结果:rpc1-rpc2-rpc3-rpc4-;总耗时:2074ms

2, 使用CompletableFuture

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class HelloWorld3 {

    //多线程模式,使用CompletableFuture组合多线程
    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        final String[] rpc1 = {null};
        CompletableFuture cf1 = CompletableFuture.runAsync(() ->{
            rpc1[0] =rpc1();
        });

        final String[] rpc2 = {null};
        CompletableFuture cf2 = CompletableFuture.runAsync(() ->{
            rpc2[0] =rpc2();
        });

        final String[] rpc4 = {null};
        CompletableFuture cf4 = CompletableFuture.runAsync(() ->{
            rpc4[0] =rpc4();
        });

        final String[] rpc3 = {null};
        CompletableFuture cf3 = CompletableFuture.allOf(cf1,cf2).thenRun(()->{
            rpc3[0] =rpc3(rpc1[0],rpc2[0]);
        });

        //阻塞等待最终结果
        CountDownLatch latch = new CountDownLatch(1);
        CompletableFuture.allOf(cf3,cf4).thenRun(()->{
            System.out.println("最终结果:"+rpc3[0]+rpc4[0]+";总耗时:"+(System.currentTimeMillis()-startTime) +"ms");
            latch.countDown();
        });

        latch.await();

    }

    public static String rpc1() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return "rpc1-";
    }

    public static String rpc2() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return "rpc2-";
    }

    public static String rpc3(String rpc1Res,String rpc2Res) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return rpc1Res+rpc2Res+"rpc3-";
    }

    public static String rpc4() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return "rpc4-";
    }

}

最终结果:rpc1-rpc2-rpc3-rpc4-;总耗时:2063ms

CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部