文档章节

什么是Reactive Streams in Java 译

woshixin
 woshixin
发布于 2018/10/10 16:47
字数 3843
阅读 24
收藏 0

函数式编程对于Reactive Programming很重要,但我不会在这篇文章中深入探讨函数式编程。

在这篇文章中,我想看看Java中的整体Reactive发展环境。

Reactive Programming vs. Reactive Streams

有了这些新的流行语,就很容易对它们的含义感到困惑。

反应式编程是一种编程范式,但我不会称之为新的。它实际上已经存在了一段时间。

就像面向对象编程,函数式编程或过程式编程一样,反应式编程只是另一种编程范式。

另一方面,Reactive Streams是一种规范。对于Java程序员,Reactive Streams是一个API。Reactive Streams为我们提供了Java中的Reactive Programming的通用API。

Reactive Streams API是Kaazing,Netflix,Pivotal,Red Hat,Twitter,Typesafe等众多工程师之间合作的产物。

Reactive Streams非常类似于JPA或JDBC。两者都是API规范。您需要使用API​​规范的实现。

例如,从JDBC规范中,您具有Java DataSource接口。Oracle JDBC实现将为您提供DataSource接口的实现。正如Microsoft的SQL Server JDBC实现也将提供DataSource接口的实现。

现在,您的高级程序可以接受DataSource对象,并且应该能够使用数据源,而不必担心它是由Oracle提供还是由Microsoft提供。

就像JPA或JDBC一样,Reactive Streams为我们提供了一个我们可以编写代码的API接口,而无需担心底层实现。

Reactive Programming

关于Reactive programming是什么,有很多意见。Reactive programming也有很多炒作!

开始学习Reactive programming范例的最佳起点是阅读“Reactive Manifesto”。

Reactive Manifesto

Reactive Manifesto描述了reactive systems的四个关键属性:

Responsive

如果可能的话,系统及时响应。响应性是可用性和实用性的基石,但更重要的是,响应性意味着可以快速检测到问题并有效地处理问题。reactive systems专注于提供快速一致的响应时间,建立可靠的上限,以便提供一致的服务质量。这种一致的行为反过来简化了错误处理,建立了最终用户的信心,并鼓励进一步的交互。

Resilient

系统在出现故障时保持响应。这不仅适用于高可用性,关键任务系统 - 任何无弹性的系统在发生故障后都不会响应。通过复制,遏制,隔离和委派来实现弹性。故障包含在每个组件中,将组件彼此隔离,从而确保系统的各个部分可以在不损害整个系统的情况下发生故障和恢复。将每个组件的恢复委派给另一个(外部)组件,并在必要时通过复制确保高可用性。组件的客户端不会负担处理其故障的负担。

Elastic

系统在不同的工作负载下保持响应。反应系统可以通过增加或减少分配用于服务这些输入的资源来对输入速率的变化作出反应。这意味着设计没有争用点或中心瓶颈,从而能够分片或复制组件并在它们之间分配输入。Reactive Systems通过提供相关的实时性能测量来支持预测和反应式扩展算法。它们在商用硬件和软件平台上以经济高效的方式实现弹性。

Message Driven

Reactive Systems依靠异步消息传递在组件之间建立边界,以确保松散耦合,隔离和位置透明性。此边界还提供将故障委派为消息的方法。使用显式消息传递可以通过整形和监视系统中的消息队列并在必要时应用反压来实现负载管理,弹性和流量控制。作为通信手段的位置透明消息传递使得管理失败可以在群集内或单个主机内使用相同的构造和语义。非阻塞通信允许接收者仅在活动时消耗资源,从而减少系统开销。

前三个属性(Responsive, Resilient, Elastic)与您的架构选择更相关。很容易理解为什么微服务,Docker和Kubernetes等技术是Reactive系统的重要方面。在单个服务器上运行LAMP堆栈显然不符合Reactive Manifesto的目标。

Message Driven and Reactive Programming

作为Java开发人员,它是我们最感兴趣的最后一个属性,即Message Driven属性。

消息驱动的架构当然不是革命性的。如果你需要一个关于消息驱动系统的入门读物,我想建议阅读Enterprise Integration Patterns - 一本真正标志性的计算机科学书籍。本书中的概念为Spring Integration和Apache Camel奠定了基础。

