JAVA Reactor API 简单使用(Flux和Mono)及WebFlux的应用

原创
2021/03/07 12:36
阅读数 6.8K

一. Reactor API 简单使用(Flux和Mono)

1. 常用创建Flux及Mono的方式

1.1. 使用just从现有的已知内容和大小的数据创建Flux或Mono

//使用数组创建一个被观察者(生产者,Flux)
Flux.just(new String[]{"hello",",","nihao","!"})
    //观察者监听被观察者(消费者)
    .subscribe(System.out::print);
//使用可变参数创建Flux
Flux.just("你","好","啊","!")
    .subscribe(System.out::print);
//使用just创建Mo
Mono.just("asd").subscribe(System.out::println);

1.2. 使用fromIterable从可迭代对象中创建Flux

//从可迭代的对象中创建Flux
Flux.fromIterable(Arrays.asList("你好",",","fromIter","!"))
    .subscribe(System.out::print);
var list = new ArrayList<String>(List.of("你","好"));
Flux<String> flux = Flux.fromIterable(list);
list.add("啊");//在创建Flux后追加元素
flux.subscribe(System.out::print);//这里输出: 你好啊

1.3. 使用fromStream从集合流中创建Flux

//流也可以是Arrays.asList("a", "b").stream()等方式返回的流
Flux.fromStream(Stream.of("从","流","中创建","Flux!"))
    .subscribe(System.out::println);

1.4. 使用range中创建一个范围内迭代的Flux

Flux.range(0,10).subscribe(System.out::print);

1.5. 使用interval创建间隔某一时间异步执行的Flux

Flux.interval(Duration.ofMillis(100))
    //限制执行10次
    .take(10)
    .subscribe(System.out::print);
//避免主线程提前结束
Thread.sleep(1100);

1.6. 从Mono转化而来的Flux

Mono.just("asd").flux().subscribe(System.out::print);

1.7. 从多个Mono组合而来的Flux

Mono.just("Mono1").concatWith(Mono.just("---Mono2"))
    .subscribe(System.out::println);

1.8. 使用generate动态创建Flux只能执行一次的Flux

// 同步动态创建,next 只能被调用一次
Flux.generate(sink -> {
    sink.next("第一次");
    //第二次会报错:
    //java.lang.IllegalStateException: More than one call to onNext
    //sink.next("第二次");
    sink.complete();
}).subscribe(System.out::print);

1.9. 使用create动态创建Flux可以执行多次的Flux,及Mono

// 同步动态创建,next 能被调用多次
Flux.create(sink -> {
    for (int i = 0; i < 10; i++) {
        sink.next("现在的次数:" + i);
    }
    sink.complete();
}).subscribe(System.out::println);
// 同步动态创建Mono
Mono.<String>create(sink->{
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    sink.success("by create");
}).subscribe(System.out::println);

1.10. 使用fromCallable动态创建Mono

Mono.fromCallable(() -> {
    Thread.sleep(1000);
    return "asd";
}).subscribe(System.out::println);

2. 异常处理

//直接创建一个包含异常的Flux
Flux.error(new Exception());
//直接创建一个包含异常的Mono
Mono.error(new Exception());
Mono.just("mono")
    //连接一个包含异常的Mono
    .concatWith(Mono.error(new Exception("myExc")))
    //异常监听
    .doOnError(error -> System.out.println("错误: "+ error))
    //在发生异常时将其入参传递给订阅者
    .onErrorReturn("-excRet")
    .subscribe(System.out::println);
/*最终输出: 
mono
错误: java.lang.Exception: myExc
-excRet
*/

3. 常用方法

3.1. 使用concatWith合并及concatWithValues追加

//合并多个Mono为一个Flux
Mono.just("Mono1").concatWith(Mono.just("---Mono2"))
    .subscribe(System.out::print);

//连接多个Flux
Flux.just("连接")
    //连接两个Flux
    .concatWith(Flux.just("两个"))
    //将元素追加到Flux
    .concatWithValues("或追加")
    .subscribe(System.out::print);

3.2. 使用zipWith组合为元素

// 结合为元祖,两个取其端的那个,长的那个多余的被舍弃
Flux<String> s1 = Flux.just("s1-0", "s1-1","s1-2");
Flux<String> s2 = Flux.just("s2-0", "s2-1");
s1.zipWith(s2)
    .subscribe(tuple -> System.out.println(tuple.getT1() + " -> " + tuple.getT2()));

3.3. 使用skip跳过元素

Flux.just(1,2,3,4,5)
    //跳过前2两个
    .skip(2)
    //输出: 345
    .subscribe(System.out::print);

3.4. 使用take截取元素

Flux<String> just = Flux.just("截取", "前几个", "元素");
//截取前两个元素组成新的flux,不改变原flux
Flux<String> take = just.take(2);
//输出: 截取前几个
take.subscribe(System.out::print);
System.out.println("\n=====");
//输出: 截取前几个元素
just.subscribe(System.out::print);

3.5. 使用filter过滤元素

