文档章节

reactor flux

my_juke
 my_juke
发布于 03/27 10:30
字数 668
阅读 44
收藏 0

发布订阅模式

A--->B-->C---->D
subscribe: D--->C--->B--->A
onSubscribe: A--->B--->C---->D
request: D---->C--->B---->A
onNext: A--->B-->-C--->D
complete:A--->B---->C--->D

Flux.just

 Flux<String> fs = Flux.just("1","2");

Flux.fromArray

 Flux<String> fs = Flux.fromArray(new String[]{"1","2"});

Flux.fromIterable

Flux<String> fs = Flux.fromIterable(Arrays.asList("1","2"));

Flux.generate

s.complete 只能调用一次,要不然会报错 一般用于同步

Flux<String> fs = Flux.generate(()->Integer.valueOf(1),(i,s)->{

            s.next(i+"");
            if(i == 10){
                s.complete();
            }
            return ++i;
        });
        fs.subscribe(s->System.out.println(s));

Flux.publishOn

异步发布

Flux<String> f1 = Flux.just("1","2","3","4","5");
        f1.doOnNext(s->{
            System.out.println(Thread.currentThread().getName());
        })
                .publishOn(Schedulers.newSingle("single1"),2).
        doOnNext(s->{
            System.out.println(Thread.currentThread().getName()+"_"+s);
        })
                .subscribe(
                       s->{
                           System.out.println(Thread.currentThread().getName());
                       } );

运行结果

main
main
single1-1_1
single1-1
single1-1_2
single1-1
single1-1
single1-1
single1-1_3
single1-1
single1-1_4
single1-1
single1-1
single1-1_5
single1-1

Flux.subscribeOn

异步订阅:
与publishOn的区别:
publishOn 作用于onNext,onCompelet
subscribeOn作用于 subscribe

 Flux<String> f1 = Flux.just("1","2","3","4","5");
        f1.doOnNext(s->{
            System.out.println(Thread.currentThread().getName());
        })
                .subscribeOn(Schedulers.newSingle("single1"),true).
        doOnNext(s->{
            System.out.println(Thread.currentThread().getName()+"_"+s);
        }) .subscribeOn(Schedulers.newSingle("single2"),true).
                doOnNext(s->{
                    System.out.println(Thread.currentThread().getName()+"_"+s);
                })
                .subscribe(
                       s->{
                           System.out.println(Thread.currentThread().getName());
                       } );

结果

single1-2
single1-2_1
single1-2_1
single1-2
single1-2
single1-2_2
single1-2_2
single1-2
single1-2
single1-2_3
single1-2_3
single1-2
single1-2
single1-2_4
single1-2_4
single1-2
single1-2
single1-2_5
single1-2_5
single1-2

Flux.create

支持背压 用于异步

