文档章节

RxJava 源码分析系列(一) - Observable的基本分析

亭子happy
 亭子happy
发布于 01/18 10:43
字数 3582
阅读 16
收藏 0

楼主最近在找实习工作,由于简历上说了解RxJava,所以在面试的时候应该会问到RxJava的知识,于是楼主结合RxJava的源码,对RxJava的工作原理进行初步的了解。也只敢说是初步了解,因为自己也是第一次看RxJava的源码,理解的程度肯定不是很深。还是那样,如果有错误之处,希望各位指正!
  本文参考:

  1.除非特殊说明,源码来自:2.2.0版本

  2.RxJava从源码到应用 移动端开发效率秒提速

1.概述

  楼主打算将RxJava的源码分析写成一个系列文章,所以这个是这个系列的第一篇文章,在概述里面还是对RxJava是什么简单的介绍一下,本系列文章不会对RxJava的基本用法进行展开,如果有老哥对RxJava的基本使用掌握的不是很好的话,推荐这个系列的文章:给初学者的RxJava2.0教程(一)
  简单的说一下RxJava,RxJava是基于观察者模式的一个框架,在RxJava中有两个角色,一个Observable,通常被称为被观察者,一个是Observer,通常被称为观察者。总体的架构是,由Observable来处理任务或者发送事件,然后在Observer里面来接受到Observable发送过来的信息。
  RxJava有很多的优势,比如线程调度,在Android里面,耗时操作必须放在子线程中,但是同时还需要主线程来更细UI,所以线程调度就显得尤为重要。当然RxJava还有很多重要的操作符,使得我们的开发变得非常的方便。本系列文章不会对每个操作符的基本使用展开,而是对一些比较常用的操作源码分析,所说的常用,也是指楼主用到的!!毕竟是菜鸡,肯定有很多的东西都不太懂。

2.基本元素

  想要对RxJava的基本原理有一个更好的了解,必须对它的基本有一个大概的了解。我们先通过一个简单的案例,来对RxJava的基本元素进行提取。

    Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {

      }
    }).subscribe(new Observer<String>() {
      @Override
      public void onSubscribe(Disposable d) {

      }

      @Override
      public void onNext(String s) {

      }

      @Override
      public void onError(Throwable e) {

      }

      @Override
      public void onComplete() {

      }
    });

  在这个简单的案例当中,我们可以提取的元素有:ObservableObservableOnSubscribeObservableEmitter,Observer
  元素还是挺少的,我们现在对每个元素的类结构来进行简单的分析一下。

(1).Observable

public abstract class Observable<T> implements ObservableSource<T> {
}

  我们发现Observable本身是一个抽象类,并且实现了ObservableSource接口,在来看看ObservableSource接口里面有什么。

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

  ObservableSource接口里面只有一个subscribe方法,也就是说,RxJava将注册观察者这部分的功能提取成一个接口,从而可以看出来,面向接口编程是多么的重要😂😂。。。
  再分别来看看我们上面案例中使用的两个方法--createsubscribe

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // 先省略代码部分,待会详细的分析。 
    }

  啊,吓我一跳,我以为create方法的参数又是一个接口类型,还好是ObservableOnSubscribe类型,也是上面提取出来的元素其中之一,关于这个类,待会会详细的分析。

    public final void subscribe(Observer<? super T> observer) {
      //...
    }

  这个方法就更加的简单了,就是传递了一个Observer接口的对象。不过需要注意的是这个方法有很多的重载,其中以Consumer类型的操作最为多,不过这个也没什么,最后还是Consumer转换成为了Observer,这个就涉及到Observer接口的一个实现类--LambdaObserver。不要害怕,待会都会一一的讲解的。

(2).Observer

  说了被观察者,我们先来看看观察者--Observer

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}

  哎呀呀,更加的简单了, Observer只是简单的接口,不过我们需要注意的是这个接口定义的4个方法,这里不讲解四个方法的作用,毕竟我们这里将Observable的基本原理🙄🙄。

(3).ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

  一如既往的接口,subscribe方法里面就是具体做事情的地方,这个相信大佬们应该都知道,我这里就班门弄斧的提醒一下😂😂。

(4).ObservableEmitter

