文档章节

Rxjava2 源码解析 (五)

街角的小丑
 街角的小丑
发布于 2017/04/26 18:20
字数 967
阅读 47
收藏 0
点赞 0
评论 0

前言

    本节我们将分析的是Flowable的实现,关于Flowable实际上和Observable非常相似,拥有几乎相同的操作符。不同点在于Flowable实现了背压,而Observable没有实现。实际上Flowable是对于Reactive Streams的一个实现。关于Reactive Streams,可以参考我的这篇博客

Flowable

    首先还是和observable一样,先来看just的类图

    虽然使用的类可能不同,但是我们发现,和Observable的继承机构基本相同。这是有历史原因的,Rxjava第一版同样是参照Reactive Streams项目做的,只是没有继承。第二版改版的目的很大程度上就是为了兼容Reactive Streams的其他实现者。

    首先来看just方法

public static <T> Flowable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "item is null");
        return RxJavaPlugins.onAssembly(new FlowableJust<T>(item));
    }

    内容非常简答,封装一个FlowableJust返回。

public final class FlowableJust<T> extends Flowable<T> implements ScalarCallable<T> {
    private final T value;
    public FlowableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Subscriber<? super T> s) {
        s.onSubscribe(new ScalarSubscription<T>(s, value));
    }

    @Override
    public T call() {
        return value;
    }
}

    先不管原因,总之在调用subscribe之后最终会调用到subscribeActual(和observeable设计一样)。

    内容出奇的简单,直接调用了subscriber的onSubscribe方法。传入一个ScalarSubscribtion。Flowable有一个规则:

    只有当subscribtion的request方法被调用以后才开始调用onNext等方法。

    所以ScalarSubscription中的request方法需要被Subscriber调用,才会开始发送数据。(注意subscriber是观察者,而Subscription类似于Disposable)。

@Override
    public void request(long n) {
        if (!SubscriptionHelper.validate(n)) {
            return;
        }
        if (compareAndSet(NO_REQUEST, REQUESTED)) {
            Subscriber<? super T> s = subscriber;

            s.onNext(value);
            if (get() != CANCELLED) {
                s.onComplete();
            }
        }

    }

    那么内容就很简单了,注意n必须为正数。并且ScalarSubscription会忽略n的值(应为just(T),只会发送一个值!)

    在继续深入之前,我们可以回过头看一下subscribe方法,我们发现它有非常多的重载,首先来看下最基础的版本

public final void subscribe(Subscriber<? super T> s) {
        if (s instanceof FlowableSubscriber) {
            subscribe((FlowableSubscriber<? super T>)s);
        } else {
            ObjectHelper.requireNonNull(s, "s is null");
            subscribe(new StrictSubscriber<T>(s));
        }
    }

    介绍这个主要是为了引FlowableSubscriber和StrictSubscriber。不过代码后,实际上StrictSubscribe继承与FlowableSubscriber,  但是它相比FlowableSubscriber,拥有更严格的规则

§1.3: onNext should not be called concurrently until onSubscribe returns</li>
§2.3: onError or onComplete must not call cancel</li>
§3.9: negative requests should emit an onError(IllegalArgumentException)</li>

PS:这是Reactive Streams的规则。

    其他的subscribe方法不介绍也罢,看一下代码马上就知道作用了。

    以上分析算是一个插曲,我们接着来看一个更能体现backpress的例子,just("1","2")发射多个数据。首先来看,所有just发送多个数据的最终都会调用fromArray方法

public static <T> Flowable<T> fromArray(T... items) {
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        }
        if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new FlowableFromArray<T>(items));
    }

    封装成一个FlowableFromArray对象

@Override
    public void subscribeActual(Subscriber<? super T> s) {
        if (s instanceof ConditionalSubscriber) {
            s.onSubscribe(new ArrayConditionalSubscription<T>(
                    (ConditionalSubscriber<? super T>)s, array));
        } else {
            s.onSubscribe(new ArraySubscription<T>(s, array));
        }
    }

    不管是ArrayConditionalSubscription 或者 ArraySubscription都是继承与BaseArraySubscrition的,来看一下它的request