Flux<String> fs = Flux.create(s->{
            int i = 0;

            while (true){
                System.out.println("mail thread:"+Thread.currentThread().getName());
                s.next(""+i++);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {

                }
            }

        },FluxSink.OverflowStrategy.ERROR);
       // fs.subscribe(s->System.out.println(s));

        fs.publishOn(Schedulers.newSingle("single"),1).subscribe(new BaseSubscriber<String>() {
            Subscription subscription = null;
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(1);
                this.subscription = subscription;
            }

            @Override
            protected void hookOnNext(String value) {
                System.out.println(value);
                try {
                    Thread.sleep(500);
                    hookOnSubscribe(subscription);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

Flux.doOnNext

= FluxPeek

Flux.all

返回一个Mono<boolean>
如果全为真为true,否则为false

Flux.range(1,5).all(s->s>0).subscribe(System.out::println);   true
Flux.range(1,5).all(s->s>2).subscribe(System.out::println);   false

Flux.any

Flux.range(1,5).any(s->s>2).subscribe(System.out::println);  true

Flux.interval

Flux.interval(Duration.ofMillis(100)).subscribe(System.out::println);
TimeUnit.HOURS.sleep(1);

Flux.blockFirst blockLast

通过subscribe 实现,里面有CountDownLatch

long t1 = Flux.interval(Duration.ofMillis(100)).blockFirst();
System.out.println(t1);

buffer

Flux.range(1,10).buffer(3,3).subscribe(System.out::println);  //3个一组,每次跨3个
Flux.range(1,10).bufferUntil(s->s>5).subscribe(System.out::println);//每匹配成功就发射一次
Flux.range(1,10).bufferUntil(s->s%3==0,true).subscribe(System.out::println);//匹配成功的放在下一组发射
Flux.range(1,10).bufferWhile(s->s%3==0).subscribe(System.out::println);//每匹配不成功就发射一次
Flux f1 =  Flux.interval(Duration.ofSeconds(1));
        Flux.interval(Duration.ofMillis(500)).buffer(f1).subscribe(s->{
            System.out.println(s);

        });
        TimeUnit.HOURS.sleep(1);
[0, 1]
[2]
[3, 4, 5]
[6, 7]
[8]
[9, 10, 11]
[12]
[13, 14, 15]

Flux.defer

延迟创建

Flux.defer(()->Flux.just(1,2,3)).subscribe(System.out::println)

Flux.using

 Flux.using(()->Integer.valueOf(1),s->{
            if(s>0){
                return Flux.just("2","3");
            }
            return Flux.just("1");

        },System.out::println).subscribe(System.out::println);

flatMap

reduce

Flux.range(1,100).reduce((s1,s2)->{
           return s1+s2;
       }).subscribe(System.out::print);

scan

将每次reduce的结果发射

Flux.range(1,100).scan((s1,s2)->{
           return s1+s2;
       }).subscribe(System.out::println);

window

Flux.range(1,10).window(3).subscribe(s->{
           s.subscribe(System.out::println);
           System.out.println("group");
       });

© 著作权归作者所有

my_juke
粉丝 4
博文 40
码字总数 25607
作品 0
深圳
私信 提问
加载中

评论(0)

聊聊reactive streams的backpressure

序 本文主要研究下reactive streams的backpressure reactive streams跟传统streams的区别 输出实例如下: 传统的list streams不是异步的,好比如一批500件的半成品,得在A环节都处理完,才能...

go4it
2018/01/14
530
0
reactor3 flux的map与flatMap的区别

序 本文主要研究一下flux的map与flatMap的区别 map 这里头的map是纯元素转换 输出 flatMap 这里的flatMap,将元素转为Mono或Flux,转换操作里头还可以进行异步操作 输出 小结 flatMap的转换F...

go4it
2018/02/09
419
0
聊聊reactive streams的Mono及Flux

序 本文主要讲一下reactive streams的Publisher接口的两个抽象类Mono与Flux Publisher reactive-streams-1.0.1-sources.jar!/org/reactivestreams/Publisher.java Mono reactor-core-3.1.2.R......

go4it
2018/01/12
117
0
Spring 5 WebFlux

作者: 一字马胡 转载标志 【2017-11-26】 更新日志 日期 更新内容 备注 2017-11-26 新建文章 Spring 5 WebFlux demo Reactor Spring 5的一大亮点是对响应式编程的支持,下面的图片展示了传统...

一字马胡
2017/11/26
0
0
Flux 会取代 Web MVC,或可不再基于 Servlet 容器了?

对 Java 开发者来说,2017 年 9 月是个热闹的月份不但 Java SE 9、Java EE 8 相继发布,就连 Spring 框架,也在这段时间发布 5.0 正式版。 而新版 Spring 的一大特色,就是 Reactive Web 方案...

周其
2018/04/02
1.9W
22

没有更多内容

加载失败,请刷新页面

加载更多

phpoffice/phpspreadsheet 下载文件

use PhpOffice\PhpSpreadsheet\IOFactory ; use PhpOffice\PhpSpreadsheet\Cell\Coordinate ; use PhpOffice\PhpSpreadsheet\Spreadsheet ; use PhpOffice\PhpSpreadsheet\Writer\Xlsx ; $sp......

dragon_tech
16分钟前
14
0
图解kubernetes控制器HPA横向伸缩的关键实现

HPA是k8s中横向伸缩的实现,里面有很多可以借鉴的思想,比如延迟队列、时间序列窗口、变更事件机制、稳定性考量等关键机制, 让我们一起来学习下大佬们的关键实现 1. 基础概念 HorizontalPo...

8小时
18分钟前
32
0
为随航功能优化 ,Mac 与 iPad珠联璧合

文章来源: https://www.macdown.com/news/4556.html Mac 和 iPad 本来就是各有所长的强力搭档。有了 macOS Catalina 和 iPadOS 的随航功能,这两台设备的配合变得更加顺畅,可以助你实现更多...

麦克虾仔
24分钟前
26
0
大部分程序员还不知道的 Servelt3 异步请求,原来这么简单?

前言 博文地址:https://sourl.cn/URptix 当一个 HTTP 请求到达 Tomcat,Tomcat 将会从线程池中取出线程,然后按照如下流程处理请求: 将请求信息解析为 HttpServletRequest 分发到具体 Serv...

楼下小黑哥
29分钟前
16
0
Apache OpenMeetings开源线上会议系统——安装配置

OpenMeetings是一个开源的在线会议系统,支持音频和视频,同时支持桌面分享。 官网地址:http://openmeetings.apache.org/index.html 安装 当前版本为4.0.6,需要最低jre8的安装环境。 下载a...

JustForFly
32分钟前
21
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部