Flux.just(1,2,3,4,5,6,7,8,9)
    //过滤偶数
    .filter(i->i%2==0)
    //输出: 2468
    .subscribe(System.out::print);

3.6. 使用distinct去重元素

//默认去重
Flux.just(1,1,2,2,3,3)
    //去重
    .distinct()
    //输出: 123
    .subscribe(System.out::print);
//将要去重的自定义的类
class MyClass{
    public int key;
    public String val;
    MyClass(int k, String v){
        key=k;val=v;
    }
    public String toString(){
        return String.format("{%d, %s} ",key,val);
    }
}
Flux.just(new MyClass(1,"asd"),new MyClass(1,"asdf"),new MyClass(2,"asd"))
    //自定义对象的比较键(参与比较的字段)
    .distinct(s->s.key)
    //输出: {1, asd} {2, asd}
    .subscribe(System.out::print);

3.7. 延迟执行(异步)

Flux.just("这是","延迟","执行")
    //在一秒后输出: 这是延迟执行
    .delayElements(Duration.ofSeconds(1)).subscribe(System.out::print);
Thread.sleep(1100);

3.8. 从Flux获取首个元素

Flux<String> just = Flux.just("这是", "next", "执行");
//获取第一个元素为Mono,原Flux中的元素不变
Mono<String> next = just.next();
//输出: 这是
next.subscribe(System.out::println);
System.out.println("=========");
//输出: 这是next执行
just.subscribe(System.out::print);

3.9. 从Flux阻塞式取一个元素

Flux<String> flux = Flux.create(skin -> {
    for(int i=0;i<2;++i){
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        skin.next("这是第"+i+"个元素");
    }
    skin.complete();
});
//flux订阅者所有操作都是无副作用的,即不会改变原flux对象数据
//阻塞式订阅,只要有一个元素进入Flux
String first = flux.blockFirst();
//输出:  这是第0个元素
System.out.println(first);
//还是输出: 这是第0个元素
System.out.println(flux.blockFirst());
//输出: 这是第1个元素
System.out.println(flux.blockLast());
//还是输出: 这是第1个元素
System.out.println(flux.blockLast());

3.10. Flux与Mono之间的相互转换

Flux<String> flux = Flux.just("asd","asd");
//自定义收集器转换为Mono
Mono<List<String>> collect = flux.collect(Collectors.toList());
//或使用默认收集器转换为Mono
Mono<List<String>> collect2 = flux.collectList();
//将Mono转换为仅有一个元素的Flux
Flux<List<String>> flux2 = collect.flux();
//将只有一个元素的Flux转换为Mono
Flux.just("1").single().subscribe(System.out::println);

3.11. 最终始终会执行的函数

Flux.just("asd", "qwe").concatWith(Flux.error(new Exception()))
    //当流程完成后的第一件事,由于由异常这里不执行
    .doOnComplete(()-> System.out.println("数据组装完成"))
    .doFinally(t-> System.out.println("最后执行的:"+t))
    .subscribe(System.out::println);
/*最终输出: 
asd
qwe
最后执行的:onError
*/

4. 常用监听

每次创建监听都会返回一个新的Flux对象,监听也在新的Flux对象

4.1. 监听每次消费

Flux<String> flux = Flux.just("asd", "qwe");
//每次消费前做什么,入参是将要消费的元素
flux = flux.doOnNext(s -> System.out.println("当前消费:" + s));
flux.subscribe(System.out::println);
/*输出为: 
当前消费:asd
asd
当前消费:qwe
qwe
*/

4.2. 监听流程完成或错误

//监听正常流程完成
Flux.just("1","2").doOnComplete(()-> System.out.println("ok"))
    .subscribe(System.out::println);
/*最终输出:
1
2
ok
*/

4.3. 监听消费者

//消费者参与前的最后一件事,入参为消费者对象
Flux.just("1","2").doOnSubscribe(System.out::println).subscribe(System.out::println);
/*最终输出:
reactor.core.publisher.FluxArray$ArraySubscription@66d18979
1
2
*/

5. 背压简单使用

使用 Subscription::request 主动控制订阅量

5.1 原始的 Subscriber::onNext

//生产者每10毫秒生产一个
Flux.interval(Duration.ofMillis(10))
    //消费者每50毫秒消费一个
    .subscribe(new Subscriber<>() {
        Subscription subscription;
        AtomicInteger count = new AtomicInteger(0);
        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(5);//首先请求5个
            count.set(5);
        }

        @Override
        public void onNext(Long val) {
            System.out.print(" val:"+val);
            try {
                //消费者每100毫秒消费一个
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if(count.decrementAndGet()<=0){
                System.out.println(" 搞完,重新请求5个");
                subscription.request(5);
                count.set(5);
            }

        }

        @Override
        public void onError(Throwable throwable) {
        }
        @Override
        public void onComplete() {
        }
    });
