JAVA语言异步非阻塞设计模式(原理篇)

原创
07/14 10:28
阅读数 2.6K

前言

本系列文章共2篇,对 Java 语言的异步非阻塞模式进行科普。


本篇原理篇讲解异步非阻塞模型的原理,以及核心设计模式“Promise”的基本特性。应用篇(7月21日更新)会展示更加丰富的应用场景,介绍 Promise 的变体,如异常处理、调度策略等,并将 Promise 和现有工具进行对比。


限于篇幅,本系列以科普为主,内容更偏重于原理、API 设计、应用实践,不会深入讲解并发优化的具体细节。如果小伙伴们想要进一步交流,可以添加有道技术团队助手(ydtech01)与我们联络。


作者/ 白宇

编辑/ 刘振宇


1.概述


异步非阻塞[A]是一种高性能的线程模型,在 IO 密集型系统中得到广泛应用。

在该模型下,系统发起耗时请求后不需要等待响应,期间可以执行其他操作;当收到响应后,系统收到通知并执行后续处理。由于消除了不必要的等待,这种模型能够充分利用 cpu、线程等资源,提高资源利用率。

然而,异步非阻塞模式在提升性能的同时,也带来了编码实现上的复杂性。请求和响应可能分离到不同线程中,需要编写额外代码完成响应结果的传递。Promise 设计模式[B]可以降低这种复杂性,封装数据传递、时序控制、线程安全等实现细节,从而提供简洁的 API 形式。

本文首先介绍异步非阻塞模式,从线程模型的角度分析阻塞和非阻塞模式的区别。之后介绍 Promise 设计模式的应用场景及工作流程。最后,提供一种简易的 Java 实现,能够实现基本的功能需求,并做到线程安全。


在正式探索技术问题之前,我们先来看看什么是异步非阻塞模型。如图1-1所示,展示了两个小人通信的场景:

图1-1 两个小人通信


  1. 两个小人代表互相通信的 两个线程,如数据库的客户端和服务端;他们可以部署在不同的机器上。
  2. 小人之间 互相投递苹果,代表要传递的消息。根据具体业务场景,这些消息可能会称为request、response、packet、document、record 等。
  3. 小人之间需要 建立信道,消息才得以传递。根据场景,信道称为 channel、connection等。


假设左侧小人发起请求,而右侧小人处理请求并发送响应:左侧小人先投出一个苹果 request,被右侧小人接收到;右侧小人进行处理后,再投出苹果 response,被左侧小人接收到。我们考察左侧小人在等待响应期间的行为,根据他在等待 response 期间是否能处理其他工作,将其归纳为“同步阻塞”和“异步非阻塞”两种模式。


首先我们看看同步阻塞式通信的流程,如图1-2a所示。

图1-2a 同步阻塞式通信


  1. 投递。左侧小人投递request,并等待接收response。
  2. 等待。在等待接收response期间,左侧小人休息。不论是否还有其他request需要投递、是否还有其他工作需要处理,他都视若无睹,绝对不会因此打断休息。
  3. 响应。在收到response后,小人从休息中唤醒并处理response。


接下来我们看看异步非阻塞式通信的流程,如图1-2b所示。


图1-2b 异步非阻塞式通信


  1. 缓存。左侧小人投递 request,并等待接收 response 。和同步阻塞模式不同,小人并不需要亲手接住苹果 response,而是在地上放置一个盘子称为“buffer”;如果小人暂时不在场,那么所收到的苹果可以先存在盘子里,稍后再处理。

  2. 暂离。由于有盘子 buffer 的存在,小人投递 request 后就可以暂时离开,去处理其他工作,当然也可以去投递下一个 request;如果需要向不同的 channel 投递 request ,那么小人可以多摆放几个盘子,和 channel 一一对应。

  3. 响应。小人离开后,一旦某个盘子收到了 response ,一只“大喇叭”就会响起,发出“channelRead”通知,呼唤小人回来处理 response。如果要处理多个 response 或多个 channel,那么 channelRead 通知还需要携带参数,以说明从哪个 channel 上收到了哪个 response。


