从Flux.first开始
-
正常数据流
first
表示某个publisher先执行触发信号(signal)的方法则之后仅推送此publisher中的信号,其他publisher失效。信号——signal是reactor官文常提到的概念,指通过触发的方法(例如onNext/onError/onComplete)向后续的订阅者发布消息。Flux<String> flux = Flux.first(p->{ p.onNext("A-1"); new Thread(()->{ sleep1(); //暂停1秒 p.onNext("A-2"); p.onComplete(); }).start(); }, p->{ p.onNext("B-1"); p.onNext("B-2"); p.onComplete(); }); return flux.map(x -> { System.out.println(x); return x; }); /*输出 A-1 A-2 */
上面的代码可以将
p.onNext("A-1");
移至异步Thead内,会输出B-1|B-2
。若一直不调用
onComplet
e方法就是一个无限数据流,这意味着stream永远不会关闭,可以持续不断的发送信息(比如用于发布订阅)。下面的代码展示了Flux的一些用法——使用多种Flux创建方法实现客户端与WebFlux创建
Websocket
链接后,1)当客户端发来消息时记录接收到的消息并回复"Receive Msg:" + 收到的消息
,2)每间隔5秒向客户端推送一条Send Msg: Push x Time.
//在@Configuration中添加 private WebSocketHandler socketHandler = session -> { Flux<WebSocketMessage> receive = session.receive().map(msg -> { String info = msg.getPayloadAsText(); log.info("Receive Msg:" + info); return info; }).map(info -> session.textMessage("Receive Msg:" + info)); Flux<WebSocketMessage> push = Flux.interval(Duration.ofSeconds(5)).map( i->session.textMessage("Send Msg: Push" + ++i + " Time.")); Flux<WebSocketMessage> echo = Flux.merge(receive, push) .onErrorResume(e -> Flux.just(session.textMessage("ERROR:" + e.getMessage()))); return session.send(echo); }; @Bean public HandlerMapping webSocketMapping() { final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); mapping.setOrder(Ordered.HIGHEST_PRECEDENCE); mapping.setUrlMap( ImmutableMap.<String, WebSocketHandler>builder().put("/api/test/socket", socketHandler).build()); return mapping; } @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); }
外部访问
"/api/test/socket"
路径发起websocket链接,此时 session 会创建一个Flux的publisher,其后的map用于处理发送来的数据。如果没有其他的外部依赖,WebSocketSession
的实现类仅有ReactorNettyWebSocketSession
,基本上还是按照在DispatcherHandler
中找到对应的handlerAdapter
来处理,因此在Configuration
中注入了WebSocketHandlerAdapter
告知其handlerAdapter
对应的处理类。上面的代码用到了多个Flux的创建方法(interval、merge、just),所有的创建方法都有其作用和特性以实现一些需求,后文将会逐一介绍。
-
异常处理
在使用
first
创建时,其中抛出的异常并不会由onErrorResume
等异常捕捉处理到。如下代码会输出Outer Exception.ERRPR
。try { Flux<String> flux = Flux.<String>first(p->{ System.out.println("publisher1 Execute"); p.onNext("1"); //p.onError(new RuntimeException("ERRPR")); throw new RuntimeException("ERRPR"); //p.onComplete(); }); flux.map(x->{ System.out.println("Consumer," + x); //throw new RuntimeException("ERRPR"); return x + "x"; }).onErrorResume(err->{ return Flux.just("123"); }).toStream().forEach(x->{ System.out.println("Each Print," + x); System.out.println(x); }); }catch(Exception e) { System.out.println("Outer Exception." + e.getMessage()); }
使用WebFlux时如果没主动捕捉
publisher
代码块的异常会导致请求永不返回,需要拦截异常之后传递给onError
去处理,这和在map
等方法中抛出异常有较大的区别。Flux<String> flux = Flux.<String>first(p -> { try { System.out.println("Publisher1"); p.onNext("1"); if(true)throw new RuntimeException("ERRPR"); p.onComplete(); }catch(Exception e) { p.onError(e); } })
每一种Flux的创建方法的异常流都有些许的不同,复杂的代码需小心处理。
Flux.just
just
方法是创建Flux最简单直观的方法。将一个实例包装后向后传递Flux对象。如下代码实际上就相当于一个publisher中调用了3次onNext
,然后onComplete
。
Flux.just("1", "2", "3").map(x -> {
System.out.println("Comsumer:" + x);
return x;
}).toStream().forEach(x -> {
System.out.println("Result:" + x);
});
/* print
Comsumer:1
Comsumer:2
Comsumer:3
Result:1
Result:2
Result:3
*/
Flux.empty以及Flux.never
Flux.empty
会创建一个空对象,此对象创建后即刻返回,不会向后执行任何动作。Flux.never
会创建一个不可执行的空对象,但是不会返回,这就意味如果是从main启动会类似一个守护线程启动,如果是webFlux会占用线程,(类似用publisher中不调用onComplete)
Flux.combineLatest
combineLatest
有很多个重载方法。所有的方法均表示为当所有的publisher执行完毕之后,再调用一个combinator用于组合结果。
Flux<String> flux = Flux.<String, String>combineLatest(list -> {
String out = "";
for(Object i : list) {
out += i + "|";
}
return out + ";";
}, emt -> {
System.out.println("A Thread:" + Thread.currentThread().getName());
emt.onNext("A-1");
new Thread(()->{
sleep1(); //停顿1秒
System.out.println("A Async Thread:" + Thread.currentThread().getName());
emt.onNext("A-2");
}).start();
new Thread(()->{
sleep4(); //停顿4秒
System.out.println("A Async Thread:" + Thread.currentThread().getName());
emt.onNext("A-3");
emt.onComplete();
}).start();
}, emt -> {
System.out.println("B Thread:" + Thread.currentThread().getName());
emt.onNext("B-1");
emt.onNext("B-2");
emt.onNext("B-3");
emt.onComplete();
}, emt -> {
System.out.println("C Thread:" + Thread.currentThread().getName());
emt.onNext("C-1");
emt.onNext("C-2");
new Thread(()->{
sleep2();//停顿2秒
System.out.println("C Async Thread:" + Thread.currentThread().getName());
emt.onNext("C-3");
emt.onComplete();
}).start();
});
flux.map(x -> {
System.out.println("Consumer Thread:" + Thread.currentThread().getName() + ".Val:" + x);
return x;
});
/*
A Thread:main
B Thread:main
C Thread:main
Consumer Thread:main.Val:A-1|B-3|C-1|;
Consumer Thread:main.Val:A-1|B-3|C-2|;
A Async Thread:Thread-0
Consumer Thread:Thread-0.Val:A-2|B-3|C-2|;
C Async Thread:Thread-2
Consumer Thread:Thread-2.Val:A-2|B-3|C-3|;
A Async Thread:Thread-1
Consumer Thread:Thread-1.Val:A-3|B-3|C-3|;
*/
combineLatest
的合并发布过程有点类似我们做代码基线管理——以最后一个publiser为”基线“(实例代码中的C),其他publiser发布的信号都以最后一次的数据向以线程为单位向"基线"合并然后发布数据。上面的代码执过程如下(注意异步线程名):
- A分支推送
A-1
,B分支推送B-1
、B-2
、B-3
,此时A的数据为A-1
、B的数据为B-3
。随后main线程执行C的推送C-1
触发合并方法执行向订阅者推送A-1|B-3|C-1|
。 - 紧接着main线程推送
C-2
,触发合并推送A-1|B-3|C-2|
。 - 等待1秒后A在Thread-0线程推送
A-2
,触发合并向订阅者推送A-2|B-3|C-2|
。 - 1秒后Thread-2线程推送
C-3
,触发合并推送A-2|B-3|C-3|
。 - 2秒后Thread-1推送
A-3
,合并推送A-2|B-3|C-3|
。
Flux.concat
concat
方法是会将所有的publisher都执行完毕,并且每个onNext都是一个独立的信号(signal)向后异步传递信息。
Flux<String> flux = Flux.concat(
emt -> {
emt.onNext("A-1");
emt.onNext("A-2");
emt.onNext("A-3");
emt.onComplete();
}, emt -> {
emt.onNext("B-1");
emt.onNext("B-2");
emt.onNext("B-3");
emt.onComplete();
}, emt -> {
emt.onNext("C-1");
emt.onNext("C-2");
emt.onNext("C-3");
emt.onComplete();
});
flux.map(x -> {
System.out.println("Val:" + x);
return x;
});
/* print
Val:A-1
Val:A-2
Val:A-3
Val:B-1
Val:B-2
Val:B-3
Val:C-1
Val:C-2
Val:C-3
*/
Flux.concatDelayError
concatDelayError
和concat
的差异主要是在对错误的处理。concat
方法在出现错误时,会中断后续的所有处理:
Flux.concat(s->{
s.onNext("1");
s.onError(new RuntimeException("ERROR"));
s.onNext("1+");
s.onComplete();
}, s->{
s.onNext("2");
s.onComplete();
}).map(x -> {
System.out.println("Comsumer:" + x);
return x;
}).onErrorResume(e->{
return Flux.just(e.getMessage());
}).toStream().forEach(x -> {
System.out.println("Result:" + x);
});
/* print
Comsumer:1
Result:1
Result:ERROR
*/
而concatDelayError
可以将onError
视作一个独立的信号,并不会中断后续处理。
Flux.<String>concatDelayError(s->{
s.onNext("1");
s.onError(new RuntimeException("ERROR"));
s.onNext("1+");
s.onComplete();
}, s->{
s.onNext("2");
s.onComplete();
}).map(x -> {
System.out.println("Comsumer:" + x);
return x;
}).onErrorResume(e->{
return Flux.just(e.getMessage());
}).doOnNext(x->{
System.out.println("Result:" + x);
});
/* print
Comsumer:1
Result:1
Comsumer:1+
Result:1+
Comsumer:2
Result:2
Result:ERROR
*/
看输出的日志,即使中间有onError
也会执行后续代码,并在最后附上错误信息。这可用于处理那些允许部分失败的业务。需要注意的是,当出现多个异常时,并不会以多个信号的方式发送,而是合并放在Throwable的引用异常列表中。
return Flux.<String>concatDelayError(s->{
s.onNext("1");
s.onError(new RuntimeException("ERROR1"));
s.onNext("1+");
s.onComplete();
}, s->{
s.onNext("2");
s.onError(new RuntimeException("ERROR2"));
s.onNext("2+");
s.onComplete();
}).map(x -> {
System.out.println("Comsumer:" + x);
return x;
}).onErrorResume(e->reactor.core.Exceptions.isMultiple(e), e->{
String err = "";
for(Throwable error : e.getSuppressed()) {
err += "|" + error.getMessage();
}
return Flux.just(err);
}).onErrorResume(e->{
return Flux.just(e.getMessage());
}).doOnNext(x->{
System.out.println("Result:" + x);
});
/* print
Comsumer:1
Result:1
Comsumer:1+
Result:1+
Comsumer:2
Result:2
Comsumer:2+
Result:2+
Result:|ERROR1|ERROR2
*/
Flux.create及Flux.push
-
相同之处。
create
和push
在方法体上一模一样,类似于用一个发布者重复发送信号直到结束,遇到错误时立刻中断。Flux.<String>create/*push*/(emiter->{ emiter.next("1"); emiter.next("1+"); emiter.complete(); }).map(x -> { System.out.println("Comsumer:" + x); return x; }).doOnNext(x->{ System.out.println("Result:" + x); }); /* print Comsumer:1 Result:1 Comsumer:1+ Result:1+ */
create
和push
都有一个另外一个重载方法接收OverflowStrategy
参数,此参数用于单线程的发布订阅模式的流量处理。默认是OverflowStrategy.BUFFER
表示无法处理则先缓存。细节可了解backpressure mode
(背压模式)。 -
不同之处
create
和push
主要的差别在异步线程处理上,简单粗暴的理解——create
相当于一个线程安全的过程、push
相当于一个非线程安全的过程。当然这样说并不准确,官方的文档也只是说create
适合“多值的异步API”(This Flux factory is useful if one wants to adapt some other multi-valued async API)push
适合“单线程多值的异步API”(This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API),并且2者实现的实现都是用了FluxCreate。下面的代码比较复杂——模拟了2个创建方法在多线程下的情况(如果接收到所有的信号会在map方法中输出到Result6
,如果没有接收到所有的信号那么在输出Result6
之前就结束了)。有兴趣可以把代码放到main中分别使用
create
、push
多跑几次看看输出有何不同(最后加个.toStream()
之类的方法汇运行),甚至去研究FluxCreate在接收FluxCreate.CreateMode.PUSH_PULL
和FluxCreate.CreateMode.PUSH_ONLY
不同的处理过程。如果只是使用Flux基本上只要记住一个是线程安全、一个非线程安全就行了。void sleep1() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }
DateFormat formater = new SimpleDateFormat("HH:mm:ss"); AtomicBoolean b1 = new AtomicBoolean(false); AtomicBoolean b2 = new AtomicBoolean(false); AtomicBoolean b3 = new AtomicBoolean(false); AtomicInteger i = new AtomicInteger(1); Flux<String> flux = Flux.create/*push*/(p->{ new Thread(()->{ System.out.println("Thread A Before. Thread=" + Thread.currentThread().getName() + ", Time=" + formater.format(new Date())); sleep1(); System.out.println("Publish A-1.Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date())); p.next("A-1"); sleep1(); System.out.println("Publish A-2.Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date())); p.next("A-2"); System.out.println("Thread A After."); b1.set(true); if(b1.get() && b2.get() && b3.get()) { System.out.println("Thread A Complete"); p.complete(); } System.out.println("Thread A End."); }).start(); new Thread(()->{ System.out.println("Thread B Before. Thread=" + Thread.currentThread().getName() + ", Time=" + formater.format(new Date())); sleep1(); System.out.println("Publish B-1.Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date())); p.next("B-1"); sleep1(); System.out.println("Publish B-2.Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date())); p.next("B-2"); System.out.println("Thread B After."); b2.set(true); if(b1.get() && b2.get() && b3.get()) { System.out.println("Thread B Complete"); p.complete(); } System.out.println("Thread B End."); }).start(); new Thread(()->{ System.out.println("Thread C Before. Thread=" + Thread.currentThread().getName() + ", Time=" + formater.format(new Date())); sleep1(); System.out.println("Publish C-1.Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date())); p.next("C-1"); sleep1(); System.out.println("Publish C-2.Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date())); p.next("C-2"); System.out.println("Thread C After."); b3.set(true); if(b1.get() && b2.get() && b3.get()) { System.out.println("Thread C Complete"); p.complete(); } System.out.println("Thread CEnd."); }).start(); System.out.println("Thread Publisher. Thread=" + Thread.currentThread().getName() + ", Time=" + formater.format(new Date())); }); flux.map(x -> { System.out.println("Result" + i.getAndAdd(1) + ". Val=" + x + ",Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date())); return x; });
Flux.defer
defer
表示延迟执行publisher。方法体通过Supplier
包装推迟publiser的执行。
Publisher<String> publisher = Flux.create(emt->{
System.out.println("Publisher:" + 1);
emt.next("1");
System.out.println("Publisher:" + 2);
emt.next("2");
});
return Flux.defer(()->{
System.out.println("Create Publisher");
return publisher;
}).map(x -> {
System.out.println("Comsumer:" + x);
return x;
}).doOnNext(x->{
System.out.println("Result:" + x);
});
Flux.from
Publisher<String> publisher = Flux.create(emt->{
System.out.println("Publisher:" + 1);
emt.next("1");
System.out.println("Publisher:" + 2);
emt.next("2");
});
Flux.from(publisher).map(x -> {
System.out.println("Comsumer:" + x);
return x;
}).doOnNext(x->{
System.out.println("Result:" + x);
});
直观上,defer
和from
非常类似,不过from
的publisher会先于Flux.from方法体执行。
Flux.fromArray
fromArray
就是直接使用队列创建一个Flux,Flux.fromArray(new String[] {"1","2"})
类似于Flux.just("1","2")
。
Flux.fromIterable
fromIterable
接收一个Iterable
接口的队列,Flux.fromIterable(Arrays.asList("1", "2"))
类似于Flux.just("1","2")
。
Flux.fromStream
fromStream
接收一个Stream
的流数据。Flux.fromStream(Stream.<String>builder().add("1").add("2").build())
类似于Flux.just("1","2")
。fromStream有一个使用Supplier
包装的重载,区别相当于Flux.defer
和Flux.from
。
Flux.error
直接抛出一个异常。
Flux.<String>error(new RuntimeException("ERROR")).map(x -> {
System.out.println("Comsumer:" + x);
return x;
}).onErrorResume(e -> Flux.just(e.getMessage())).doOnNext(x -> {
System.out.println("Result:" + x);
});
Flux.generate
generate
是一个比较特殊的构建方法,用中文描述应该叫发生器比较贴切。他类似一个FOR循环,在触发onComplete之前他会不断的发布数据给后续的订阅者。Flux
的generate
有三个重载方法,比较直观好用的是generate(beforePublish, publisher,afterPublish)
,另外两个大同小异。
Flux<String> generate = Flux.<Integer, Integer>generate(()->{
return 0;
}, (pre, emt)->{
if(10 < ++pre) { //累计执行一定次数后关闭/退出发生器
emt.complete();
}else {
emt.next(pre);
}
return pre;
}, state->{
System.out.println("state:" + state);
}).map(x->"" + x);
generate.map(x -> {
System.out.println("Comsumer:" + x);
return x;
}).onErrorResume(e -> Flux.just(e.getMessage())).doOnNext(x -> {
System.out.println("Result:" + x);
});
/* 输出
Comsumer:1
Result:1
Comsumer:2
Result:2
Comsumer:3
Result:3
Comsumer:4
Result:4
Comsumer:5
Result:5
Comsumer:6
Result:6
Comsumer:7
Result:7
Comsumer:8
Result:8
Comsumer:9
Result:9
Comsumer:10
Result:10
state:11
*/
上面这段代码generate 会不断的产生数据向后推送,直到达成if(10 < ++pre)
条件退出。与create相比generate 有个优点是他会等到每一个信号执行完毕之后才产生下一条数据并继续执行。但是缺点也明显当处理链路过长时,任何未仔细处理的异常或阻塞会导致整个执行过程卡死。publisher中无论时throw
还是emt.error
导致的异常,都会正确被onErrorResume
处理。
Flux.interval
interval
可简单理解为一个"定时任务"。通过Duration
指定执行间隔,然后按间隔时间的向订阅者发布信号。
Duration duration = Duration.ofSeconds(1);
DateFormat formater = new SimpleDateFormat("mm:ss");
return Flux.interval(duration).map(step -> {
System.out.println("Step:" + step);
return new Date();
}).map(date -> formater.format(date))
.onErrorResume(e -> Flux.just(e.getMessage()))
.doOnNext(x -> System.out.println("time:" + x));
/* 输出
Step:0
time:10:16
Step:1
time:10:17
Step:2
time:10:18
Step:3
time:10:19
Step:4
time:10:20
...
*/
只要不是主动退出或在订阅链路中抛出异常,此订阅会一直按时间间隔执行下去。与generate
差别在于interval
通过Scheduler接口可以指定后续任务的线程模型。这也是Flux响应式EventLop模型的核心。默认情况下interval
会使用parallel类型的线程,可以使用额外提供Scheduler指示后续使用其他线程类型来工作。下面的代码将线程模型指定为elastic。
Duration duration = Duration.ofSeconds(1);
DateFormat formater = new SimpleDateFormat("HH:mm:ss");
Scheduler scheduler = Schedulers.newElastic("My-Elastic");
scheduler.schedule(()->{
System.out.println("Schedule=========================");
System.out.println("Thread:" + Thread.currentThread().getName());
System.out.println("Time:" + formater.format(new Date()));
System.out.println(" ");
});
return Flux.interval(duration, scheduler).map(step -> {
System.out.println("Step=========================");
System.out.println("Thread:" + Thread.currentThread().getName());
System.out.println("step:" + step);
System.out.println(" ");
return new Date();
}).map(date -> formater.format(date)).onErrorResume(e -> {
return Flux.just(e.getMessage());
}).doOnNext(x -> {
System.out.println("Consumer=========================");
System.out.println("Thread:" + Thread.currentThread().getName());
System.out.println("Time:" + x);
System.out.println(" ");
});
/* 输出
Schedule=========================
Thread:My-Elastic-2
Time:11:08:28
Step=========================
Thread:My-Elastic-2
step:0
Consumer=========================
Thread:My-Elastic-2
Time:11:08:29
Step=========================
Thread:My-Elastic-2
step:1
Consumer=========================
Thread:My-Elastic-2
Time:11:08:30
Step=========================
Thread:My-Elastic-2
step:2
Consumer=========================
Thread:My-Elastic-2
Time:11:08:31
*/
关于线程模型的创建和选择参考reactor.core.scheduler.Schedulers
。
Flux.merge
merge
的调用方法和concat
类似,但是在运行机制上存在差异。concat
是"一步一步"的执行,要执行完第一个publiser后才会向后执行第二个、第三个。merge
是同时执行,每个onNext
都会立刻引发一个信号。
/*concat*/
Flux<String> flux = Flux.concat(
emt -> {
AtomicBoolean b1 = new AtomicBoolean();
AtomicBoolean b2 = new AtomicBoolean();
new Thread(()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
emt.onNext("A-1");
b1.set(true);
if(b1.get() && b2.get()) {
emt.onComplete();
}
}).start();
new Thread(()->{
emt.onNext("A-2");
b2.set(true);
if(b1.get() && b2.get()) {
emt.onComplete();
}
}).start();
}, emt -> {
emt.onNext("B-1");
emt.onNext("B-2");
emt.onComplete();
}, emt -> {
emt.onNext("C-1");
emt.onNext("C-2");
emt.onComplete();
});
flux.map(x -> {
System.out.println("Consumer=========================");
System.out.println("Thread:" + Thread.currentThread().getName());
System.out.println("Time:" + formater.format(new Date()));
System.out.println("Val:" + x);
return x;
});
/* print: A-2输出之后,所有的信号等待A-1 2秒之后才继续执行。
Consumer=========================
Thread:Thread-1
Time:12:35:26
Val:A-2
Consumer=========================
Thread:Thread-0
Time:12:35:28
Val:A-1
Consumer=========================
Thread:Thread-0
Time:12:35:28
Val:B-1
Consumer=========================
Thread:Thread-0
Time:12:35:28
Val:B-2
Consumer=========================
Thread:Thread-0
Time:12:35:28
Val:C-1
Consumer=========================
Thread:Thread-0
Time:12:35:28
Val:C-2
*/
上面同样的代码,将concat修改为merge方法会输出
Flux<String> flux = Flux.merge(
...
/* print, A-2在其余信号发布2秒之后才出现
Consumer=========================
Thread:main
Time:12:38:23
Val:B-1
Consumer=========================
Thread:main
Time:12:38:23
Val:A-2
Consumer=========================
Thread:main
Time:12:38:23
Val:B-2
Consumer=========================
Thread:main
Time:12:38:23
Val:C-1
Consumer=========================
Thread:main
Time:12:38:23
Val:C-2
Consumer=========================
Thread:Thread-0
Time:12:38:25
Val:A-1
*/
Flux.mergeOrdered
mergeOrdered
方法的使用和concat
、merge
一样。执行过程类似merge
,不过额外增加了执行顺序的判断功能。其排序过程:
- 每一个Publisher的信号(onNext)都是一个先进先出的队列,每个
publisher
先被执行的onNext
数据入列尾。 - 每一个Publisher队首元素相互对比,符合比较条件的pop出队列。比较过程默认是元素实现Comparable接口,选取比较结果"最小"的元素。可以使用重载方法提供一个
Comparator
自行比较。 - 被选取的元素pop后继续回到第一步比较。若某个Publisher的队列为空,但未结束信号源(未onComplete)则会等待,若结束则此Publisher关闭。
DateFormat formater = new SimpleDateFormat("HH:mm:ss");
Flux<String> flux = Flux.<String>mergeOrdered((a, b)->{
System.out.println("Compare:" + a + " and " + b);
return -1;
}, p->{
new Thread(()->{
try {Thread.sleep(2000);} catch (InterruptedException e) {}
p.onNext("A-1");
p.onComplete();
}).start();
p.onNext("A-2");
}, p->{
p.onNext("B-1");
p.onNext("B-2");
p.onComplete();
}, p->{
p.onNext("C-1");
p.onNext("C-2");
p.onComplete();
});
return flux.map(x -> {
System.out.println("Val:" + x);
System.out.println("Time:" + formater.format(new Date()));
System.out.println("Step=========================");
return x;
});
/*print
-- A-2 元素先进入第一个publisher的队列,开始比较。
Compare:A-2 and B-1
Compare:A-2 and C-1
Val:A-2
Time:15:11:27
Step=========================
--线程等待2秒后 A-1 元素进入第一个publisher的队列,开始第二次比较。
Compare:A-1 and B-1
Compare:A-1 and C-1
Val:A-1
Time:15:11:29
Step=========================
Compare:B-1 and C-1
Val:B-1
Time:15:11:29
Step=========================
Compare:B-2 and C-1
Val:B-2
Time:15:11:29
Step=========================
Val:C-1
Time:15:11:29
Step=========================
Val:C-2
Time:15:11:29
Step=========================
*/
Flux.mergeSequential
mergeSequential
表示对输出的结果次序化。其调用结果和concat
非常的类似,但是执行过程却不一样。concat
会逐一执行每一个publisher并按顺序输出结果,而mergeSequential
会并行执行所有的publisher
然后强制编排顺序输出。
Flux<String> flux = Flux.<String>mergeSequential /*concat*/ (
Flux.interval(Duration.ofSeconds(1)).take(5).map(x->"A-" + x.toString()),
Flux.<String>concat(p->{
new Thread(()->{
sleep();
p.onNext("B-1");
p.onComplete();
}).start();
p.onNext("B-2");
}, p->{
p.onNext("C-1");
p.onComplete();
})
);
flux.map(x -> {
System.out.println("Val:" + x);
System.out.println("Time:" + formater.format(new Date()));
System.out.println("Step=========================");
return x;
});
/* mergeSequential concat
============================ ============================
Val:A-0 Val:A-0
Time:16:14:22 Time:16:19:45
Step========================= Step=========================
Val:A-1 Val:A-1
Time:16:14:23 Time:16:19:46
Step========================= Step=========================
Val:A-2 Val:A-2
Time:16:14:24 Time:16:19:47
Step========================= Step=========================
Val:A-3 Val:A-3
Time:16:14:25 Time:16:19:48
Step========================= Step=========================
Val:A-4 Val:A-4
Time:16:14:26 Time:16:19:49
Step========================= Step=========================
Val:B-2 Val:B-2
Time:16:14:26 Time:16:19:49
Step========================= Step=========================
Val:B-1 Val:B-1
Time:16:14:26 Time:16:19:51
未等待 等待2秒
Step========================= Step=========================
Val:C-1 Val:C-1
Time:16:14:26 Time:16:19:51
Step========================= Step=========================
*/
需要注意使用
mergeSequential
时不能直接使用lambda表达式直接推理一个Publisher
的实现类,否则在调用onNext
时会抛出一个innerNext
的空指针异常(reactor-core-3.2.5.RELEASE)。
上面的代码输出结果完全一致,两者仅因为并行模式的区别,在B-1
和B-2
的处理时间上有差别。
Flux.mergeSequentialDelayError
类同concatDelayError
,如果出现异常依然会执行完每一步,异常当作一个信号放在最后输出。多个异常会合并。
Publisher<String> publisher = Flux.concat(p->{
p.onNext("A-1");
p.onNext("A-2");
p.onError(new RuntimeException("ERROR"));
p.onComplete();
}, p->{
p.onNext("B-1");
p.onNext("B-2");
p.onComplete();
});
Flux<String> flux = Flux.<String>mergeSequentialDelayError(
Flux.just(publisher, Flux.just("C-1", "C-2")), 3, 3);
/*
Val:A-1
Time:18:08:46
Step=========================
Val:A-2
Time:18:08:46
Step=========================
Val:C-1
Time:18:08:46
Step=========================
Val:C-2
Time:18:08:46
Step=========================
Val:ERROR
Time:18:08:46
Step=========================
*/
Flux.range
Flux.range(i, x)
从某个数值i开始向后连续发送n次信号,每次信号得到一个整数类型的i++。
Flux<String> flux = Flux.range(0, 10).map(i->"" + i);
flux.map(x -> {
System.out.print("," + x);
return x;
});
/*print
,0,1,2,3,4,5,6,7,8,9
*/
Flux.switchOnNext
顾名思义——将开关(switch)交给下一个发布者(publiser)。switchOnNext
通过发布接口来订阅发布者(Publisher<? extends Publisher<? extends T>>
),所有被订阅的发布者都可以发布信号(next/onNext),但是仅最后一个发布者可关闭信号。下面的代码由于最后的Flux.just("A-1", "A-2")
已经complete,所以publisher中的内容均不会输出(main运行时输出DEBUG reactor.core.publisher.Operators - onNextDropped: B-1
)。
Publisher<String> publisher = Flux.create(p->{
new Thread(()->{
sleep2();
p.next("B-1");
b.set(true);
if(b.get() && c.get()) {
p.complete();
}
}).start();
new Thread(()->{
sleep2();
p.next("C-1");
c.set(true);
if(b.get() && c.get()) {
p.complete();
}
}).start();
});
Flux<String> flux = Flux.<String>switchOnNext(Flux.just(publisher, Flux.just("A-1", "A-2")));
flux.map(x -> {
System.out.println("Result=" + x + ".Thread=" + Thread.currentThread().getName());
return x;
});
Flux.using
using
提供了一个supplier
->publish
->cleanup
机制,就好像访问数据库一样——打开链接->读写数据->关闭链接。
Flux<String> flux = Flux.using(() -> Arrays.asList("1", "2", "3", "4", "5"), range -> p -> {
range.forEach(i -> p.onNext(i));
p.onComplete();
}, range -> {
// cleanup
System.out.println("Cleanup");
}, false);
return flux.map(x -> {
System.out.print(x + ",");
return x;
});
/* 1,2,3,4,5,Cleanup */
using
和generate
参数有些类似,但是运行机制完全不一样:1. using
支持在publisher中执行异步过程,generate
执行异步会抛出异常。2. generate
整个信号过程都是单一线程并维护状态而using
完全是无序无状态的。可执行下列代码了解差异。
Flux<String> generate = Flux.<String, Integer>generate(() -> 1, (i, emt) -> {
System.out.println(i);
final int local = i++;
new Thread(() -> {
emt.next("" + local);
emt.complete();
}).start();
return i;
}, i -> System.out.println("End State=" + i));
Flux<String> using = Flux.using(() -> 1, i -> p -> {
AtomicBoolean b1 = new AtomicBoolean(false);
AtomicBoolean b2 = new AtomicBoolean(false);
new Thread(() -> {
sleep1();
p.onNext("" + (i + 1));
b1.set(true);
if (b1.get() && b2.get()) {
p.onComplete();
}
}).start();
new Thread(() -> {
sleep2();
p.onNext("" + (i + 2));
b2.set(true);
if (b1.get() && b2.get()) {
p.onComplete();
}
}).start();
}, i -> System.out.println("End State=" + i));
上面用到
using
的代码在WebFlux上下文环境下会抛出Unexpected item
以及406 NOT_ACCEPTABLE
。看源码SpringWebFlux会使用ChannelSendOperator
来触发writeBody的动作,在ChannelSendOperator
会处理2个状态NEW|READY_TO_WRITE
,若使用lambda推断的Publisher少了一些中间处理环境一直将状态设置为FIRST_SIGNAL_RECEIVED
并触发异常。此问题暂未找到官方的说明,若需解决要用另外一个Reactor包中的publisher来包装,如下:
Flux<String> flux = Flux.using(() -> Arrays.asList("1", "2", "3", "4", "5"), range -> Flux.fromIterable(range),
range -> System.out.println("Cleanup"));
flux.map(x -> {
System.out.print(x + ",");
return x;
});
Flux.usingWhen
usingWhen
基本上上using
类似,但是他的数据初始化接口使用的是Publiser
接口可用于更多的异步操作(可接收Flux\Mono),另外他更加细化了各种状态的控制接口(Complete\Error\Cancel),也有重载方法把所有的结果放到一处处理。下面的代码模拟展示了将一个字符串用字节流按行推送给订阅者的过程。
byte[] resource = new String("public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier,\r"
+ " Function<? super D,? extends Publisher<? extends T>> resourceClosure,\r"
+ " Function<? super D,? extends Publisher<?>> asyncComplete,\r"
+ " BiFunction<? super D,? super Throwable,? extends Publisher<?>> asyncError,\r"
+ " Function<? super D,? extends Publisher<?>> asyncCancel)").getBytes(Charset.forName("UTF-8"));
BiFunction<Integer, byte[], byte[]> aloc = (len, buf) -> {
byte[] loc = new byte[len];
System.arraycopy(buf, 0, loc, 0, len);
return loc;
};
Flux<byte[]> flux = Flux.<byte[], InputStream>usingWhen(Flux.just(new ByteArrayInputStream(resource)),
input -> p -> {
byte[] buffer = new byte[256];
int offset = 0;
int bit = -1;
try {
while (0 < (bit = input.read())) {
if (13 == bit) { // \r=13
p.onNext(aloc.apply(offset, buffer));
offset = 0;
} else {
buffer[offset++] = (byte) bit;
}
}
} catch (IOException e) {
p.onError(e);
}
p.onNext(aloc.apply(offset, buffer));
p.onComplete();
}, input -> p -> { //关闭流
try {input.close();} catch (IOException e) {p.onError(e);}
p.onComplete();
});
flux.map(x -> {
String line = new String(x, Charset.forName("UTF-8"));
System.out.println(line);
return line;
});
和
using
一样,usingWhen
在WebFlux上下文环境也会遇到Unexpected item
的问题。需要使用reactor中的puhlisher包装。例如将Flux<byte[]> flux
修改为:
Flux<byte[]> flux = Flux.<byte[], InputStream>usingWhen(Flux.just(new ByteArrayInputStream(resource)),
input -> Flux.create(emt->{
byte[] buffer = new byte[256];
int offset = 0;
int bit = -1;
try {
while (0 < (bit = input.read())) {
if (13 == bit) {
emt.next(aloc.apply(offset, buffer));
offset = 0;
} else {
buffer[offset++] = (byte) bit;
}
}
} catch (IOException e) {
emt.error(e);
}
emt.next(aloc.apply(offset, buffer));
emt.complete();
}), input -> p -> {
try {input.close();} catch (IOException e) {p.onError(e);}
p.onComplete();
});
Flux.zip
zip
的作用是将多个publisher
的信号合并(同步)为一个信号,多个publisher
发布的信号需要一致,当发布次数不一致时,以少次数的为准——多个publiser
在不同时间发布时zip都会对齐,其获取对齐数据的方式类似mergeOrdered
的先进先出队列。若某个publiser队列为空且已结束信号则其他publiser的信号全部cancel。如下列代码仅输出四行数据。
Publisher<String> zip1 = Flux.<String>merge(p -> {
new Thread(()->{
sleep2();
p.onNext("A");
p.onComplete();
}).start();
p.onNext("B");
}, p -> {
p.onNext("C");
p.onNext("D");
p.onNext("E");
p.onComplete();
});
Publisher<Integer> zip2 = Flux.just(1,2,3,4,5,6);
Publisher<String> zip3 = Flux.just("+","-","*","/");
Flux<String> flux = Flux.<String, Integer, String>zip(zip1, zip2, zip3).map(t -> {
return t.getT1() + "-" + t.getT2() + "-" + t.getT3();
});
flux.map(x -> {
System.out.println(x);
return x;
});
/*
B-1-+
C-2--
D-3-*
E-4-/
*/
每个publiser都有其发布信号的特性,若将上面的merge换成concat,输出会变成B-1-+A-2--C-3-*D-4-/
——因为concat要等第一个publiser执行完毕才会往下执行剩余的发布者。zip有很多个重载方法可以合并2~n个publiser
。
文中代码运行
Reactor的Mono、Flux在WebFlux上下文环境中运行时与使用main方法或Junit线程运行有一些区别。在WebFlux中会根据不同的上下文来决定最后的数据获取方式。文中的代码均在WebFlux环境和main方法下运行过,不同之处会加以说明。main方法使用toStream阻塞获取数据。