public interface ObservableEmitter<T> extends Emitter<T> {
    void setDisposable(@Nullable Disposable d);
    void setCancellable(@Nullable Cancellable c);
    boolean isDisposed();
    ObservableEmitter<T> serialize();
    boolean tryOnError(@NonNull Throwable t);
}

  ObservableEmitter也是一个接口,同时继承了Emitter接口,我们来看看Emitter接口的定义

public interface Emitter<T> {
    void onNext(@NonNull T value);
    void onError(@NonNull Throwable error);
    void onComplete();
}

  作为一个发射信息器,Emitter里面定义了很多关于发送消息给Observer的方法,EmitteronNext对应着ObserveronNext方法,其他的方法也是类似的。

3.Observable的工作原理

(1).create方法

  我们对相关部分的基本元素有了一个基本的了解,现在我们来对整个流程的工作原理进行分析。首先我们create方法入手

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

  create方法没有我们想象中的那么难,就只有两行代码,还有一行用来check的😂😂。对于ObservableCreate类这里先不进行分析,我们来看看 RxJavaPluginsonAssembly方法。

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

  这里提醒一下,onAssembly方法的参数类型是Observable类型,也就是说ObservableCreate本身就是一个Observable。好了,扯了题外话,来看看onAssembly方法具体是干嘛的。
  整个方法的执行过程比较简单,如果onObservableAssembly为null,直接就返回了source,也就是说返回了ObservableCreate本身。而我们在整个Observable的源码中发现,onObservableAssembly初始值本身为null。

    public static void reset() {
        //······
        setOnObservableAssembly(null);
        //······
    }

  为什么需要这样子绕圈子的做呢?这里就是做了钩子,以便于以后的扩展。
  所以Observablecreate方法就是返回了一个ObservableCreate对象,不过需要注意的是ObservableCreate包裹了一个ObservableOnSubscribe对象,也就是我们在create方法里面new的那个ObservableOnSubscribe对象。
  我们先来不急着去理解ObservableCreate是什么,还是来看看subscribe方法为我们做了什么。

(2). subscribe方法

  当我们通过Observable的create方法来获取一个Observable对象时,通常还会调用Observable的subscribe方法来注册一个观察者。现在我们来看看subscribe方法的实现。

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

  整个过程也不是想象中的那么神秘,除去check相关的方法不看,归根结底就是两行代码,先是通过RxJavaPluginsonSubscribe方法来获取Observer对象,具体操作这里就不说了,肯定跟RxJavaPluginsonAssembly方法差不多,最后返回的是observer本身,最后调用了subscribeActual方法。这个subscribeActual方法是干嘛的?

    protected abstract void subscribeActual(Observer<? super T> observer);

  卧了个槽?抽象方法!那我怎么知道调用的是哪个类的subscribeActual方法?不急哈,记得我们之前在create方法返回的Observable对象是哪个类的对象吗?想起来了吧,是ObservableCreate

(3). ObservableCreate

  先来看看ObservableCreate类结构。

public final class ObservableCreate<T> extends Observable<T> {
}

  我们发现,ObservableCreate继承了Observable,其实在分析create方法时,我也说过哟。
  在ObservableCreate类中,只有一个ObservableOnSubscribe类型的成员变量,这个成员变量就是我们在create方法里面new的ObservableOnSubscribe对象
  我们再来看看ObservableCreatesubscribeActual方法的实现

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

  在subscribeActual方法里面,先是对Observer对象进行一次包装,将它包装在CreateEmitter类中。然后我们会发现两个比较眼熟的方法onSubscribe方法和subscribe方法。其中onSubscribe方法在Observer里面看到过,而这里恰好是通过Observer对象来调用的,没错,这个的observer就是在subscribe方法里面new的对象。可是我们记得onSubscribe方法的参数类型是Disposable,而这里是一个CreateEmitter。我们来看看CreateEmitter的类结构:

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    //······
}

  没错,CreateEmitter实现了Disposable接口,所以CreateEmitter本身可以充当Disposable的角色。
  调用了ObserveronSubscribe方法之后,然后就会调用ObservableOnSubscribesubscribe方法。
  到这里,我们应该彻底的明白了整个Observable的工作流程。我们通过create方法创建一个ObservableCreate方法,然后调用了subscribe方法来注册了一个观察者,在subscribe方法里面又调用了subscribeActual方法,在subscribeActual方法里面先是调用了ObserveronSubscribe方法,然后调用了