这里的大喇叭可以用 NIO 或 AIO 来实现。简单来说,NIO 是指不停地轮询每个盘子,一旦看到苹果就发出通知;AIO 是指在收到苹果时直接触发通知,而没有轮询的过程。


当然,本系列文章的读者并不需要了解更多实现细节,只需知道异步非阻塞模式依赖于“大喇叭”来实现,它替代小人等待接收 response,从而解放小人去处理其他工作。


根据上面的分析,同步模式具有下列严重缺点


  1. 同步阻塞模式的工作效率十分低下。小人大部分时间都在休息,仅当投递请求、处理响应时,才偶尔醒来工作一小会;而在异步非阻塞模式下,小人从不休息,马不停蹄地投递请求、处理响应,或处理其他工作。
  2. 同步阻塞模式会带来延迟。

我们考虑下面两种情况,如图1-3所示。


a. channel 复用,即左侧小人在一个 channel 上连续发送多条消息。在同步阻塞模式下,一轮(即请求+响应)只能投递一个请求(苹果1),而后续请求(苹果2-4)都只能排队等待,右侧小人需要等待很多轮才能收到所期望的全部消息。此外,左侧小人在等待接收某个response期间,没有机会处理收到的其他消息,造成了数据处理的延迟。不得不感慨,左侧小人太懒惰了!

图1-3a channel 复用

b. 线程复用,即一个线程(小人)向多个 channel 发送消息(苹果1-3,分别发向不同channel)。左侧小人同一时刻只能做一件事,要么在工作,要么在休息;他投递了苹果1后就躺下休息,等待响应,全然不顾右侧小人2、3还在等待他们想要的苹果2、3。

图1-3b 线程复用


在这一章里我们用漫画的形式,初步体验了同步阻塞模式与异步非阻塞模式,并分析了两种模式的区别。接下来我们从 Java 线程入手,对两种模式进行更加正式、更加贴近实际的分析。


2.异步非阻塞模型


2.1

 Java 线程状态


在 Java 程序中,线程是调度执行的单元。线程可以获得 CPU 使用权来执行代码,从而完成有意义的工作。工作进行期间,有时会因为等待获取锁、等待网络 IO 等原因而暂停,通称“同步”或“阻塞”;如果多项工作能够同时进行,之间不存在约束、不需要互相等待,这种情况就称为“异步”或“非阻塞”

受限于内存、系统线程数、上下文切换开销,Java 程序并不能无限创建线程;因此,我们只能创建有限个线程,并尽量提高线程的利用率,即增加其工作时长、降低阻塞时长。异步非阻塞模型是减少阻塞、提高线程利用率的有效手段。当然,这种模型并不能消除所有的阻塞。我们首先来看看 Java 线程有哪些状态[C],其中哪些阻塞是必要的,哪些阻塞可以避免。


Java线程状态包括:


  • RUNNABLE:线程在执行有意义的工作

如图2-1a,线程如果在执行纯内存运算,那么处于 RUNNABLE 状态

根据是否获得 cpu 使用权,又分为两个子状态:READY、RUNNING

  • BLOCKED/WAITING/TIMED_WAITING:线程正在阻塞

如图2-1b、2-1c、2-1d,根据阻塞原因,线程处于下列状态之一:

    • BLOCKED:synchronized 等待获取锁;

    • WAITING/TIMED_WAITING:Lock 等待获取锁。两种状态的区别为是否设置超时时长。

图2-1 Java线程状态


此外,如果 Java 线程正在进行网络 IO,则线程状态为 RUNNABLE,但是实际上也发生了阻塞。以 socket 编程为例,如图2-2 所示,在收到数据之前 InputStream.read() 会阻塞,此时线程状态为 RUNNABLE。

图2-2 网络IO


