场景
在分布式和微服务开发模式下, 如果某一个服务需要依赖多个下游子服务, 就会造成整体耗时过长, 影响到系统的性能.
如果采用多线程并行的方式进行子服务调度, 耗时问题就会得到优化.
举例如下图, 一个服务依赖了4个子服务, 其中rpc3又依赖于rpc1和rpc2的返回, 上面是串行模式, 下面是并行模式.
DEMO
串行模式
package main.java;
public class HelloWorld1 {
//串行调用
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
String rpc1 = rpc1();
String rpc2 = rpc2();
String rpc3 = rpc3(rpc1,rpc2);
String rpc4 = rpc4();
System.out.println("最终结果:"+rpc3+rpc4+";总耗时:"+(System.currentTimeMillis()-startTime) +"ms");
}
public static String rpc1() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return "rpc1-";
}
public static String rpc2() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return "rpc2-";
}
public static String rpc3(String rpc1Res,String rpc2Res) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return rpc1Res+rpc2Res+"rpc3-";
}
public static String rpc4() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return "rpc4-";
}
}
最终结果:rpc1-rpc2-rpc3-rpc4-;总耗时:4006ms
多线程模式
1, 不使用CompletableFuture
import java.util.concurrent.CountDownLatch;
public class HelloWorld2 {
//多线程模式,不使用CompletableFuture
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
//计数器A,用来协助组合rpc1和rpc2调度
CountDownLatch latchA = new CountDownLatch(2);
//计数器B,用来协助组合rpc3和rpc4调度
CountDownLatch latchB = new CountDownLatch(2);
final String[] rpc1 = {null};
new Thread(() ->{
rpc1[0] =rpc1();
latchA.countDown();
}).start();
final String[] rpc2 = {null};
new Thread(() ->{
rpc2[0] =rpc2();
latchA.countDown();
}).start();
final String[] rpc4 = {null};
new Thread(() ->{
rpc4[0] =rpc4();
latchB.countDown();
}).start();
//阻塞等待rpc1+rpc2执行完,因为rpc3依赖rpc1+rpc2的返回
latchA.await();
final String[] rpc3 = {null};
new Thread(() ->{
rpc3[0] =rpc3(rpc1[0],rpc2[0]);
latchB.countDown();
}).start();
//阻塞等待rpc3+rpc4执行完
latchB.await();
System.out.println("最终结果:"+rpc3[0]+rpc4[0]+";总耗时:"+(System.currentTimeMillis()-startTime) +"ms");
}
public static String rpc1() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return "rpc1-";
}
public static String rpc2() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return "rpc2-";
}
public static String rpc3(String rpc1Res,String rpc2Res) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return rpc1Res+rpc2Res+"rpc3-";
}
public static String rpc4() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return "rpc4-";
}
}
最终结果:rpc1-rpc2-rpc3-rpc4-;总耗时:2074ms
2, 使用CompletableFuture
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
public class HelloWorld3 {
//多线程模式,使用CompletableFuture组合多线程
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
final String[] rpc1 = {null};
CompletableFuture cf1 = CompletableFuture.runAsync(() ->{
rpc1[0] =rpc1();
});
final String[] rpc2 = {null};
CompletableFuture cf2 = CompletableFuture.runAsync(() ->{
rpc2[0] =rpc2();
});
final String[] rpc4 = {null};
CompletableFuture cf4 = CompletableFuture.runAsync(() ->{
rpc4[0] =rpc4();
});
final String[] rpc3 = {null};
CompletableFuture cf3 = CompletableFuture.allOf(cf1,cf2).thenRun(()->{
rpc3[0] =rpc3(rpc1[0],rpc2[0]);
});
//阻塞等待最终结果
CountDownLatch latch = new CountDownLatch(1);
CompletableFuture.allOf(cf3,cf4).thenRun(()->{
System.out.println("最终结果:"+rpc3[0]+rpc4[0]+";总耗时:"+(System.currentTimeMillis()-startTime) +"ms");
latch.countDown();
});
latch.await();
}
public static String rpc1() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return "rpc1-";
}
public static String rpc2() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return "rpc2-";
}
public static String rpc3(String rpc1Res,String rpc2Res) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return rpc1Res+rpc2Res+"rpc3-";
}
public static String rpc4() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return "rpc4-";
}
}
最终结果:rpc1-rpc2-rpc3-rpc4-;总耗时:2063ms
CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。