文档章节

对RxJava中.repeatWhen()和.retryWhen()操作符的思考

hejunbinlan
 hejunbinlan
发布于 2016/05/03 17:04
字数 1810
阅读 34
收藏 1

第一次见到 .repeatWhen() 和 .retryWhen() 这两个操作符的时候就非常困惑了。不得不说,它们绝对是“最令人困惑弹珠图”的有力角逐者。

然而它们都是非常有用的操作符:允许你有条件的重新订阅已经结束的Observable 。我最近研究了它们的工作原理,现在我希望尝试着去解释它们(因为,我也是耗费了一些精力才参透它们)。

Repeat与Retry的对比

首先,来了解一下 .repeat() 和 .retry() 之间最直观的区别是什么?这个问题并不难:区别就在于什么样的终止事件会触发重订阅。

当 .repeat() 接收到 .onCompleted() 事件后触发重订阅。

当 .retry() 接收到 .onError() 事件后触发重订阅。

然而,这种简单的叙述尚不能令人满意。试想如果你要实现一个延迟数秒的重订阅该如何去做?或者想通过观察错误来决定是否应该重订阅呢?这种情况下就需要.repeatWhen() 和 .retryWhen() 的介入了,因为它们允许你为重试提供自定义逻辑。

Notification Handler

你可以通过一个叫做 notificationHandler 的函数来实现重试逻辑。这是.retryWhen() 的方法签名(译者注:方法签名,指方法名称、参数类型和参数数量等):

retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)

签名很长,甚至不能一口气读完。我发现它很难理解的原因是因为存在一大堆的泛型约定。

简化后,它包括三个部分:

  1. Func1 像个工厂类,用来实现你自己的重试逻辑。
  2. 输入的是一个 Observable<Throwable> 。
  3. 输出的是一个 Observable<?> 。

首先,让我们来看一下最后一部分。被返回的 Observable<?> 所要发送的事件决定了重订阅是否会发生。如果发送的是 onCompleted 或者 onError 事件,将不会触发重订阅。相对的,如果它发送 onNext 事件,则触发重订阅(不管onNext 实际上是什么事件)。这就是为什么使用了通配符作为泛型类型:这仅仅是个通知(next, error或者completed),一个很重要的通知而已。

source每次一调用 onError(Throwable) , Observable<Throwable> 都会被作为输入传入方法中。换句话说就是,它的每一次调用你都需要决定是否需要重订阅。

当订阅发生的时候,工厂 Func1 被调用,从而准备重试逻辑。那样的话,当onError 被调用后,你已经定义的重试逻辑就能够处理它了。

这里有个例子展示了我们应该在哪些场景下订阅 source ,比如,只有在Throwable 是 IOException 的情况下请求重订阅,否则不(重订阅)。

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> errors) { return errors.flatMap(new Func1<Throwable, Observable<?>>() { @Override public Observable<?> call(Throwable error) { // For IOExceptions, we retry if (error instanceof IOException) { return Observable.just(null); } // For anything else, don't retry return Observable.error(error); } }); } })

由于每一个error都被flatmap过,因此我们不能通过直接调用 .onNext(null) 触发重订阅或者 .onError(error) 来避免重订阅。

经验之谈

这里有一些关于 .repeatWhen() 和 .retryWhen() 的要点,我们应该牢记于心。

  • .repeatWhen() 与 .retryWhen() 非常相似,只不过不再响应onCompleted 作为重试条件,而是 onError 。因为 onCompleted 没有类型,所有输入变为 Observable<Void> 。

  • 每一次事件流的订阅 notificationHandler (也就是 Func1 )只会调用一次。这也是讲得通的,因为你有一个可观测的 Observable<Throwable> ,它能够发送任意数量的error。

  • 输入的 Observable 必须作为输出 Observable 的源。你必须对Observable<Throwable> 做出反应,然后基于它发送事件;你不能只返回一个通用泛型流。

换言之就是,你不能做类似的操作:

 .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> errors) { return Observable.just(null);} })

因为它不仅不能奏效,而且还会打断你的链式结构。你应该做的是,而且至少应该做的是,把输入作为结果返回,就像这样:

.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> errors) { return errors; } })

(顺便提一下,这在逻辑上与单纯使用 .retry() 操作符的效果是一样哒)

  • 输入 Observable 只在终止事件发生的时候才会触发(对于 .repeatWhen()来说是 onCompleted ,而对于 .retryWhen() 来说是 onError )。它不会从源中接收到任何 onNext 的通知,所以你不能通过观察发被送的事件来决定重订阅。如果你真的需要这样做,你应该添加像 .takeUntil() 这样的操作符,来拦截事件流。

使用方式

现在,假设你已大概了解了 .repeatWhen() 和 .retryWhen() ,那么你能将一些什么样的精简逻辑放入到 notificationHandler 中呢?

使用.repeatWhen() + .delay()定期轮询数据:

source.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Void> completed) { return completed.delay(5, TimeUnit.SECONDS); } })

直到 notificationHandler 发送 onNext() 才会重订阅到source。因为在发送 onNext() 之前 delay 了一段时间,所以优雅的实现了延迟重订阅,从而避免了不间断的数据轮询。

非此即彼,使用.flatMap() + .timer()实现延迟重订阅:

(译者注:在 RxJava 1.0.0 及其之后的版本,官方已不再提倡使用 .timer() 操作符,因为 .Interval() 具有同样的功能)

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> errors) { return errors.flatMap(new Func1<Throwable, Observable<?>>() { @Override public Observable<?> call(Throwable error) { return Observable.timer(5, TimeUnit.SECONDS); } }); } })

当需要与其他逻辑协同的时候,这种替代方案就变得非常有用了,比如。。。

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> errors) { return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() { @Override public Integer call(Throwable throwable, Integer i) { return i; } }); } })

最后的结果就是每个error都与 rang 中一个输出配对出现,就像这样:

zip(error1, 1) -> onNext(1) <-- Resubscribe zip(error2, 2) -> onNext(2) <-- Resubscribe zip(error3, 3) -> onNext(3) <-- Resubscribe onCompleted() <-- No resubscription

因为当第四次error出现的时候, range(1,3) 中的数字已经耗尽了,所以它隐式调用了 onCompleted() ,从而导致整个 zip 的结束。防止了进一步的重试。

将可变延迟策略与次数限制的重试机制结合起来

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> errors) { return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() { @Override public Integer call(Throwable throwable, Integer i) { return i; } }).flatMap(new Func1<Integer, Observable<? extends Long>>() { @Override public Observable<? extends Long> call(Integer retryCount) { return Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS); } }); } })

在这种用例的比较上,我认为 .flatMap() + .timer() 的组合比单纯使用.delay() 更可取,因为我们可以通过重试次数来修改延迟时间。重试三次,并且每一次的重试时间都是 5 ^ retryCount ,仅仅通过一些操作符的组合就帮助我们实现了指数退避算法(译者注:可参考 二进制指数退避算法 )。

本文转载自:http://www.qingpingshan.com/rjbc/java/49285.html

共有 人打赏支持
hejunbinlan
粉丝 41
博文 596
码字总数 21569
作品 0
浦东
高级程序员
私信 提问
深入理解 RxJava2:前世今生(1)

前言 本系列文章适用于已经了解 RxJava 的读者,深入贯彻其原理,加深对其的认识。如果从未了解过 RxJava 的读者们,建议先熟悉它。 RxJava 0.x RxJava 最早是 Netflix 参照微软的 Rx.Net,在...

任我行
2018/08/12
0
0
RxJava 沉思录(一):你认为 RxJava 真的好用吗

本人两年前第一次接触 RxJava,和大多数初学者一样,看的第一篇 RxJava 入门文章是扔物线写的《给 Android 开发者的 RxJava 详解》,这篇文章流传之广,相信几乎所有学习 RxJava 的开发者都阅...

掘金官方
2018/09/07
0
0
《RxJava 2.x 实战》

我最近写了一本书《RxJava 2.x 实战》。 该书由电子工业出版社出版,目前已经在淘宝、京东、当当、亚马逊等各大电商平台上进行销售或预售。 天猫电子工业出版社旗舰店 https://detail.tmall...

fengzhizi715
2018/04/28
0
0
RxJava从入门到不离不弃(八)——使用场景

RxJava系列的文章已经写了有七篇了,相信读者已经对它比较熟悉了。 介绍了那么多,那么到底RxJava在真实开发中会有哪些地方用到呢?今天和大家介绍一下它的经典使用场景。 RxJava + Retrofi...

Android机动车
2018/11/05
0
0
Android RxJava: 这是一份全面的 操作符 使用汇总 (含详细实例讲解)

前言 ,由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 开发者的欢迎。 如果还不了解RxJava,请看文章:Android:这是一篇 清晰 & 易懂的Rxjava 入门教程 如此受欢迎的原...

Carson_Ho
2018/05/31
0
0

没有更多内容

加载失败,请刷新页面

加载更多

容器服务

简介 容器服务提供高性能可伸缩的容器应用管理服务,支持用 Docker 和 Kubernetes 进行容器化应用的生命周期管理,提供多种应用发布方式和持续交付能力并支持微服务架构。 产品架构 容器服务...

狼王黄师傅
昨天
3
0
高性能应用缓存设计方案

为什么 不管是刻意或者偶尔看其他大神或者大师在讨论高性能架构时,自己都是认真的去看缓存是怎么用呢?认认真真的看完发现缓存这一块他们说的都是一个WebApp或者服务的缓存结构或者缓存实现...

呼呼南风
昨天
12
0
寻找一种易于理解的一致性算法(扩展版)

摘要 Raft 是一种为了管理复制日志的一致性算法。它提供了和 Paxos 算法相同的功能和性能,但是它的算法结构和 Paxos 不同,使得 Raft 算法更加容易理解并且更容易构建实际的系统。为了提升可...

Tiny熊
昨天
2
0
聊聊GarbageCollectionNotificationInfo

序 本文主要研究一下GarbageCollectionNotificationInfo CompositeData java.management/javax/management/openmbean/CompositeData.java public interface CompositeData { public Co......

go4it
昨天
3
0
阿里云ECS的1M带宽理解

本文就给大家科普下阿里云ECS的固定1M带宽的含义。 “下行带宽”和“上行带宽” 为了更好的理解,需要先给大家解释个词“下行带宽”和“上行带宽”: 下行带宽:粗略的解释就是下载数据的最大...

echojson
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部