public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                if (BackpressureHelper.add(this, n) == 0L) {
                    if (n == Long.MAX_VALUE) {
                        fastPath();
                    } else {
                        slowPath(n);
                    }
                }
            }
        }

    简单来说,如果第一次调用(返回值为0)或者之前的数据已经发送完。,如果设置为max_value调用fastpath,否则调用slowpath。这两个方法看一下实例的实现就知道,fastPath是依次发送所有的数据,而slowPath是发送指定n个数据。

    ArrayConditionalSubscription和ArraySubscription的区别体现在slowpath上,如果调用tryOnNext方法返回false,表示前台不接受这个数据,那么该数据相当于没有发送,发送个数的计数不会++。

Create onBackpressureXXX subscribeOn observeOn

    流程一样……因为工作原因,挤下去就不分析了,其中线程调度和observable是一样的。create和onbackpressureXXX只是可以选择当背压发生时的处理方法

    

© 著作权归作者所有

共有 人打赏支持
街角的小丑
粉丝 1
博文 86
码字总数 136018
作品 0
杭州
android沉浸状态栏实现、地图多线路规划、Retrofit+RxJava+Jsoup+Mvp模式资讯类App等源码

Android精选源码 Android各种沉浸式状态栏实现源码(http://www.apkbus.com/thread-598927-1-1.html) Android自定义View实现炫酷的星期日期选择控件源码(http://www.apkbus.com/thread-59893...

逆鳞龙 ⋅ 05/08 ⋅ 0

基于RxJava2实现的简单图片爬虫

今年十月份以来,跟朋友尝试导入一些图片到tensorflow来生成模型,这就需要大量的图片。刚开始我只写了一个简单的HttpClient程序来抓取图片,后来为了通用性索性写一个简单的图片爬虫程序。它...

fengzhizi715 ⋅ 2017/10/28 ⋅ 0

基于RxJava2实现的简单图片爬虫

今年十月份以来,跟朋友尝试导入一些图片到tensorflow来生成模型,这就需要大量的图片。刚开始我只写了一个简单的HttpClient程序来抓取图片,后来为了通用性索性写一个简单的图片爬虫程序。它...

Tony沈哲 ⋅ 2017/10/28 ⋅ 0

Vert.x 3.5.0 发布,基于 JVM 的 Node 替代者

Vert.x 3.5.0 正式版已发布。Vert.x 是一个用于下一代异步、可伸缩、并发应用的框架,旨在为JVM提供一个Node.js的替代方案。开发者可以通过它使用JavaScript、Ruby、Groovy、Java、甚至是混合...

王练 ⋅ 2017/10/21 ⋅ 4

Android实战——RxJava2+Retrofit+RxBinding解锁各种新姿势

RxJava2+Retrofit+RxBinding解锁各种新姿势 本文已授权微信公众号:鸿洋(hongyangAndroid)原创首发。 本篇文章内容包含以下内容 前言 RxJava2的基本介绍 RxJava2观察者模式的介绍 RxJava2观...

qq_30379689 ⋅ 2017/04/03 ⋅ 0

今日力推:高性能、简洁优雅的 Web 框架 / 一个能让微信 Material Design 化的

今日力推:高性能、简洁优雅的 Web 框架 / 一个能让微信 Material Design 化的 干货集中营2017-12-193 阅读 webdesign框架微信 iOS IOS新一代界面开发利器 —— FlexLib (zhenglibao) Andro...

干货集中营 ⋅ 2017/12/19 ⋅ 0

AnnotationProcessor的小总结

以下内容主要讲解EventBus、Retrofit、Dagger2、ButterKnife等流行第三方库都在使用的AnnotationProcessor技术. 一 AnnotationProcessor工作流程 通过解析在源码中使用注解标记的类/变量/方法...

liaowenhao ⋅ 2017/12/30 ⋅ 0

Android实战——RxJava2解锁图片三级缓存框架

RxJava2解锁图片三级缓存框架 本篇文章包括以下内容 前言 图片三级缓存的介绍 框架结构目录的介绍 构建项目整体框架 实现图片三级缓存 演示效果 源码下载 结语 前言 RxJava2作为如今那么主流...

qq_30379689 ⋅ 2017/04/07 ⋅ 0

RxJava之旅(2):RxJava入门基础(1)

0.前言 这可能是我拖延最严重的系列文章了,从第一篇《 RxJava之旅(1):RxJava思想基础》到这一篇的间隔差不多有一年,导致每想到这个系列都惭愧不已,甚至好几次冲动差点删了。事情是这样的...

Chuckiefan ⋅ 2017/08/17 ⋅ 0

一套整合主流HTTP网络、图片加载、MVP(RxJava2+Dagger2)架构的快速高效的开发框架RxEasyAndroid

需要的环境 JDK1.7 SDK AndroidStudio开发工具 特性 整合主流HTTP网络(Retrofit2、OKHTTP3)、图片加载(Glide)、MVP(RxJava2+Dagger2)架构的一套快速高效的开发框架 包含app library 两...

wu928320442 ⋅ 2017/10/23 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

windows profesional 2017 build problem

.net framework .... https://stackoverflow.com/questions/43330915/could-not-load-file-or-assembly-microsoft-build-frameworkvs-2017...

机油战士 ⋅ 19分钟前 ⋅ 0

python3中报错的解决方法(长期更新)

1、ImportError: No module named ‘DjangoUeditor’ 出错原因:安装DjangoUeditor库适用于python2,需要下载适用python3的 下载地址:https://github.com/twz915/DjangoUeditor3 2、python3......

xiaoge2016 ⋅ 24分钟前 ⋅ 0

数据结构与算法之双向链表

一、双向链表 1.双向链表的结点结构 typedef struct DualNode{ ElemType data; struct DualNode *prior; // 前驱结点 struct DualNode *next; // 后继结点}DualNode, *DuL...

aibinxiao ⋅ 44分钟前 ⋅ 0

五大最核心的大数据技术

大数据技术有5个核心部分,数据采集、数据存储、数据清洗、数据挖掘、数据可视化。关于这5个部分,有哪些核心技术?这些技术有哪些潜在价值?看完今天的文章就知道了。 大数据学习群:7165810...

董黎明 ⋅ 45分钟前 ⋅ 0

PhpStorm 头部注释、类注释和函数注释的设置

首先,PhpStorm中文件、类、函数等注释的设置在:setting-》Editor-》FIle and Code Template-》Includes下设置即可,其中方法的默认是这样的: /**${PARAM_DOC}#if (${TYPE_HINT} != "v...

nsns ⋅ 45分钟前 ⋅ 0

spring.net AOP

http://www.springframework.net/doc-latest/reference/html/aop-quickstart.html https://www.cnblogs.com/wujy/archive/2013/04/06/3003120.html...

whoisliang ⋅ 50分钟前 ⋅ 0

【HAVENT原创】创建 Dockerfile 生成新的镜像,并发布到 DockerHub

注意:Win7 与 Win10 的版本存在差异,Win7 版本使用 Docker Quickstart Terminal 进入控制台,Win10下面直接用管理员权限打开控制台或者 PowerShell 即可;另外 Win7 下面只能访问 C盘,/ap...

HAVENT ⋅ 50分钟前 ⋅ 0

pom.xml出现web.xml is missing ...解决方案

提示信息应该能看懂。也就是缺少了web.xml文件,<failOnMissingWebXml>被设置成true了。 搜索了一下,Stack Overflow上的答案解决了问题,分享一下。 目前被顶次数最多的回答原文如下: This...

源哥L ⋅ 51分钟前 ⋅ 0

js时间戳与日期格式之间相互转换

1. 将时间戳转换成日期格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 // 简单的一句代码 var date = new Date(时间戳); //获取一个时间对象 /** 1. 下面是获取时间日期的方法,需要什么样的格式自己...

Jack088 ⋅ 56分钟前 ⋅ 0

web添加log4j

添加xml配置log4j.properties # Global logging configuration---root日志设置#log4j.rootLogger=info,dailyRollingFile,stdoutlog4j.rootLogger=debug,stdout,dailyRollingFile---......

黄柳淞 ⋅ 57分钟前 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部