我们对Java开发人员感兴趣的Reactive Manifesto的几个方面是:消息失败,背压和非阻塞。这些是Java中反应式编程的微妙但重要的方面。

Failures as Messages

通常在Reactive编程中,您将处理消息流。

不希望的是抛出异常并结束消息流的处理。首选方法是优雅地处理故障。

也许你需要执行一个Web服务并且它已经关闭了。也许你可以使用备份服务?或者可能在10ms内重试?

我不会在这里解决每一个边缘案例。关键的一点是,您不希望因运行时异常而大声失败。理想情况下,您需要记录失败,并具有某种类型的重试或恢复逻辑。通常,故障是通过回调来处理的。

JavaScript开发人员习惯于使用回调。但回调可能会变得难以使用。JavaScript开发人员将此称为回调地狱。

在Reactive Steams中,异常是一等公民。不是粗暴抛出的。错误处理内置于Reactive Streams API规范中。

Back Pressure

你听说过“从消防中喝水”这句话吗?

背压是反应式编程中非常重要的概念。它为下游客户提供了一种说法,“我还有更多,请。”

想象一下,如果您正在查询数据库,结果集将返回1000万行。传统上,数据库将以客户端接受它们的速度吐出所有1000万行。

当客户端不能再接受时,它会阻塞。数据库焦急地等待着。阻止。链中的线程耐心地等待解除阻塞。

在Reactive世界中,我们希望我们的客户能够说给我前1000个。然后我们可以给他们1000并继续我们的业务 - 直到客户回来并要求另一组记录。

这与客户没有发言权的传统系统形成鲜明对比。通过阻塞线程来完成限制,而不是以编程方式。

Non-Blocking

对Java开发人员来说重要的Reactive架构的最后,也许是最重要的方面是非阻塞的。

直到Reactive来了很长时间,没有阻塞似乎并没有那么重要。

作为Java开发人员,我们已经被教会通过使用线程来利用强大的现代硬件。越来越多的核心意味着我们可以使用越来越多的线程。因此,如果我们需要等待数据库或Web服务返回,则不同的线程可以利用CPU。这似乎对我们有意义。当我们被阻塞的线程等待某种类型的I / O时,不同的线程可以使用CPU。

因此,阻塞并不是什么大问题。对?

系统中的每个线程都将消耗资源。每次线程被阻塞时,都会消耗资源。虽然CPU在维护不同线程方面非常有效,但仍然存在成本问题。我们Java开发人员可能是一群傲慢的人。

他们总是看不起JavaScript,有点讨厌的小语言。事实上,JavaScript共享“Java”这个词总是让我们Java程序员感觉有点脏。如果您是Java开发人员,当您必须指出Java和JavaScript是两种不同的语言时,您有多少次感到烦恼?然后Node.js出现了。Node.js在吞吐量方面提出了疯狂的基准测试。然后Java社区注意到了。是的,脚本小子已经长大,正在侵占我们的地盘。并不是在谷歌的V8 JavaScript引擎中运行的JavaScript对于编程来说是一个非常快速的天赐之物。Java曾经在性能方面有其瑕疵,但与现代本地语言相比,它非常高效。

Node.js表现的秘诀是无阻塞。

Node.js使用具有有限数量线程的事件循环。虽然在Java世界中的阻塞通常被视为没什么大不了的,但在Node.js世界中,它将成为性能的死亡之吻。

这些图形可以帮助您可视化差异。在Node.js中,有一个非阻塞事件循环。请求以非阻塞方式处理。线程不会卡在等待其他进程。

将Node.js模型与Java中使用的典型多线程服务器进行对比。并发是通过使用多个线程实现的。由于多核处理器的增长,这被普遍接受。

我个人认为这两种方法之间的区别在于高速公路和许多带灯光的城市街道之间的区别。

通过单线程事件循环,您的过程可以在超级高速公路上快速巡航。在多线程服务器中,您的进程会停在城市街道上并停止流量。两者都可以带来大量流量。但是,我宁愿在高速公路上巡航!