综上,Java 线程状态包括:RUNNABLE、BLOCKED、WAITING、TIMED_WAITING。其中,RUNNABLE 状态又分为内存计算(非阻塞)、网络 IO(阻塞)两种情况,而其余状态都是阻塞的。


根据阻塞原因,本文将 Java 线程状态归纳为以下3类:RUNNABLE、IO、BLOCKED


  1. RUNNABLE:Java线程状态为RUNNABLE,并且在执行有用的内存计算,无阻塞;

  2. IO:Java 线程状态为 RUNNABLE,但是正在进行网络 IO,发生阻塞;

  3. BLOCKED:Java 线程状态为 BLOCKED/WAITING/TIMED_WAITING,在并发工具的控制下,线程等待获取某一种锁,发生阻塞。


提高线程利用率,就要增加线程处于 RUNNABLE 状态的时长,降低处于 IO 和 BLOCKED状态的时长。BLOCKED 状态一般是不可避免的,因为线程间需要通信,需要对临界区进行并发控制;但是,如果采用适当的线程模型,那么IO状态的时长就可以得到降低,而这就是异步非阻塞模型。


2.2 

线程模型:阻塞 vs 非阻塞


异步非阻塞模型能够降低 IO 阻塞时长,提高线程利用率。下面以数据库访问为例,分析同步和异步 API 的线程模型。如图3所示,过程中涉及3个函数:


  1. writeSync() 或 writeAsync():数据库访问,发送请求

  2. process(result):处理服务器响应(以 result 表示)

  3. doOtherThings():任意其他操作,逻辑上不依赖服务器响应


同步 API 如图2-3a所示:调用者首先发送请求,然后在网络连接上等待来自服务器的响应数据。API 会一直阻塞,直至收到响应才返回;期间调用者线程无法执行其他操作,即使该操作并不依赖服务器响应。实际的执行顺序为:


  1. writeSync()

  2. process(result)

  3. doOtherThings() // 直至收到结果,当前线程才能执行其他操作


异步 API 如图2-3b所示:调用者发送请求并注册回调,然后 API 立刻返回,接下来调用者可以执行任意操作。稍后底层网络连接收到响应数据,触发调用者所注册的回调。实际的执行顺序为:


  1. writeAsync()

  2. doOtherThings() // 已经可以执行其他操作,并不需要等待响应

  3. process(result)


图2-3 同步API & 异步API


在上述过程中,函数 doOtherThings() 并不依赖服务器响应,原则上可以和数据库访问同时执行。然而对于同步 API,调用者被迫等待服务器响应,然后才可以执行 doOtherThings();即数据库访问期间线程阻塞于 IO 状态,无法执行其他有用的操作,利用率十分低下。而异步API 就没有这个限制,显得更加紧凑、高效。

在 IO 密集型系统中,适当使用异步非阻塞模型,可以提升数据库访问吞吐量。

考虑这样一个场景:需要执行多条数据库访问请求,且请求之间互相独立,无依赖关系。使用同步 API 和异步 API ,线程状态随时间变化的过程如图2-4所示。

图2-4 线程时间线:数据库访问

线程交替处于 RUNNABLE 和 IO 状态。在 RUNNABLE 状态下,线程执行内存计算,如提交请求、处理响应。在 IO 状态下,线程在网络连接上等待响应数据。在实际系统中,内存计算的速度非常快,RUNNABLE 状态的时长基本可忽略;而网络传输的耗时会相对更长(几十到几百毫秒),IO 状态的时长更加可观。

  1. 同步API:调用者线程一次只能提交一个请求;直到请求返回后,才能再提交下一个请求。线程利用率很低,大部分时间消耗在 IO 状态上。

  2. 异步API:调用者线程可以连续提交多个请求,而之前提交的请求都还没有收到响应。调用者线程会注册一些回调,这些回调存储在内存中;稍后网络连接上收到响应数据,某个接收线程被通知处理响应数据,从内存中取出所注册的回调,并触发回调。这种模型下,请求可以连续地提交、连续的响应,从而节约 IO 状态的耗时。


