文档章节

Rxjava2 源码解析 (五)

街角的小丑
 街角的小丑
发布于 2017/04/26 18:20
字数 967
阅读 67
收藏 0

前言

    本节我们将分析的是Flowable的实现,关于Flowable实际上和Observable非常相似,拥有几乎相同的操作符。不同点在于Flowable实现了背压,而Observable没有实现。实际上Flowable是对于Reactive Streams的一个实现。关于Reactive Streams,可以参考我的这篇博客

Flowable

    首先还是和observable一样,先来看just的类图

    虽然使用的类可能不同,但是我们发现,和Observable的继承机构基本相同。这是有历史原因的,Rxjava第一版同样是参照Reactive Streams项目做的,只是没有继承。第二版改版的目的很大程度上就是为了兼容Reactive Streams的其他实现者。

    首先来看just方法

public static <T> Flowable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "item is null");
        return RxJavaPlugins.onAssembly(new FlowableJust<T>(item));
    }

    内容非常简答,封装一个FlowableJust返回。

public final class FlowableJust<T> extends Flowable<T> implements ScalarCallable<T> {
    private final T value;
    public FlowableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Subscriber<? super T> s) {
        s.onSubscribe(new ScalarSubscription<T>(s, value));
    }

    @Override
    public T call() {
        return value;
    }
}

    先不管原因,总之在调用subscribe之后最终会调用到subscribeActual(和observeable设计一样)。

    内容出奇的简单,直接调用了subscriber的onSubscribe方法。传入一个ScalarSubscribtion。Flowable有一个规则:

    只有当subscribtion的request方法被调用以后才开始调用onNext等方法。

    所以ScalarSubscription中的request方法需要被Subscriber调用,才会开始发送数据。(注意subscriber是观察者,而Subscription类似于Disposable)。

@Override
    public void request(long n) {
        if (!SubscriptionHelper.validate(n)) {
            return;
        }
        if (compareAndSet(NO_REQUEST, REQUESTED)) {
            Subscriber<? super T> s = subscriber;

            s.onNext(value);
            if (get() != CANCELLED) {
                s.onComplete();
            }
        }

    }

    那么内容就很简单了,注意n必须为正数。并且ScalarSubscription会忽略n的值(应为just(T),只会发送一个值!)

    在继续深入之前,我们可以回过头看一下subscribe方法,我们发现它有非常多的重载,首先来看下最基础的版本

public final void subscribe(Subscriber<? super T> s) {
        if (s instanceof FlowableSubscriber) {
            subscribe((FlowableSubscriber<? super T>)s);
        } else {
            ObjectHelper.requireNonNull(s, "s is null");
            subscribe(new StrictSubscriber<T>(s));
        }
    }

    介绍这个主要是为了引FlowableSubscriber和StrictSubscriber。不过代码后,实际上StrictSubscribe继承与FlowableSubscriber,  但是它相比FlowableSubscriber,拥有更严格的规则

§1.3: onNext should not be called concurrently until onSubscribe returns</li>
§2.3: onError or onComplete must not call cancel</li>
§3.9: negative requests should emit an onError(IllegalArgumentException)</li>

PS:这是Reactive Streams的规则。

    其他的subscribe方法不介绍也罢,看一下代码马上就知道作用了。

    以上分析算是一个插曲,我们接着来看一个更能体现backpress的例子,just("1","2")发射多个数据。首先来看,所有just发送多个数据的最终都会调用fromArray方法

public static <T> Flowable<T> fromArray(T... items) {
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        }
        if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new FlowableFromArray<T>(items));
    }

    封装成一个FlowableFromArray对象

@Override
    public void subscribeActual(Subscriber<? super T> s) {
        if (s instanceof ConditionalSubscriber) {
            s.onSubscribe(new ArrayConditionalSubscription<T>(
                    (ConditionalSubscriber<? super T>)s, array));
        } else {
            s.onSubscribe(new ArraySubscription<T>(s, array));
        }
    }

    不管是ArrayConditionalSubscription 或者 ArraySubscription都是继承与BaseArraySubscrition的,来看一下它的request