当您转向非阻塞范例时,您的代码会在CPU上停留更长时间。线程切换较少。您不仅要管理许多线程,还要删除线程之间的上下文切换。

您将在系统容量中看到更多的空间,供您的程序使用。非阻塞不是圣杯,你不会看到事情变得更快。是的,管理阻塞需要付出代价。但考虑到所有事情,它相对有效。事实上,在一个适度使用的系统上,我不确定差异会有多大可衡量。

但是,随着系统负载的增加,您可以期待看到的是,您将拥有额外的容量来为更多请求提供服务。您将获得更高的并发性。多少?好问题。用例非常具体。与所有基准一样,您的里程也会有所不同。

The Reactive Streams API

我们来看看Reactive Streams API for Java。Reactive Streams API仅包含四个接口。

Publisher

发布者是可能无限数量的有序元素的提供者,根据从其订阅者收到的需求发布它们。

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Subscriber

在将订阅者实例传递给Publisher.subscribe(订阅者)之后,将接收对Subscriber.onSubscribe(订阅)的调用。
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscription

订阅表示订阅发布者的订阅者的一对一生命周期。

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Processor

Processor代表一个处理阶段 - 既是订阅者又是发布者,并遵守两者的合同。

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Reactive Streams Implementations for Java

Java中的Reactive环境正在不断发展和成熟。David Karnok在Advanced Reactive Java上有一篇很棒的博客文章Advanced Reactive Java,他将各种反应项目分解成几代。我会注意到下面每一代(随着新版本的发布,它可能会随时改变)。

RxJava

