文档章节

Rxjava2 源码解析 (五)

街角的小丑
 街角的小丑
发布于 2017/04/26 18:20
字数 967
阅读 58
收藏 0

前言

    本节我们将分析的是Flowable的实现,关于Flowable实际上和Observable非常相似,拥有几乎相同的操作符。不同点在于Flowable实现了背压,而Observable没有实现。实际上Flowable是对于Reactive Streams的一个实现。关于Reactive Streams,可以参考我的这篇博客

Flowable

    首先还是和observable一样,先来看just的类图

    虽然使用的类可能不同,但是我们发现,和Observable的继承机构基本相同。这是有历史原因的,Rxjava第一版同样是参照Reactive Streams项目做的,只是没有继承。第二版改版的目的很大程度上就是为了兼容Reactive Streams的其他实现者。

    首先来看just方法

public static <T> Flowable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "item is null");
        return RxJavaPlugins.onAssembly(new FlowableJust<T>(item));
    }

    内容非常简答,封装一个FlowableJust返回。

public final class FlowableJust<T> extends Flowable<T> implements ScalarCallable<T> {
    private final T value;
    public FlowableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Subscriber<? super T> s) {
        s.onSubscribe(new ScalarSubscription<T>(s, value));
    }

    @Override
    public T call() {
        return value;
    }
}

    先不管原因,总之在调用subscribe之后最终会调用到subscribeActual(和observeable设计一样)。

    内容出奇的简单,直接调用了subscriber的onSubscribe方法。传入一个ScalarSubscribtion。Flowable有一个规则:

    只有当subscribtion的request方法被调用以后才开始调用onNext等方法。

    所以ScalarSubscription中的request方法需要被Subscriber调用,才会开始发送数据。(注意subscriber是观察者,而Subscription类似于Disposable)。

@Override
    public void request(long n) {
        if (!SubscriptionHelper.validate(n)) {
            return;
        }
        if (compareAndSet(NO_REQUEST, REQUESTED)) {
            Subscriber<? super T> s = subscriber;

            s.onNext(value);
            if (get() != CANCELLED) {
                s.onComplete();
            }
        }

    }

    那么内容就很简单了,注意n必须为正数。并且ScalarSubscription会忽略n的值(应为just(T),只会发送一个值!)

    在继续深入之前,我们可以回过头看一下subscribe方法,我们发现它有非常多的重载,首先来看下最基础的版本

public final void subscribe(Subscriber<? super T> s) {
        if (s instanceof FlowableSubscriber) {
            subscribe((FlowableSubscriber<? super T>)s);
        } else {
            ObjectHelper.requireNonNull(s, "s is null");
            subscribe(new StrictSubscriber<T>(s));
        }
    }

    介绍这个主要是为了引FlowableSubscriber和StrictSubscriber。不过代码后,实际上StrictSubscribe继承与FlowableSubscriber,  但是它相比FlowableSubscriber,拥有更严格的规则

§1.3: onNext should not be called concurrently until onSubscribe returns</li>
§2.3: onError or onComplete must not call cancel</li>
§3.9: negative requests should emit an onError(IllegalArgumentException)</li>

PS:这是Reactive Streams的规则。

    其他的subscribe方法不介绍也罢,看一下代码马上就知道作用了。

    以上分析算是一个插曲,我们接着来看一个更能体现backpress的例子,just("1","2")发射多个数据。首先来看,所有just发送多个数据的最终都会调用fromArray方法

public static <T> Flowable<T> fromArray(T... items) {
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        }
        if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new FlowableFromArray<T>(items));
    }

    封装成一个FlowableFromArray对象

@Override
    public void subscribeActual(Subscriber<? super T> s) {
        if (s instanceof ConditionalSubscriber) {
            s.onSubscribe(new ArrayConditionalSubscription<T>(
                    (ConditionalSubscriber<? super T>)s, array));
        } else {
            s.onSubscribe(new ArraySubscription<T>(s, array));
        }
    }

    不管是ArrayConditionalSubscription 或者 ArraySubscription都是继承与BaseArraySubscrition的,来看一下它的request