public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                if (BackpressureHelper.add(this, n) == 0L) {
                    if (n == Long.MAX_VALUE) {
                        fastPath();
                    } else {
                        slowPath(n);
                    }
                }
            }
        }

    简单来说,如果第一次调用(返回值为0)或者之前的数据已经发送完。,如果设置为max_value调用fastpath,否则调用slowpath。这两个方法看一下实例的实现就知道,fastPath是依次发送所有的数据,而slowPath是发送指定n个数据。

    ArrayConditionalSubscription和ArraySubscription的区别体现在slowpath上,如果调用tryOnNext方法返回false,表示前台不接受这个数据,那么该数据相当于没有发送,发送个数的计数不会++。

Create onBackpressureXXX subscribeOn observeOn

    流程一样……因为工作原因,挤下去就不分析了,其中线程调度和observable是一样的。create和onbackpressureXXX只是可以选择当背压发生时的处理方法

    

© 著作权归作者所有

共有 人打赏支持
街角的小丑
粉丝 1
博文 93
码字总数 150904
作品 0
杭州
私信 提问
android沉浸状态栏实现、地图多线路规划、Retrofit+RxJava+Jsoup+Mvp模式资讯类App等源码

Android精选源码 Android各种沉浸式状态栏实现源码(http://www.apkbus.com/thread-598927-1-1.html) Android自定义View实现炫酷的星期日期选择控件源码(http://www.apkbus.com/thread-59893...

逆鳞龙
05/08
0
0
基于RxJava2实现的简单图片爬虫

今年十月份以来,跟朋友尝试导入一些图片到tensorflow来生成模型,这就需要大量的图片。刚开始我只写了一个简单的HttpClient程序来抓取图片,后来为了通用性索性写一个简单的图片爬虫程序。它...

fengzhizi715
2017/10/28
0
0
基于RxJava2实现的简单图片爬虫

今年十月份以来,跟朋友尝试导入一些图片到tensorflow来生成模型,这就需要大量的图片。刚开始我只写了一个简单的HttpClient程序来抓取图片,后来为了通用性索性写一个简单的图片爬虫程序。它...

Tony沈哲
2017/10/28
0
0
Vert.x 3.5.0 发布,基于 JVM 的 Node 替代者

Vert.x 3.5.0 正式版已发布。Vert.x 是一个用于下一代异步、可伸缩、并发应用的框架,旨在为JVM提供一个Node.js的替代方案。开发者可以通过它使用JavaScript、Ruby、Groovy、Java、甚至是混合...

王练
2017/10/21
1K
4
Android实战——RxJava2+Retrofit+RxBinding解锁各种新姿势

RxJava2+Retrofit+RxBinding解锁各种新姿势 本文已授权微信公众号:鸿洋(hongyangAndroid)原创首发。 本篇文章内容包含以下内容 前言 RxJava2的基本介绍 RxJava2观察者模式的介绍 RxJava2观...

qq_30379689
2017/04/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

nuc970 uboot nand-boot,kernel, filesystem 烧录位置

一 烧写到Nand Flash **1.1 **相关文件说明 l BSP版本:nuc970bsp-release-20150519.zip l NuWriter版本:2015/04/28-V01,nuvoTon Nu-Writer V1.0 l 烧写文件: u-boot-spl.bin:负责将u-b......

CookieDemo
57分钟前
1
0
python中sort和sorted函数小结

L.sort(cmp=None, key=None, reverse=False) sorted(iterable, cmp=None, key=None, reverse=False) 这样看,sorted函数只比sort函数多一个iterable参数,其余没什么不同,iterable是一个迭代......

上官夏洛特
今天
3
0
thinkphp 常用SQL执行语句总结

第一条:Db::tablera('vr_panomas')->where(['delete_time'=>0,'id'=>['in',$pids]])->field(['id'=>'id','post_thumb'=>'thumb','post_title'=>'title','post_tags'=>'tags','post_price'=>......

koothon
今天
5
0
支付宝返回状态resultStatus意思

上一篇集成支付宝的时候,会有一些支付宝返回的resultStatus,具体意思是: 9000 订单支付成功 8000 正在处理中 4000 订单支付失败 6001 用户中途取消 6002 网络连接出错 还有memo,意思就是...

RainOrz
今天
3
0
electron webview 页面加载事件顺序

1.did-start-loading 页面开始加载 2.load-commit 主页面文档加载 3.page-title-updated title 4.dom-ready 主页面 dom 加载完成 5.load-commit frame文档加载 6.did-frame-finish-load fram......

dubox
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部