文档章节

Project Reactor 之 publishOn 与 subscribeOn

编走编想
 编走编想
发布于 2018/12/31 19:54
字数 1290
阅读 10
收藏 0

一、概述

在 Spring Reactor 项目中,有两个出镜较少的方法:publishOnsubscribeOn。这两个方法的作用是指定执行 Reactive Streaming 的 Scheduler(可理解为线程池)。

为何需要指定执行 Scheduler 呢?一个显而易见的原因是:组成一个反应式流的代码有快有慢,例如 NIO、BIO。如果将这些功能都放在一个线程里执行,快的就会被慢的影响,所以需要相互隔离。这是这两个方法应用的最典型的场景。

二、Scheduler

在介绍 publishOnsubscribeOn 方法之前,需要先介绍 Scheduler 这个概念。在 Reactor 中,Scheduler 用来定义执行调度任务的抽象。可以简单理解为线程池,但其实际作用要更多。先简单介绍 Scheduler 的实现:

  • Schedulers.elastic(): 调度器会动态创建工作线程,线程数无上界,类似于 Execturos.newCachedThreadPool()
  • Schedulers.parallel(): 创建固定线程数的调度器,默认线程数等于 CPU 核心数。

关于 Scheduler 的更多作用留在以后介绍。

三、publishOn 与 subscribeOn

接下来进入正题。先看两个例子(来自 https://github.com/reactor/lite-rx-api-hands-on)

publishOn 的例子

Mono<Void> fluxToBlockingRepository(Flux<User> flux, 
                                    BlockingRepository<User> repository) {
    return flux
                .publishOn(Schedulers.elastic())
                .doOnNext(repository::save)
                .then();
}

subscribeOn 的例子

Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository)  {
    return Flux
                .defer(() -> Flux.fromIterable(repository.findAll()))
                .subscribeOn(Schedulers.elastic());
}

这里的 repository 的类型是 BlockingRepository,指的是会导致线程阻塞的数据库操作的集合,例如 JPA、MyBatis 等基于 JDBC 技术实现的 DAO。

在第一个例子中,在执行了 publishOn(Schedulers.elastic()) 之后,repository::save 就会被 Schedulers.elastic() 定义的线程池所执行。

而在第二个例子中,subscribeOn(Schedulers.elastic()) 的作用类似。它使得 repository.findAll()(也包括 Flux.fromIterable)的执行发生在 Schedulers.elastic() 所定义的线程池中。

从上面的描述看,publishOnsubscribeOn 的作用类似,那两者的区别又是什么?

两者的区别

简单说,两者的区别在于影响范围。publishOn 影响在其之后的 operator 执行的线程池,而 subscribeOn 则会从源头影响整个执行过程。所以,publishOn 的影响范围和它的位置有关,而 subscribeOn 的影响范围则和位置无关。

看个 publishOnsubscribeOn 同时使用的例子

Flux.just("tom")
		.map(s -> {
			System.out.println("[map] Thread name: " + Thread.currentThread().getName());
			return s.concat("@mail.com");
		})
		.publishOn(Schedulers.newElastic("thread-publishOn"))
		.filter(s -> {
			System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
			return s.startsWith("t");
		})
		.subscribeOn(Schedulers.newElastic("thread-subscribeOn"))
		.subscribe(s -> {
			System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
			System.out.println(s);
		});

输出结果如下:

[map] Thread name: thread-subscribeOn-3
[filter] Thread name: thread-publishOn-4
[subscribe] Thread name: thread-publishOn-4
tom@mail.com

从上面的例子可以看出,subscribeOn 定义在 publishOn 之后,但是却从源头开始生效。而在 publishOn 执行之后,线程池变更为 publishOn 所定义的。

实际用途

这里介绍 publishOnsubscribeOn 的一种实际用途,那就是反应式编程和传统的,会导致线程阻塞的编程技术混用的场景。其实开头两个例子已经解释了这个场景。

在第一个 publishOn 的例子中,repository::save 会导致线程阻塞,为了避免造成对其它反应式操作的影响,便使用 publishOn 改变其执行线程。

在第二个 subscribeOn 的例子中,repository.findAll() 会导致线程阻塞。但是其是源头的 publisher,因此不能使用 publishOn 改变其 执行线程。这时就需要使用 subscribeOn,在源头上修改其执行线程。

这样,通过 publishOnsubscribeOn 就在反应式编程中实现了线程池隔离的目的,一定程度上避免了会导致线程阻塞的程序执行影响到反应式编程的程序执行效率。

局限性