Thread.sleep(5000);
/*输出:
 val:0 val:1 val:2 val:3 val:4 搞完,重新请求5个
 val:5 val:6 val:7 val:8 val:9 搞完,重新请求5个
 val:10 val:11 val:12 val:13 val:14 搞完,重新请求5个
 val:15 val:16 val:17 val:18 val:19 搞完,重新请求5个
 val:20 val:21 val:22 val:23 val:24 搞完,重新请求5个
......
*/

5.2. 使用 BaseSubscriber 简化操作

Flux.range(1,5).log().subscribe(new BaseSubscriber<>() {
    private int count = 0;
    private final int limit = 2;
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(limit);
    }
    @Override
    protected void hookOnNext(Integer value) {
        if (++count == limit) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            request(count);
            count = 0;
        }
    }
});
/*日志输出: 
16:42:39.401 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
16:42:39.404 [main] INFO reactor.Flux.Range.1 - | request(2)
16:42:39.404 [main] INFO reactor.Flux.Range.1 - | onNext(1)
16:42:39.404 [main] INFO reactor.Flux.Range.1 - | onNext(2)
16:42:40.418 [main] INFO reactor.Flux.Range.1 - | request(2)
16:42:40.418 [main] INFO reactor.Flux.Range.1 - | onNext(3)
16:42:40.418 [main] INFO reactor.Flux.Range.1 - | onNext(4)
16:42:41.419 [main] INFO reactor.Flux.Range.1 - | request(2)
16:42:41.419 [main] INFO reactor.Flux.Range.1 - | onNext(5)
16:42:41.420 [main] INFO reactor.Flux.Range.1 - | onComplete()
*/

5.3. 通过 limitRate 进一步简化并链式操作

Flux.interval(Duration.ofMillis(100)).take(5)
    .log()
    //每次取2个
    .limitRate(2)
    .subscribe();
Thread.sleep(1000);
/*日志输出:
16:46:30.627 [main] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
16:46:30.630 [main] INFO reactor.Flux.Take.1 - request(2)
16:46:30.747 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0)
16:46:30.840 [parallel-1] INFO reactor.Flux.Take.1 - onNext(1)
16:46:30.840 [parallel-1] INFO reactor.Flux.Take.1 - request(2)
16:46:30.936 [parallel-1] INFO reactor.Flux.Take.1 - onNext(2)
16:46:31.048 [parallel-1] INFO reactor.Flux.Take.1 - onNext(3)
16:46:31.048 [parallel-1] INFO reactor.Flux.Take.1 - request(2)
16:46:31.143 [parallel-1] INFO reactor.Flux.Take.1 - onNext(4)
16:46:31.144 [parallel-1] INFO reactor.Flux.Take.1 - onComplete()
*/

二. Spring中webflux的应用

1. 传统Controller的方式应用

后端 JAVA 代码

package com.example.wefluxdemo.web;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.HashMap;

@RestController
@RequestMapping("/hello")
public class TestHelloController {

    @GetMapping("/1")
    public Mono<HashMap> hello1(){
        return Mono.<HashMap>create(sink -> {
           var map = new HashMap<String,String>();
           map.put("haha","hehe");
           map.put("wuwu","yingying");
           sink.success(map);
        });
    }

    @GetMapping("/2")
    public Mono<String> hello2(String s){
        return Mono.just(String.format("{\"s\":\"%s\"}",s));
    }
    
    @GetMapping("/3/{s}")
    public Mono<HashMap> hello3(@PathVariable String s){
        var map = new HashMap<String,String>();
        map.put("txt",s);
        return Mono.just(map);
    }
}

前端 js 调用服务代码

async function get(url){
    const resp = await fetch(url)
    if(resp.ok){
        console.log(await resp.json())
    }
}
//测试请求
get("/hello/1");//{"haha":"hehe","wuwu":"yingying"}
get("/hello/2?s=qwe");//{"s":"qwe"}
get("/hello/3/asd");//{"txt":"asd"}

2. Router和Handler的方式应用

后端 JAVA 代码

package com.example.wefluxdemo.web;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.*;
import reactor.core.publisher.Mono;
import java.util.HashMap;

@Configuration
public class TestRouterAndHandler {

    //处理方法正常应该和路由方法分在不同的类中
    public Mono<ServerResponse> test1(ServerRequest request){
        var map = new HashMap<String,Object>();
        map.put("返回值","Mono<ServerResponse>");
        map.put("形参","约束必须为ServerRequest");
        map.put("获取查询参数s",request.queryParam("s").get());
        //输出: 当前的SessionId:4fa0eea8-3f44-4e85-b501-2c05970876c2
        request.session().subscribe(s-> System.out.println("当前的SessionId:"+s.getId()));
        return ServerResponse.ok().bodyValue(map);
    }

    @Bean
    public RouterFunction<ServerResponse> test1Router(){
        //路由/test/1的处理方法为test1
        return RouterFunctions.route(RequestPredicates.GET("/test/1"),this::test1);
    }
}

前端 js 调用服务代码

get("/test/1?s=zxc");
/*控制台输出:
{
	"返回值": "Mono<ServerResponse>",
	"获取查询参数s": "zxc",
	"形参": "约束必须为ServerRequest"
}
*/
展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部
返回顶部
顶部