文档章节

Rxjava2 源码解析 (五)

街角的小丑
 街角的小丑
发布于 2017/04/26 18:20
字数 967
阅读 71
收藏 0

前言

    本节我们将分析的是Flowable的实现,关于Flowable实际上和Observable非常相似,拥有几乎相同的操作符。不同点在于Flowable实现了背压,而Observable没有实现。实际上Flowable是对于Reactive Streams的一个实现。关于Reactive Streams,可以参考我的这篇博客

Flowable

    首先还是和observable一样,先来看just的类图

    虽然使用的类可能不同,但是我们发现,和Observable的继承机构基本相同。这是有历史原因的,Rxjava第一版同样是参照Reactive Streams项目做的,只是没有继承。第二版改版的目的很大程度上就是为了兼容Reactive Streams的其他实现者。

    首先来看just方法

public static <T> Flowable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "item is null");
        return RxJavaPlugins.onAssembly(new FlowableJust<T>(item));
    }

    内容非常简答,封装一个FlowableJust返回。

public final class FlowableJust<T> extends Flowable<T> implements ScalarCallable<T> {
    private final T value;
    public FlowableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Subscriber<? super T> s) {
        s.onSubscribe(new ScalarSubscription<T>(s, value));
    }

    @Override
    public T call() {
        return value;
    }
}

    先不管原因,总之在调用subscribe之后最终会调用到subscribeActual(和observeable设计一样)。

    内容出奇的简单,直接调用了subscriber的onSubscribe方法。传入一个ScalarSubscribtion。Flowable有一个规则:

    只有当subscribtion的request方法被调用以后才开始调用onNext等方法。

    所以ScalarSubscription中的request方法需要被Subscriber调用,才会开始发送数据。(注意subscriber是观察者,而Subscription类似于Disposable)。

@Override
    public void request(long n) {
        if (!SubscriptionHelper.validate(n)) {
            return;
        }
        if (compareAndSet(NO_REQUEST, REQUESTED)) {
            Subscriber<? super T> s = subscriber;

            s.onNext(value);
            if (get() != CANCELLED) {
                s.onComplete();
            }
        }

    }

    那么内容就很简单了,注意n必须为正数。并且ScalarSubscription会忽略n的值(应为just(T),只会发送一个值!)

    在继续深入之前,我们可以回过头看一下subscribe方法,我们发现它有非常多的重载,首先来看下最基础的版本

public final void subscribe(Subscriber<? super T> s) {
        if (s instanceof FlowableSubscriber) {
            subscribe((FlowableSubscriber<? super T>)s);
        } else {
            ObjectHelper.requireNonNull(s, "s is null");
            subscribe(new StrictSubscriber<T>(s));
        }
    }

    介绍这个主要是为了引FlowableSubscriber和StrictSubscriber。不过代码后,实际上StrictSubscribe继承与FlowableSubscriber,  但是它相比FlowableSubscriber,拥有更严格的规则

§1.3: onNext should not be called concurrently until onSubscribe returns</li>
§2.3: onError or onComplete must not call cancel</li>
§3.9: negative requests should emit an onError(IllegalArgumentException)</li>

PS:这是Reactive Streams的规则。

    其他的subscribe方法不介绍也罢,看一下代码马上就知道作用了。

    以上分析算是一个插曲,我们接着来看一个更能体现backpress的例子,just("1","2")发射多个数据。首先来看,所有just发送多个数据的最终都会调用fromArray方法

public static <T> Flowable<T> fromArray(T... items) {
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        }
        if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new FlowableFromArray<T>(items));
    }

    封装成一个FlowableFromArray对象

@Override
    public void subscribeActual(Subscriber<? super T> s) {
        if (s instanceof ConditionalSubscriber) {
            s.onSubscribe(new ArrayConditionalSubscription<T>(
                    (ConditionalSubscriber<? super T>)s, array));
        } else {
            s.onSubscribe(new ArraySubscription<T>(s, array));
        }
    }

    不管是ArrayConditionalSubscription 或者 ArraySubscription都是继承与BaseArraySubscrition的,来看一下它的request