使用 publishOnsubscribeOn 只能在一定程度上避免反应式编程代码执行的效率被影响。因为用来隔离的线程池资源终归是有限的,比如当出现数据库资源不足、慢查询等问题时,对应的线程池资源如果被耗尽,还是会使整个反应式编程的执行效率受到影响。

目前,Redis、Mongo、Couchbase 等非关系型数据库均有相应的反应式编程的解决方案,但是关系型数据库却没有理想的方案。一个重要原因是 JDBC 本身就是一个阻塞式的 API,根本不可能让其适应反应式编程。因此需要一个新的方案。目前 Oracle 正在推动 ADBA (Asynchronous Database Access API),使得关系型数据库可以满足异步编程的需要。但是,因为是 Oracle 主导,大家都懂的,所以目前前景还不是很明朗。另外一个技术方案是 Spring 推动的 R2DBC,从名字上来看就很像是 JDBC 在反应式编程领域的对应的解决方案。目前可以支持 PostgreSQL,支持 MySQL 目前还尚需时日。

后续

接下来关于 Project Reactor 的文章我打算向大家介绍一下 Hot 和 Cold Publisher 的概念以及 Project Reactor 的源码实现。

我的技术公众号“编走编想”

© 著作权归作者所有

共有 人打赏支持
编走编想
粉丝 149
博文 127
码字总数 109248
作品 0
海淀
程序员
私信 提问
聊聊reactive streams的schedulers

序 本文主要研究一下reactive streams的schedulers 背景 默认情况下Mono以及Flux都在主线程上运行,有时候可能会阻塞主线程,可以通过设定schedulers让其在其他线程运行。 原始输出 没有使用...

go4it
2018/01/15
0
0
(4)Reactor 3快速上手——响应式Spring的道法术器

本系列文章索引《响应式Spring的道法术器》 前情提要 响应式编程 | 响应式流 | lambda与函数式 本文源码 1.3.2 Project Reactor Project Reactor(以下简称“Reactor”)与Spring是兄弟项目,...

享学IT
2018/06/26
0
0
RxJava 1.x使用与理解

RxJava 1.x使用与理解——2018.5.22 前一段时间,项目引入RxJava,用起来很简单,但是对原理不甚理解,于是参考各种资料,对照源码,进行了深入学习,写在这里,希望对看到的小伙伴有所帮助 ...

lichuangnk
2018/06/12
0
0
Project Reactor 官方文档翻译

项目描述 将 Project Reactor 的官方文档《Reactor Guide》翻译成中文。文档翻译完成后,将发布在开源中国社区共享。 文档英文地址:http://projectreactor.io/docs/reference/ 人员要求 翻译...

sikkx
2015/10/26
897
4
Reactor 2.0.0.RC1 发布,支持 Reactive Stream

Reactor 是一个基于 JVM 之上的异步应用基础库。为 Java 、Groovy 和其他 JVM 语言提供了构建基于事件和数据驱动应用的抽象库。Reactor 性能相当高,在最新的硬件平台上,使用无堵塞分发器每...

oschina
2015/02/19
2.2K
1

没有更多内容

加载失败,请刷新页面

加载更多

搜索引擎(Solr-索引详解)

时间字段类型特别说明 Solr中提供的时间字段类型( DatePointField, DateRangeField,废除的TrieDateField )是以时间毫秒数来存储时间的。 要求字段值以ISO-8601标准格式来表示时间:YYYY-MM...

这很耳东先生
25分钟前
0
0
Java成神之路

1、基础篇 01、面向对象 → 什么是面向对象 面向对象、面向过程 面向对象的三大基本特征和五大基本原则 → 平台无关性 Java 如何实现的平台无关 JVM 还支持哪些语言(Kotlin、Groovy、JRuby...

asdf08442a
55分钟前
2
0
dubbo源码分析-服务导出

简介 dubbo框架spring Schema扩展机制与Spring集成,在spring初始化时候加载dubbo的配置类。 dubbo服务导出的入口类是ServiceBean的onApplicationEvent方法 ServiceBean的继承关系如下 publ...

王桥修道院副院长
今天
0
0
QQ音乐的动效歌词是如何实践的?

本文由云+社区发表 作者:QQ音乐技术团队 一、 背景 1. 现状 歌词浏览已经成为音乐app的标配,展示和动画效果也基本上大同小异,主要是单行的逐字染色的卡拉OK效果和多行的滚动效果。当然,我...

腾讯云加社区
今天
4
0
idea里配置springboot项目打热部署

首先添加依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional></dependency> 然后添......

shatian
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部