ObservableOnSubscribesubscribe方法,在ObservableOnSubscribesubscribe方法当中,具体的做的事有两件:1.做我们自己的事情,比如从服务器上获取数据之类;2.将发送信息到Observer去。
  理解了整个流程的工作原理,我们现在来看看CreateEmitter是怎么信息发给Observer的。

4. CreateEmitter的工作原理

  我们知道,我们在ObservableOnSubscribesubscribe方法里面使用ObservableEmitter来发射信息到Observer。现在我们来看看整个CreateEmitter的工作原理,不过,我们还是先来看看这个类的结构,虽然上面已经看了,但是担心大佬们忘了:

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    //······
}

  在上面已经说了CreateEmitter实现了Disposable接口,可以作为Disposable对象来操作,在接下来,我们将重点介绍Disposable是怎么控制Observer对信息的接收,同时还会介绍CreateEmitter作为ObservableEmitter接口的那部分功能。
  之前在分析基本元素时,已经说了ObservableEmitter这个接口,它实现了Emitter接口。在Emitter接口里面有三个方法用来发送信息给Observer,分别是:onNextonErroronComplete。而CreateEmitter类则是具体的实现了这三个方法,我们来看看。

        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

  代码是非常的简单,直接调用了ObserveronNext方法,也没用什么高逼格的东西😂😂。其余两个方法也是如此。只不过是,在调用onNext方法时做了一个isDisposed的判断。
  所以感觉Disposable才是这个类的核心。我们来看看isDisposed方法:

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

  在isDisposed方法里面调用了DisposableHelperisDisposed方法。不过这里需要注意的是这里传递过去的是get方法的返回值,这个返回值什么意思?
  回到CreateEmitter的类结构,发现它继承了AtomicReference类,所以get方法返回的是一个Disposable对象。
  同时,我们发现CreateEmitterdispose方法也是通过DisposableHelper类进行进行操作的,看看要理解Disposable的功能,必须了解DisposableHelper是怎么操作的。

5.DisposableHelper

  从感官上来说,一个发射信息器是否dispose,直接设置一个boolean类型的flag就OK了,为什么搞得这么复杂,又是AtomicReference,又是DisposableHelper。这一切,我们从DisposableHelper来寻找答案。
  首先我们还是来看看DisposableHelper的结构:

public enum DisposableHelper implements Disposable {
    DISPOSED
    ;
}

  DisposableHelper本身是一个enum类型,同时实现了Disposable接口。这里使用enum主要是为了做一个DISPOSED的单例。然后在通过isDisposed方法来判断是否dispose,可以直接与DISPOSED比较。

    public static boolean isDisposed(Disposable d) {
        return d == DISPOSED;
    }

  既然判断是否dispose是直接与DISPOSED比较,那么如果dispose的话,应该是将AtomicReference里面的值设置为DISPOSED吧?我们来看一下dispose方法:

    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

  果然,跟我们猜测一样的,AtomicReference里面的值设置为DISPOSED。只是,这里为了线程安全,做了很多的判断操作。
  从这里我们可以得到,为什么需要设置DisposableHelper来控制dispose的状态,那是因为线程安全,如果直接设置一个flag,在有些情况下,可能存在线程不安全的风险。同时为了代码的优雅,如果这部分的逻辑写在CreateEmitter里面,会不会显得冗杂呢?

6.总结

  写到这里,我感觉也差不多了。这里对着部分的知识做一个总结。
  1.在整个流程中,基本有Observable,ObservableOnSubscribe,ObservableEmitter,Observer,如果想要对整个过程有一个大概的理解,必须对这几个元素有基本的认识。
  2.ObserveronNext之类方法的触发时机,实际上是Observablesubscribe方法,因为subscribe方法调用了ObservablesubscribeActual方法,而在subscribeActual方法里面做了两部分的操作:1.直接调用了ObserveronSubscribe方法;2.使用ObservableEmitterObserver包裹起来,所以我们在ObservableOnSubscribesubscribe方法用ObservableEmitter来发射信息,相当于调用了Observer的相关方法。
  3.在ObservableEmitteronNext之类方法里面,存在一种类似AOP的代码,因为在调用Observer的相关方法,做了一些其他的操作。