public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                if (BackpressureHelper.add(this, n) == 0L) {
                    if (n == Long.MAX_VALUE) {
                        fastPath();
                    } else {
                        slowPath(n);
                    }
                }
            }
        }

    简单来说,如果第一次调用(返回值为0)或者之前的数据已经发送完。,如果设置为max_value调用fastpath,否则调用slowpath。这两个方法看一下实例的实现就知道,fastPath是依次发送所有的数据,而slowPath是发送指定n个数据。

    ArrayConditionalSubscription和ArraySubscription的区别体现在slowpath上,如果调用tryOnNext方法返回false,表示前台不接受这个数据,那么该数据相当于没有发送,发送个数的计数不会++。

Create onBackpressureXXX subscribeOn observeOn

    流程一样……因为工作原因,挤下去就不分析了,其中线程调度和observable是一样的。create和onbackpressureXXX只是可以选择当背压发生时的处理方法

    

© 著作权归作者所有

共有 人打赏支持
街角的小丑
粉丝 5
博文 107
码字总数 196869
作品 0
杭州
私信 提问
0章 RxJava2课程目录介绍

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_30379689/article/details/83389064 导言 RxJava在工作中和面试中已经成为各个项目开发的必选框架,掌握这...

Hensen_
2018/10/26
0
0
android沉浸状态栏实现、地图多线路规划、Retrofit+RxJava+Jsoup+Mvp模式资讯类App等源码

Android精选源码 Android各种沉浸式状态栏实现源码(http://www.apkbus.com/thread-598927-1-1.html) Android自定义View实现炫酷的星期日期选择控件源码(http://www.apkbus.com/thread-59893...

逆鳞龙
2018/05/08
0
0
深入RxJava2 源码解析(二)

本文作者JasonChen,原文地址: http://chblog.me/2018/12/19/rxjava2%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90(%E4%B8%80)/ 前一篇文章我们讲述到RxJava2 的内部设计模式与原理机制,包括观...

aoho
昨天
0
0
深入RxJava2 源码解析(一)

本文作者JasonChen,原文地址: http://chblog.me/2018/12/19/rxjava2%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90(%E4%B8%80)/ ReactiveX 响应式编程库,这是一个程序库,通过使用可观察的事件...

aoho
01/13
0
0
4章 RxJava基本响应类型

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_30379689/article/details/84875244 CSDN学院课程地址 RxJava2从入门到精通-初级篇:https://edu.csdn.net...

Hensen_
2018/12/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

vue 对对象的属性进行修改时,不能渲染页面 vue.$set()

我在vue里的方法里给一个对象添加某个属性时,我console.log出来的是已经更改的object ,但是页面始终没有变化 原因如下: **受现代 JavaScript 的限制 (而且 Object.observe 也已经被废弃),...

Js_Mei
今天
1
0
开始看《Java学习笔记》

虽然书买了很久,但一直没看。这其中也写过一些Java程序,但都是基于IDE的帮助和对C#的理解来写的,感觉不踏实。 林信良的书写得蛮好的,能够帮助打好基础,看得出作者是比较用心的。 第1章概...

max佩恩
昨天
12
0
Redux 三大原则

1.单一数据源 在传统的MVC架构中,我们可以根据需要创建无数个Model,而Model之间可以互相监听、触发事件甚至循环或嵌套触发事件,这些在Redux中都是不被允许的。 因为在Redux的思想里,一个...

wenxingjun
昨天
8
0
跟我学Spring Cloud(Finchley版)-12-微服务容错三板斧

至此,我们已实现服务发现、负载均衡,同时,使用Feign也实现了良好的远程调用——我们的代码是可读、可维护的。理论上,我们现在已经能构建一个不错的分布式应用了,但微服务之间是通过网络...

周立_ITMuch
昨天
5
0
XML

学习目标  能够说出XML的作用  能够编写XML文档声明  能够编写符合语法的XML  能够通过DTD约束编写XML文档  能够通过Schema约束编写XML文档  能够通过Dom4j解析XML文档 第1章 xm...

stars永恒
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部