public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                if (BackpressureHelper.add(this, n) == 0L) {
                    if (n == Long.MAX_VALUE) {
                        fastPath();
                    } else {
                        slowPath(n);
                    }
                }
            }
        }

    简单来说,如果第一次调用(返回值为0)或者之前的数据已经发送完。,如果设置为max_value调用fastpath,否则调用slowpath。这两个方法看一下实例的实现就知道,fastPath是依次发送所有的数据,而slowPath是发送指定n个数据。

    ArrayConditionalSubscription和ArraySubscription的区别体现在slowpath上,如果调用tryOnNext方法返回false,表示前台不接受这个数据,那么该数据相当于没有发送,发送个数的计数不会++。

Create onBackpressureXXX subscribeOn observeOn

    流程一样……因为工作原因,挤下去就不分析了,其中线程调度和observable是一样的。create和onbackpressureXXX只是可以选择当背压发生时的处理方法

    

© 著作权归作者所有

共有 人打赏支持
街角的小丑
粉丝 1
博文 92
码字总数 149517
作品 0
杭州
android沉浸状态栏实现、地图多线路规划、Retrofit+RxJava+Jsoup+Mvp模式资讯类App等源码

Android精选源码 Android各种沉浸式状态栏实现源码(http://www.apkbus.com/thread-598927-1-1.html) Android自定义View实现炫酷的星期日期选择控件源码(http://www.apkbus.com/thread-59893...

逆鳞龙
05/08
0
0
基于RxJava2实现的简单图片爬虫

今年十月份以来,跟朋友尝试导入一些图片到tensorflow来生成模型,这就需要大量的图片。刚开始我只写了一个简单的HttpClient程序来抓取图片,后来为了通用性索性写一个简单的图片爬虫程序。它...

fengzhizi715
2017/10/28
0
0
基于RxJava2实现的简单图片爬虫

今年十月份以来,跟朋友尝试导入一些图片到tensorflow来生成模型,这就需要大量的图片。刚开始我只写了一个简单的HttpClient程序来抓取图片,后来为了通用性索性写一个简单的图片爬虫程序。它...

Tony沈哲
2017/10/28
0
0
Vert.x 3.5.0 发布,基于 JVM 的 Node 替代者

Vert.x 3.5.0 正式版已发布。Vert.x 是一个用于下一代异步、可伸缩、并发应用的框架,旨在为JVM提供一个Node.js的替代方案。开发者可以通过它使用JavaScript、Ruby、Groovy、Java、甚至是混合...

王练
2017/10/21
1K
4
Android实战——RxJava2+Retrofit+RxBinding解锁各种新姿势

RxJava2+Retrofit+RxBinding解锁各种新姿势 本文已授权微信公众号:鸿洋(hongyangAndroid)原创首发。 本篇文章内容包含以下内容 前言 RxJava2的基本介绍 RxJava2观察者模式的介绍 RxJava2观...

qq_30379689
2017/04/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

49.Nginx防盗链 访问控制 解析php相关 代理服务器

12.13 Nginx防盗链 12.14 Nginx访问控制 12.15 Nginx解析php相关配置(502的问题) 12.16 Nginx代理 扩展 502问题汇总 http://ask.apelearn.com/question/9109 location优先级 http://blog....

王鑫linux
55分钟前
0
0
Nginx防盗链、访问控制、解析php相关配置、Nginx代理

一、Nginx防盗链 1. 编辑虚拟主机配置文件 vim /usr/local/nginx/conf/vhost/test.com.conf 2. 在配置文件中添加如下的内容 { expires 7d; valid_referers none blocked server_names *.tes......

芬野de博客
今天
0
0
spring EL 和资源调用

资源调用 import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.PropertySource;import org.springframework.core.io.Resource;......

Canaan_
今天
1
0
memcached命令行、memcached数据导出和导入

一、memcached命令行 yum装telnet yum install telent 进入memcached telnet 127.0.0.1 11211 命令最后的2表示,两位字节,30表示过期时间(秒) 查看key1 get key1 删除:ctrl+删除键 二、m...

Zhouliang6
今天
0
0
Linux定时备份MySQL数据库

做项目有时候要备份数据库,手动备份太麻烦,所以找了一下定时备份数据库的方法 Linux里有一个 crontab 命令被用来提交和管理用户的需要周期性执行的任务,就像Windows里的定时任务一样,用这...

月夜中徘徊
今天
1
1

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部