异步非阻塞模式在 IO 密集型系统中应用非常广泛。常用的中间件,如 http请求[D]、redis[E]、mongo DB[F]、elasticsearch[G]、influx DB[H],都支持异步 API。各位读者可以在参考文献中,查阅这些异步 API 的样例代码。关于中间件的异步 API ,下面有几个注意事项

  1. redis 的常见客户端有 jedis 和 lettuce[E] 。其中 lettuce 提供了异步 API,而 jedis只能提供同步 API ;二者对比参见文章[I]

  2. kafka producer[J] 的 send() 方法也支持异步 API ,但是该 API 实际上不是纯异步的[K]:当底层缓存满,或者无法获取服务器(broker)信息时,send() 方法会发生阻塞。个人认为这是一个非常严重的设计缺陷。kafka 常用于低延迟日志采集场景,系统会将日志通过网络写入到 kafka 服务器,以减少线程内的阻塞,提升线程吞吐量;稍后其他进程会从 kafka 消费所写入的日志,进行持久存储。设想一个实时通信系统,单条线程每秒需要处理几万到几十万条消息,响应时间一般为几毫秒到几十毫秒。系统在处理期间需要经常调用 send()来上报日志,如果每次调用都发生哪怕1秒的延迟(实际有可能达几十秒),延迟积累起来也会严重劣化吞吐量和延迟。


最后,异步 API 有多种实现,包括线程池、select(如netty 4.x[L])、epoll 等。其共同点是调用者不需要在某一条网络连接上阻塞,以等待接收数据;相反,API底层常驻有限数目的线程,当收到数据后,某一线程得到通知并触发回调。这种模型也称为“响应式”模型,非常贴切。限于篇幅原因,本文主要关注异步 API 设计,而不深入讲解异步API的实现原理。


3.Promise设计模式


3.1

 API 形式:同步、异步 listener、异步 Promise


上一章介绍了异步非阻塞模式和异步 API 的函数形式。异步 API 具有以下特征

  1. 在提交请求时注册回调;

  2. 提交请求后,函数立刻返回,不需要等待收到响应;

  3. 收到响应后,触发所注册的回调;根据底层实现,可以利用有限数目的线程来接收响应数据,并在这些线程中执行回调。

在保留异步特性的基础上,异步 API 的形式可以进一步优化。上一章图2-3b 展示了异步 API的 listener 版本,特点是在提交请求时必须注册恰好一个回调;因而在下列场景下,listener API 会难以满足功能需求,需要调用者做进一步处理:


  1. 多个对象都关注响应数据,即需要注册多个回调;但是 listener 只支持注册一个回调。

  2. 需要将异步调用转为同步调用。例如某些框架(如 spring )需要同步返回,或者我们希望主线程阻塞直至操作完成,然后主线程结束、进程退出;但是 listener 只支持纯异步,调用者需要重复编写异步转同步的代码。


为了应对上述场景,我们可以使用 Promise 设计模式来重构异步 API ,以支持多个回调和同步调用。下面对同步 API、异步 listener API、异步 Promise API 的函数形式进行对比,如图3-1所示:


  1. 同步:调用 writeSync() 方法并阻塞;收到响应后函数停止阻塞,并返回响应数据;

  2. 异步 listener:调用 writeAsync() 方法并注册 listener,函数立刻返回;收到响应后,在其他线程触发所注册的 listener;

  3. 异步 Promise:调用 writeAsync(),但不需要在函数中注册 listener,函数立刻返回Promise对象。调用者可以调用异步的 Promise.await(listener),注册任意数目的listener,收到响应后会按顺序触发;此外,也可以调用同步的 Promise.await() ,阻塞直至收到响应。


图3-1 API 形式:同步、异步 listener、异步 Promise


综上,Promise API 在保持异步特性的前提下,提供了更高的灵活性。调用者可以自由选择函数是否阻塞,以及注册任意数目的回调。


3.2 

Promise 的特性与实现


