文档章节

reactive streams与观察者模式

go4it
 go4it
发布于 2018/01/11 22:33
字数 748
阅读 46
收藏 0

本文主要研究下java里头的reactive streams与观察者模式。

reactive streams

reactive编程范式是一个异步编程范式,主要涉及数据流及变化的传播,可以看做是观察者设计模式的扩展。

java里头的iterator是以pull模型,即订阅者使用next去拉取下一个数据;而reactive streams则是以push模型为主,订阅者调用subscribe方法订阅,发布者调用订阅者的onNext通知订阅者新消息。

reactive streams java api

reactive streams定义了4个java api,如下

Processor<T,R>

processor既是Subscriber也是Publisher,代表二者的处理阶段

Publisher<T>

publisher是数据的提供者, 将数据发布给订阅者

Subscriber<T>

在调用Publisher.subscribe(Subscriber)之后,Subscriber.onSubscribe(Subscription)将会被调用

Subscription

Subscription代表订阅者与发布者的一次订阅周期,一旦调用cancel去掉订阅,则发布者不会再推送消息。

观察者模式

观察者模式的实现有推模型和拉模型

  • 拉模型

即发布者通知订阅有新消息,订阅者再去找发布者拉取

  • 推模型

即发布者通知订阅者有消息,通知的时候已经带上了一个新消息

reactor实例

maven

		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
			<version>3.1.2.RELEASE</version>
		</dependency>

reactor 3 是java里头reactive streams的一个实现,基于reactive streams的java api,是spring 5反应式编程的基础。

Flux实例

    @Test
    public void testBackpressure(){
        Flux.just(1, 2, 3, 4)
                .log()
                .subscribe(new Subscriber<Integer>() {
                    private Subscription s;
                    int onNextAmount;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.s = s;
                        s.request(2);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                        onNextAmount++;
                        if (onNextAmount % 2 == 0) {
                            s.request(2);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {}

                    @Override
                    public void onComplete() {}
                });

        try {
            Thread.sleep(10*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

小结

从上面的代码看,reactive streams实际上是推拉结合的模式的结合。为什么还要拉呢?

rabbitmq vs kafka

rabbitmq是以推为主的,如果消费者消费能力跟不上,则消息会堆积在内存队列中(必要时可能写磁盘)

kafka则是以拉为主的,生产者推送消息到broker,消费者自己根据自己的能力从broker拉取消息,由于消息是持久化的,因此无需关心生产消费速率的不平衡

backpressure

backpressure这个是为处理生产速率与消费速率不平衡这个问题而衍生出来的,订阅者可以在next方法里头根据自己的情况,使用request方法告诉发布者要取N个数据,发布者则向订阅者推送N个数据。通过request达到订阅者对发布者的反馈。而对于发布者而言,为了实现backpressure,则需要有一个缓存队列来缓冲订阅者没来得及消费的数据。涉及到缓冲,就涉及容量是有界还是无界,如果是有界则在缓冲慢的时候,处理策略是怎样等等。

doc

© 著作权归作者所有

go4it
粉丝 91
博文 1223
码字总数 1140450
作品 0
深圳
私信 提问
Reactive Programming 一种技术,各自表述

前言 作为一名 Java 开发人员,尤其是 Java 服务端工程师,对于 Reactive Programming 的概念似乎相对陌生。随着 Java 9 以及 Spring Framework 5 的相继发布,Reactive 技术逐渐开始被广大从...

小马哥mercyblitz
2018/07/23
0
0
《Akka应用模式:分布式应用程序设计实践指南》读书笔记2

Akka简介   Akka是什么:“Akka是在JVM上构建高并发、分布式、弹性消息驱动应用的开源工具包”。弹性意味着要积极响应失败情况,从失败中恢复的能力。   其实Akka的定义很符合响应式领域...

gabrywu
2018/06/05
0
0
设计模式笔记(三)——建造者模式

需求矛盾 l 同一个厨师,可能每次炒的菜味道不一样! l 但全国各地的肯德基,味道都几乎一个味??? 思考: 1) 为什么我每次吃到的菜,味道都不一样?到底菜被什么控制了? 2) 为什么肯德基...

LinkedBear
2018/03/28
41
0
Spring Reactor介绍

Reactor - A Foundation for Reactive FastData Appli该文简单介绍了Spring reactor 1.0的基本特性。 目前reactor是作为Spring.io核心包下面项目。 Reactor 是一个基础性库包 –定位在用户级...

大糊涂
2015/06/20
635
0
【设计模式笔记】(十六)- 代理模式

一、简述 代理模式(Proxy Pattern),为其他对象提供一个代理,并由代理对象控制原有对象的引用;也称为委托模式。 其实代理模式无论是在日常开发还是设计模式中,基本随处可见,中介者模式中...

MrTrying
2018/06/24
0
0

没有更多内容

加载失败,请刷新页面

加载更多

如何为MVC-3转换为4应用程序添加对System.Web.Optimization的引用

我正在最近从MVC 3转换为MVC 4 beta的项目中尝试使用新的捆绑功能。 它需要global.asax中的一行代码, BundleTable.Bundles.RegisterTemplateBundles(); ,这需要using System.Web.Optimiza...

技术盛宴
今天
79
0
Kettle自定义jar包供javascript使用

我们都知道 Kettle 是用 Java 语言开发,并且可以在 JavaScript 里面直接调用 java 类方法。所以有些时候,我们可以自定义一些方法,来供 JavaScript 使用。 本篇文章有参考自:https://www...

CREATE_17
昨天
114
0
处理CSV文件中的逗号

我正在寻找有关如何处理正在创建的csv文件的建议,然后由我们的客户上传,并且该值可能带有逗号(例如公司名称)。 我们正在研究的一些想法是:带引号的标识符(值“,”值“,”等)或使用|...

javail
昨天
79
0
计算一个数的数位之和

计算一个数的数位之和 例如:128 :1+2+8 = 11 public int numSum(int num) { int sum = 0; do { sum += num % 10; } while ((num = num / 10) > 0); return sum;......

SongAlone
昨天
128
0
为什么图片反复压缩后普遍会变绿,而不是其他颜色?

作者:Lion Yang 链接:https://www.zhihu.com/question/29355920/answer/119088684 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 业余版概要:安卓的...

shzwork
昨天
85
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部