作者:琼珶和予
链接:https://www.jianshu.com/p/f17821d2cf78
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

本文转载自:https://www.jianshu.com/p/f17821d2cf78

共有 人打赏支持
亭子happy
粉丝 118
博文 233
码字总数 46492
作品 0
海淀
程序员
私信 提问
Android技能树 - Rxjava源码(1) 之 初步结构

前言 以前写相关的RxJava的文章比较少,也就下面二篇: 目需求讨论 — 手把手带你写RxPermission RxJava小考题 -- Rxjava源码分析(一) 用了这么久的RxJava,所以想要做个总结,所以打算也写一...

青蛙要fly
02/11
0
0
Rxjava 2.x 源码系列 - 线程切换 (上)

Rxjava 2.x 源码系列 - 基础框架分析 Rxjava 2.x 源码系列 - 线程切换 (上) Rxjava 2.x 源码系列 - 线程切换 (下) 前言 在上一篇博客 Rxjava 源码系列 - 基础框架分析,我们分析了 Rxja...

xujun9411
2018/06/06
0
0
Rxjava 2.x 源码系列 - 变换操作符 Map(上)

Rxjava 2.x 源码系列 - 基础框架分析 Rxjava 2.x 源码系列 - 线程切换 (上) Rxjava 2.x 源码系列 - 线程切换 (下) Rxjava 2.x 源码系列 - 变换操作符 Map(上) 前言 在前几篇博客中,我...

xujun9411
2018/06/26
0
0
Rxjava 2.x 源码系列 - 线程切换 (下)

Rxjava 2.x 源码系列 - 基础框架分析 Rxjava 2.x 源码系列 - 线程切换 (上) Rxjava 2.x 源码系列 - 线程切换 (下) 前言 在上一篇博客 Rxjava 2.x 源码系列 - 线程切换 (上) 我们讲解到...

xujun9411
2018/06/11
0
0
Rxjava 源码系列 - 基础框架分析

Rxjava 源码系列 - 基础框架分析 前言 Rxjava RxAndroid 本篇博客讲解的 Rxjava 的原理基于版本 2.1.4,RxAndroid 的原理的版本基于 2.0.2 。 基本框架 Rxjava 有四个基本的概念 Observable...

xujun9411
2018/05/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

js中原型模式

原型模式就是用克隆对象来创建对象的,在es6中提供了Object.create方法来克隆对象 在不支持该方法的浏览器中,可以使用以下代码: Object.create = Object.create || function (obj) { ...

莫西摩西
24分钟前
1
0
ORACLE插入insert带where条件

ORACLE插入带条件 insert into THIRD_PAYEE_ACCOUNT_DETAIL t (id, collection_mode, t.store_code) select Third_Payee_Account_Detail_Seq.Nextval,'006','A05K' from dual where not......

Cobbage
34分钟前
1
0
分享几张Java架构大牛总结的架构知识脑图

前言 系统架构师是一个既需要掌控整体又需要洞悉局部瓶颈并依据具体的业务场景给出解决方案的团队领导型人物。一个架构师得需要足够的想像力,能把各种目标需求进行不同维度的扩展,为目标客户...

Java-飞鱼
41分钟前
12
0
2019BATJ面试题详解:MyBatis+MySQL+Spring+Redis+多线程

这里为大家分享一些面试的一手资料,供大家迎接接下来的金三银四跳槽季 Spring Spring 概述 什么是spring? 使用Spring框架的好处是什么? Spring由哪些模块组成? 解释AOP模块 Spring配置文件...

别打我会飞
44分钟前
3
0
首场百度大脑开放日来袭 | 全新开放24项AI技术

活动当天,百度AI技术生态部总经理喻友平,就百度大脑平台与生态进行了全面的详解,同时展示了百度大脑开放平台Q1核心升级内容,包括语音技术、视觉技术、自然语言处理、知识图谱等通用AI能力...

PaddleWeekly
45分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部