上一节介绍了 Promise API 的使用样例,其核心是一个 Promise 对象,支持注册 listener,以及同步获取响应 result;而本节将对 Promise 的功能进行更加详细的定义。注意,本节并不限定 Promise 的某一具体实现(例:jdk CompletableFuture、netty DefaultPromise),只展示共有的、必须具备的特性;缺少这些特性,Promise 将无法完成异步传递响应数据的工作。


3.2.1 功能特性


  • Promise 的基本方法

Promise 的基本功能是传递响应数据,需要支持下列方法,如表3-1所示:


3-1 Promise的方法

方法
含义
await(listener): void 异步await()。注册对响应数据的 listener
允许多次调用,以注册任意数目的 listener
signalAll(result): void 通知响应数据 result ,按顺序触发所注册的 listener
await(): result 同步await()。阻塞,直至获取响应数据再返回


下面以上一小节的数据库访问 API 为例,演示 Promise 的工作流程,如图3-2所示:

  1. 调用者调用 writeAsync() API ,提交数据库访问请求并获取 Promise 对象;然后调用 Promise.await(listener),注册对响应数据的 listener。Promise 对象也可以传递给程序中其他地方,使得关心响应数据的其他代码,各自注册更多 listener。

  2. writeAsync() 内部,创建 Promise 对象并和这次请求关联起来,假设以 requestId 标识。

  3. writeAsync() 底层常驻有限数目的线程,用于发送请求和接收响应。以 netty 为例,当从网络上收到响应据后,其中一个线程得到通知,执行 channelRead() 函数进行处理;函数取出响应数据和对应的 Promise 对象,并调用 Promise.signalAll() 进行通知。注意这里是伪代码,和 netty 中回调函数的实际签名略有区别。


图3-2a 提交数据库访问请求


图3-2b 创建 Promise 对象


图3-2c 通知 Promise 对象


  • Promise 的时序

Promise 的方法需要保证以下时序。此处以“A对B可见”来描述时序,即:如果先执行操作 A(注册 listener)就会产生某种永久效应(永久记录这个 listener),之后再执行操作 B(通知result)就必须考虑到这种效应,执行相应的处理(触发之前记录的 listener)。

  1. await(listener) 对 signalAll(result) 可见:注册若干 listener 后,通知 result 时必须触发每一个listener,不允许遗漏。

  2. signalAll(result) 对 await(listener) 可见:通知 result 后,再注册 listener 就会立刻触发。

  3. 首次 signalAll(result) 对后续 signalAll(result) 可见。首次通知 result 后,result 即唯一确定,永不改变。之后再通知 result 就会忽略,不产生任何副作用。请求超时是该特性一种典型应用:在提交请求的同时创建一个定时任务;如果能在超时时长内正确收到响应数据,则通知 Promise 正常结束;否则定时任务超时,通知 Promise 异常结束。不论上述事件哪个先发生,都保证只采纳首次通知,使得请求结果唯一确定。


此外,某次 await(listener) 最好对后续 await(listener) 可见,以保证 listener 严格按照注册顺序来触发。


  • Promise的非线程安全实现

如不考虑线程安全,那么下列代码清单可以实现 Promise 的基本特性;线程安全的实现见下一小节。代码清单依次展示了 await(listener): void、signalAll(result)、await(): result 的实现。这里有几个注意事项:

  1. 字段 listeners 存储 await(listener) 所注册的 listener 。字段类型为 LinkedList,以存储任意数目的 listener,同时维护 listener 的触发顺序。

  2. 字段isSignaled记录是否通知过 result。如果 isSignaled=true,则后续调用await(listener)时立刻触发 listener,且后续调用 signalAll(result) 时直接忽略。此外,我们以 isSignaled=true 而不是 result=null 来判断是否通知过 result ,因为某些情况下null本身也可以作为响应数据。例如,我们以 Promise<Exception>表示数据库写入的结果,通知 null 表示写入成功,通知 Exception 对象(或某一子类)表示失败原因。

  3. signalAll(T result) 在末尾处调用 listeners.clear() 以释放内存,因为 listeners 已经触发过,不再需要在内存中存储。