RxJava是ReactiveX项目中的Java实现。在撰写本文时,ReactiveX项目实现了Java,JavaScript,.NET(C#),Scala,Clojure,C ++,Ruby,Python,PHP,Swift等等。

ReactiveX在GoF Observer模式上提供了反应性扭曲,这是一种很好的方法。ReactiveX调用他们的方法'Observer Pattern Done Right'

RxJava早于Reactive Streams规范。虽然RxJava 2.0+确实实现了Reactive Streams API规范,但您会注意到术语略有不同。David Karnok是RxJava的关键提交者,他认为RxJava是第三代reactive library。

Reactor

Reactor是Pivotal的Reactive Streams兼容实现。

从Reactor 3.0开始,Java 8或更高版本是必需的。Spring Framework 5中的Reactive功能基于Reactor 3.0构建。Reactor是reactive library。(David Karnok也是项目Reactor的提交者)

Akka Streams

Akka Streams还完全实现了Reactive Streams规范。Akka使用Actors处理流数据。虽然Akka Streams符合Reactive Streams API规范,但Akka Streams API与Reactive Streams接口完全分离。Akka Streams被认为是reactive library。

Ratpack

Ratpack是一组用于构建现代高性能HTTP应用程序的Java库。Ratpack使用Java 8,Netty和Reactive原则。Ratpack提供了Reactive Stream API的基本实现,但并不是一个功能齐全的reactive工具包。(可选)您可以将RxJava或Reactor与Ratpack一起使用。

Vert.x

Vert.x是一个Eclipse Foundation项目。它是JVM的多语言事件驱动的应用程序框架。Vert.x中的反应支持与Ratpack类似。Vert.x允许您使用RxJava或其本机实现的Reactive Streams API。

Reactive Streams and JVM Releases

Reactive Streams for Java 1.8

使用Java 1.8,您将获得对Reactive Streams规范的强大支持。

在Java 1.8中,Reactive stream不是Java API的一部分。但是,它可以作为一个单独的罐子提供。

Reactive Streams Maven Dependency

<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.0</version>
</dependency>

虽然您可以直接包含此依赖项,但您正在使用的任何Reactive Streams实现都应将其自动包含为依赖项。

Reactive Streams for Java 1.9

当你转向Java 1.9时,事情会发生一些变化。Reactive Streams已成为官方Java 9 API的一部分。

您会注意到Reactive Streams接口在Java 9中的Flow类下。但除此之外,API与Jav​​a 1.8中的Reactive Streams 1.0相同。

Conclusion

在Java 9中,Reactive Streams正式成为Java API的一部分。在研究这篇文章时,很明显各种反应性库已经在不断发展和成熟(即David Karnok世代分类)。

在Reactive Streams之前,各种reactive库无法实现互操作性。他们无法互相交谈。早期版本的RxJava与项目Reactor的早期版本不兼容。但是在Java 9发布前夕,主要的反应库采用了Reactive Streams规范。现在,不同的库可以互操作。

互操作性是一个重要的多米诺骨牌。例如,MongoDB实现了Reactive Streams驱动程序。

现在,在我们的应用程序中,我们可以使用Reactor或RxJava来使用MongoDB中的数据。我们仍然在适应Reactive Streams的早期阶段。但是在接下来的一年左右的时间里,我们可以期待越来越多的开源项目提供Reactive Streams兼容性。

我希望在不久的将来我们会看到更多的Reactive Streams。这是一个成为Java开发人员的有趣时间!

下节再续!

原文:https://dzone.com/articles/what-are-reactive-streams-in-java

有什么讨论的内容,可以加我公众号:

© 著作权归作者所有

woshixin
粉丝 34
博文 375
码字总数 282669
作品 0
杭州
程序员
私信 提问
Reactive Programming 一种技术,各自表述

前言 作为一名 Java 开发人员,尤其是 Java 服务端工程师,对于 Reactive Programming 的概念似乎相对陌生。随着 Java 9 以及 Spring Framework 5 的相继发布,Reactive 技术逐渐开始被广大从...

小马哥mercyblitz
2018/07/23
0
0
Flow支持Reactive Streams in Java

Java 的 JVM Flow 就是按照 reactive-stream 的 API 规范写的,来看看Flow长的什么样? 可以看到是在神奇 java.util.concurrent 包中,而且这是一个 Final 修饰的类。 java.util.concurrent...

woshixin
2018/06/13
42
0
Microserver 0.87 发布,Java 微服务框架

Microserver 0.87 发布了,该版本主要改进如下: Use Java 8 Streams (or reactive-streams Publishers) to Stream data between Microservices. Standardized error codes, error handling ......

淡漠悠然
2016/07/08
1.8K
2
java9之Reactive Streams

Java 9 Reactive Streams允许我们实现非阻塞异步流处理。这是将响应式编程模型应用于核心java编程的重要一步。   如果您对响应式编程不熟悉,请阅读Reactive Manifesto并阅读Reactive Stre...

李玉长
2019/04/11
194
0
RxJava 2.2.10 发布,Rx 的 Java 实现

RxJava 2.2.10 发布了,RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。 主要更新内容如下: Bug 修复 Pull 6499:将缺少的空检查添加到 BufferExactBounded...

xplanet
2019/06/22
2.1K
1

没有更多内容

加载失败,请刷新页面

加载更多

如何为MVC-3转换为4应用程序添加对System.Web.Optimization的引用

我正在最近从MVC 3转换为MVC 4 beta的项目中尝试使用新的捆绑功能。 它需要global.asax中的一行代码, BundleTable.Bundles.RegisterTemplateBundles(); ,这需要using System.Web.Optimiza...

技术盛宴
今天
79
0
Kettle自定义jar包供javascript使用

我们都知道 Kettle 是用 Java 语言开发,并且可以在 JavaScript 里面直接调用 java 类方法。所以有些时候,我们可以自定义一些方法,来供 JavaScript 使用。 本篇文章有参考自:https://www...

CREATE_17
昨天
114
0
处理CSV文件中的逗号

我正在寻找有关如何处理正在创建的csv文件的建议,然后由我们的客户上传,并且该值可能带有逗号(例如公司名称)。 我们正在研究的一些想法是:带引号的标识符(值“,”值“,”等)或使用|...

javail
昨天
79
0
计算一个数的数位之和

计算一个数的数位之和 例如:128 :1+2+8 = 11 public int numSum(int num) { int sum = 0; do { sum += num % 10; } while ((num = num / 10) > 0); return sum;......

SongAlone
昨天
128
0
为什么图片反复压缩后普遍会变绿,而不是其他颜色?

作者:Lion Yang 链接:https://www.zhihu.com/question/29355920/answer/119088684 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 业余版概要:安卓的...

shzwork
昨天
85
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部