public class Promise<T> {
private boolean isSignaled = false; private T result;
private final List<Consumer<T>> listeners = new LinkedList<>();
public void await(Consumer<T> listener) { if (isSignaled) { listener.accept(result); return; }
listeners.add(listener); }
public void signalAll(T result) { if (isSignaled) { return; }
this.result = result; isSignaled = true; for (Consumer<T> listener : listeners) { listener.accept(result); } listeners.clear(); }
public T await() {        // 适当阻塞,直至signalAll()被调用;实际实现见3.2.2节 return result; }}


3.2.2 线程安全特性


上一章3.2.1节讲解了 Promise 的功能,并提供了非线程安全的实现。本节展示如何使用并发工具,实现线程安全的 Promise,如下所示。有下列几个注意事项:

  1. 线程安全。各个字段均被多个线程访问,因此都属于临界区,需要使用适当的线程安全工具进行上锁,如 synchronized、Lock 。一种最简单的实现,是将全部代码纳入临界区内,进入方法时上锁,离开方法时放锁。注意在使用 return 进行提前返回时,不要忘记放锁。

  2. 在临界区外触发 listener,以减少在临界区内停留的时长,并减少潜在的死锁风险。

  3. 同步 await() 。可以使用任何一种同步等待的工具来实现,如 CountDownLatch、Condition。此处使用 Condition 实现,注意根据 java 语法,操作 Condition 时必须先获取 Condition 所关联的锁。


public class Promise<T> {
private final ReentrantLock lock = new ReentrantLock(); private final Condition resultCondition = lock.newCondition();
private boolean isSignaled = false; private T result;
private final List<Consumer<T>> listeners = new LinkedList<>();
public void await(Consumer<T> listener) { lock.lock(); if (isSignaled) { lock.unlock(); // 不要忘记放锁 listener.accept(result); // 在临界区外触发listener return; }
listeners.add(listener); lock.unlock(); }
public void signalAll(T result) { lock.lock(); if (isSignaled) { lock.unlock(); // 不要忘记放锁 return; }
this.result = result; isSignaled = true;
// this.listeners的副本 List<Consumer<T>> listeners = new ArrayList<>(this.listeners); this.listeners.clear(); lock.unlock();
for (Consumer<T> listener : listeners) { listener.accept(result); // 在临界区外触发listener }
/* 操作Condition时须上锁*/ lock.lock(); resultCondition.signalAll(); lock.unlock(); }
public T await() { lock.lock(); if (isSignaled) { lock.unlock(); // 不要忘记放锁 return result; }
while (!isSignaled) { resultCondition.awaitUninterruptibly(); } lock.unlock();
return result; }}


上述实现仅做演示使用,仍有较大的改进空间。生产环境的实现原理,读者可以参考 jdk CompletableFutre、netty DefaultPromise 。可以改进的地方包括:

  1. 使用 CAS 设置响应数据。字段 isSignaled、result 可以合并为一个数据对象,然后使用 CAS 进行设值,从而进一步降低阻塞时长。

  2. 触发 listener 的时序。在上述代码中,Promise.signalAll() 会依次触发 listener;在此期间,如果其他线程调用了异步 await(listener),由于 Promise 的响应数据已概括,该线程也会触发 listener。上述过程中,两个线程同时触发 listener,因此没有严格保证触发顺序。作为改进,类似于 netty DefaultPromise,Promise.signalAll() 内部可以设置一个循环,不断触发 listener 直至 listeners 排空,以防期间注册了新的listener;在此期间,新注册的 listener 可以直接加入到 listeners 中,而不是立刻触发。

  3. listener 的移除。在通知响应数据之前,Promise 长期持有 listener 的引用,导致listener 对象无法被 gc 。可以添加 remove(listener) 方法,或者允许仅持有 listener 的弱引用。


3.2.3 须避免的特性


前面的小节展示了 Promise 的特性与实现原理。纯正的 Promise 是异步传递响应数据的工具,其应当只实现必要的数据传递特性,而不应当夹杂请求提交、数据处理等逻辑。接下来我们来看一看,Promise 在实现时应避免哪些特性,以防限制调用者所能做出的决策。

1. 异步 await() 发生阻塞;该规则不仅适用于 Promise,也适用于任何异步 API。异步 API 常用于实时通信等延时敏感的场景,作用是减少线程阻塞,避免推迟后续其他操作。一旦发生阻塞,系统的响应速度和吞吐量就会受到严重冲击。

以连续提交数据库请求为例。如图3-3a 所示,调用者调用了一个异步 API,连续提交3次写入请求,并在所返回的 Promise 上注册回调。

我们考察 writeAsync() 与 await() 如发生阻塞,将会对调用者造成什么影响,如图3-3b所示。提交请求是纯内存操作,线程处于 RUNNABLE 状态;writeAsync() 或 await() 如果发生阻塞,则线程处于 BLOCKED 状态,暂停工作而无法执行后续操作。当发生阻塞时,调用者每提交一个请求就不得不等待一段时间,从而降低了提交请求的频率,进而推迟了服务器对这些请求的响应,使得系统的吞吐量降低、延迟上升。特别地,如果系统采用了多路复用机制,即一个线程可以处理多个网络连接或多个请求,那么线程阻塞将会严重拖慢后续请求的处理,造成比较难排查的故障。


常见的阻塞原因包括:

  1. Thread.sleep()

  2. 向队列提交任务,调用了 BlockingQueue.put() 和 take();应改为非阻塞的 offer() 和 poll()

  3. 向线程池提交任务,ExecutorService.submit(),如果线程池拒绝策略为 CallerRunsPolicy,而任务本身又是耗时的。

  4. 调用了阻塞的函数,包括:InputStream.read()、同步的 Promise.await() 、KafkaProducer.send() 。注意 KafkaProducer.send() 虽然形式上是异步 API,但是在底层缓存满或者无法获取服务器(broker)信息时,send() 方法仍会发生阻塞。

图3-3a 连续提交请求


图3-3b 请求处理时间线


2. 绑定线程池( ExecutorService ),用于执行请求。如图3-4所示,线程池是异步 API 的一种可选模型,但并不是唯一实现。

  1. 线程池模型。为了不阻塞调用者,API 内置了线程池来提交请求、处理响应;调用者可以向线程池连续提交多个请求,但是不需要等待响应。调用者提交一条请求后,线程池中的某条线程就会被独占,等待接收响应并进行处理,但在此之前无法再处理其他请求;完成处理后,该条线程重新变为空闲,可以继续处理后续请求。

  2. 响应式模型。类似地,API 内置了发送和接收线程来提交请求、处理响应,调用者也不需要同步等待。调用者提交一条请求后,发送线程向网络发送请求;完成发送后,线程立刻变为空闲,可以发送后续请求。当收到响应数据时,接收线程得到通知以处理响应;完成处理后,线程立刻变为空闲,可以处理后续响应数据。上述过程中,任何一条线程都不会被某一请求独占,即线程随时都可以处理请求,而不需要等待之前的请求被响应。

综上,如果绑定了线程池,Promise 就实现了对其他模型(如响应式模型)的兼容性。


图3-4 线程时间线:线程池 vs 响应式


3. 在构造方法创建 Promise 对象时,定义如何提交请求。这种方式只能定义如何处理单条请求,而无法实现请求的批量处理。

以数据库访问为例,现代数据库一般支持批量读写,以略微提升单次访问的延迟为代价,换来吞吐量显著提升;如果吞吐量得到提升,那么平均延迟反而会下降。下面的代码片段展示了一个批量请求 API :数据对象 BulkRequest 可以携带多条普通请求 Request ,从而实现批量提交。


/* 提交单条请求*/client.submit(new Request(1));client.submit(new Request(2));client.submit(new Request(3));
/* 提交批量请求*/client.submit(new BulkRequest( new Request(1), new Request(2), new Request(3)));


为了充分利用“批量请求”的特性,调用者需要进行跨越多条请求的“宏观调控”。请求产生后可以先缓存起来;等待一段时间后,取出所缓存的多条请求,组装一个批量请求来一并提交。因此,如下面的代码片段所示,在构造 Promise 时指定如何提交单条请求是没有意义的,这部分代码(client.submit(new Request(...)))并不会被执行;而实际希望执行的代码,其实是提交批量请求(client.submit(new BulkRequest(...)))。


/* Promise:提交单条请求*/new Promise<>(() -> client.submit(new Request(1)));new Promise<>(() -> client.submit(new Request(2)));new Promise<>(() -> client.submit(new Request(3)));


4. 在构造方法创建 Promise 对象时,定义如何处理响应数据,而不允许后续对响应数据注册回调。如下面的代码片段所示,在构造 Promise 对象时,注册了对响应数据的处理 process(result);但是除此以外,其他代码也有可能关心响应数据,需要注册回调 process1(result)、process2(result)。如果 Promise 只能在构造时注册唯一回调,那么其他关注者就无法注册所需回调函数,即 Promise API 退化回 listener API。


/* 定义如何处理响应数据*/Promise<String> promise = new Promise<>(result -> process(result));
/* 其他代码也关心响应数据*/promise.await(result -> process1(result));promise.await(result -> process2(result));


综上,Promise 应该是一个纯粹的数据对象,其职责是存储回调函数、存储响应数据;同时做好时序控制,保证触发回调函数无遗漏、保证触发顺序。除此以外,Promise不应该和任何实现策略相耦合,不应该杂糅提交请求、处理响应的逻辑。


4. 总结


本文讲解了异步非阻塞设计模式,并对同步 API、异步listener API、异步 Promise API 进行了对比。相比于其他两种 API,Promise API 具有无可比拟的灵活性,调用者可以自由决定同步返回还是异步返回,并允许对响应数据注册多个回调函数。最后,本文讲解了 Promise 基本功能的实现,并初步实现了线程安全特性。

本系列共2篇文章,本文为第1篇《原理篇》。在下一篇《应用篇》中,我们将看到 Promise设计模式丰富的应用场景,将其和现有工具进行结合或对比,以及对 Promise API 进行进一步变形和封装,提供异常处理、调度策略等特性。



参考文献




[A] 异步非阻塞IO

https://en.wikipedia.org/wiki/Asynchronous_I/O

 

[B] Promise

https://en.wikipedia.org/wiki/Futures_and_promises

 

[C] java线程状态

https://segmentfault.com/a/1190000038392244

 

[D] http异步API样例:apache HttpAsyncClient

https://hc.apache.org/httpcomponents-asyncclient-4.1.x/quickstart.html

 

[E] redis异步API样例:lettuce

https://github.com/lettuce-io/lettuce-core/wiki/Asynchronous-API

 

[F] mongo DB异步API样例:AsyncMongoClient

https://mongodb.github.io/mongo-java-driver/3.0/driver-async/getting-started/quick-tour/

 

[G] elasticsearch异步API样例:RestHighLevelClient

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-index.html

 

[H] influx DB异步API样例:influxdb-java

https://github.com/influxdata/influxdb-java/blob/master/MANUAL.md

 

[I] jedis vs lettuce

https://redislabs.com/blog/jedis-vs-lettuce-an-exploration/

 

[J] kafka

http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html

 

[K] KafkaProducer.send()阻塞

https://stackoverflow.com/questions/57140680/kafka-asynchronous-send-not-really-asynchronous

 

[L] netty

https://netty.io/wiki/user-guide-for-4.x.html

-END-

往期推荐


本文分享自微信公众号 - 有道技术团队(youdaotech)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
打赏
3
5 收藏
分享
加载中
更多评论
打赏
0 评论
5 收藏
3
分享
返回顶部
顶部