Flux的使用-创建篇

原创
06/13 11:06
阅读数 88

从Flux.first开始

  • 正常数据流

    first表示某个publisher先执行触发信号(signal)的方法则之后仅推送此publisher中的信号,其他publisher失效。信号——signal是reactor官文常提到的概念,指通过触发的方法(例如onNext/onError/onComplete)向后续的订阅者发布消息。

    	Flux<String> flux = Flux.first(p->{
    		p.onNext("A-1");
    		new Thread(()->{
    			sleep1(); //暂停1秒
    			p.onNext("A-2");
    			p.onComplete();
    		}).start();
    	}, p->{
    		p.onNext("B-1");
    		p.onNext("B-2");
    		p.onComplete();
    	});
    	return flux.map(x -> {
    		System.out.println(x);
    		return x;
    	});
    	/*输出
    	A-1
    	A-2
    	*/
    

    上面的代码可以将p.onNext("A-1");移至异步Thead内,会输出B-1|B-2

    若一直不调用onComplete方法就是一个无限数据流,这意味着stream永远不会关闭,可以持续不断的发送信息(比如用于发布订阅)。

    下面的代码展示了Flux的一些用法——使用多种Flux创建方法实现客户端与WebFlux创建Websocket链接后,1)当客户端发来消息时记录接收到的消息并回复"Receive Msg:" + 收到的消息,2)每间隔5秒向客户端推送一条Send Msg: Push x Time.

    	//在@Configuration中添加
    	private WebSocketHandler socketHandler = session -> {
    		Flux<WebSocketMessage> receive = session.receive().map(msg -> {
    			String info = msg.getPayloadAsText();
    			log.info("Receive Msg:" + info);
    			return info;
    		}).map(info -> session.textMessage("Receive Msg:" + info));
    		Flux<WebSocketMessage> push = Flux.interval(Duration.ofSeconds(5)).map(
    				i->session.textMessage("Send Msg: Push" + ++i + " Time."));
    		Flux<WebSocketMessage> echo = Flux.merge(receive, push)
    				.onErrorResume(e -> Flux.just(session.textMessage("ERROR:" + e.getMessage())));
    		return session.send(echo);
    	};
    	@Bean
    	public HandlerMapping webSocketMapping() {
    		final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
    		mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
    		mapping.setUrlMap(
    				ImmutableMap.<String, WebSocketHandler>builder().put("/api/test/socket", socketHandler).build());
    		return mapping;
    	}
    	@Bean
    	public WebSocketHandlerAdapter handlerAdapter() {
    		return new WebSocketHandlerAdapter();
    	}
    

    外部访问"/api/test/socket"路径发起websocket链接,此时 session 会创建一个Flux的publisher,其后的map用于处理发送来的数据。如果没有其他的外部依赖,WebSocketSession的实现类仅有ReactorNettyWebSocketSession,基本上还是按照在DispatcherHandler 中找到对应的handlerAdapter来处理,因此在Configuration中注入了WebSocketHandlerAdapter告知其handlerAdapter对应的处理类。

    上面的代码用到了多个Flux的创建方法(interval、merge、just),所有的创建方法都有其作用和特性以实现一些需求,后文将会逐一介绍。

  • 异常处理

    在使用first创建时,其中抛出的异常并不会由onErrorResume等异常捕捉处理到。如下代码会输出Outer Exception.ERRPR

    	try {
        	Flux<String> flux = Flux.<String>first(p->{
        	    System.out.println("publisher1 Execute");
        		p.onNext("1");
        		//p.onError(new RuntimeException("ERRPR"));
        		throw new RuntimeException("ERRPR");
        		//p.onComplete();
        	});
    
        	flux.map(x->{
        		System.out.println("Consumer," + x);
        		//throw new RuntimeException("ERRPR");
        		return x + "x";
        	}).onErrorResume(err->{
        		return Flux.just("123");
        	}).toStream().forEach(x->{
        		System.out.println("Each Print," + x);
        		System.out.println(x);
        	});
    	}catch(Exception e) {
    		System.out.println("Outer Exception." + e.getMessage());
    	}
    

    使用WebFlux时如果没主动捕捉publisher代码块的异常会导致请求永不返回,需要拦截异常之后传递给onError去处理,这和在map等方法中抛出异常有较大的区别。

    	Flux<String> flux = Flux.<String>first(p -> {
    		try {
    			System.out.println("Publisher1");
    			p.onNext("1");
    			if(true)throw new RuntimeException("ERRPR");
    			p.onComplete();
    		}catch(Exception e) {
    			p.onError(e);
    		}
    	})
    

    每一种Flux的创建方法的异常流都有些许的不同,复杂的代码需小心处理。

Flux.just

just方法是创建Flux最简单直观的方法。将一个实例包装后向后传递Flux对象。如下代码实际上就相当于一个publisher中调用了3次onNext,然后onComplete

Flux.just("1", "2", "3").map(x -> {
	System.out.println("Comsumer:" + x);
	return x;
}).toStream().forEach(x -> {
	System.out.println("Result:" + x);
});
/* print
Comsumer:1
Comsumer:2
Comsumer:3
Result:1
Result:2
Result:3
*/

Flux.empty以及Flux.never

Flux.empty会创建一个空对象,此对象创建后即刻返回,不会向后执行任何动作。Flux.never会创建一个不可执行的空对象,但是不会返回,这就意味如果是从main启动会类似一个守护线程启动,如果是webFlux会占用线程,(类似用publisher中不调用onComplete)

Flux.combineLatest

combineLatest有很多个重载方法。所有的方法均表示为当所有的publisher执行完毕之后,再调用一个combinator用于组合结果。

Flux<String> flux = Flux.<String, String>combineLatest(list -> {
	String out = "";
	for(Object i : list) {
		out += i + "|";
	}
	return out + ";";
},  emt -> {
	System.out.println("A Thread:" + Thread.currentThread().getName());
	emt.onNext("A-1");
	new Thread(()->{
		sleep1(); //停顿1秒
		System.out.println("A Async Thread:" + Thread.currentThread().getName());
		emt.onNext("A-2");
	}).start();
	new Thread(()->{
		sleep4(); //停顿4秒
		System.out.println("A Async Thread:" + Thread.currentThread().getName());
		emt.onNext("A-3");
		emt.onComplete();
	}).start();
}, emt -> {
	System.out.println("B Thread:" + Thread.currentThread().getName());
	emt.onNext("B-1");
	emt.onNext("B-2");
	emt.onNext("B-3");
	emt.onComplete();
}, emt -> {
	System.out.println("C Thread:" + Thread.currentThread().getName());
	emt.onNext("C-1");
	emt.onNext("C-2");
	new Thread(()->{
		sleep2();//停顿2秒
		System.out.println("C Async Thread:" + Thread.currentThread().getName());
		emt.onNext("C-3");
		emt.onComplete();
	}).start();
});
flux.map(x -> {
	System.out.println("Consumer Thread:" + Thread.currentThread().getName() + ".Val:" + x);
	return x;
});
/*
A Thread:main
B Thread:main
C Thread:main
Consumer Thread:main.Val:A-1|B-3|C-1|;
Consumer Thread:main.Val:A-1|B-3|C-2|;
A Async Thread:Thread-0
Consumer Thread:Thread-0.Val:A-2|B-3|C-2|;
C Async Thread:Thread-2
Consumer Thread:Thread-2.Val:A-2|B-3|C-3|;
A Async Thread:Thread-1
Consumer Thread:Thread-1.Val:A-3|B-3|C-3|;
*/

combineLatest的合并发布过程有点类似我们做代码基线管理——以最后一个publiser为”基线“(实例代码中的C),其他publiser发布的信号都以最后一次的数据向以线程为单位向"基线"合并然后发布数据。上面的代码执过程如下(注意异步线程名):

  1. A分支推送A-1,B分支推送B-1B-2B-3,此时A的数据为A-1、B的数据为B-3。随后main线程执行C的推送C-1触发合并方法执行向订阅者推送A-1|B-3|C-1|
  2. 紧接着main线程推送C-2,触发合并推送A-1|B-3|C-2|
  3. 等待1秒后A在Thread-0线程推送A-2,触发合并向订阅者推送A-2|B-3|C-2|
  4. 1秒后Thread-2线程推送C-3,触发合并推送A-2|B-3|C-3|
  5. 2秒后Thread-1推送A-3,合并推送A-2|B-3|C-3|

Flux.concat

concat方法是会将所有的publisher都执行完毕,并且每个onNext都是一个独立的信号(signal)向后异步传递信息。

Flux<String> flux = Flux.concat(
	emt -> {
		emt.onNext("A-1");
		emt.onNext("A-2");
		emt.onNext("A-3");
		emt.onComplete();
	}, emt -> {
		emt.onNext("B-1");
		emt.onNext("B-2");
		emt.onNext("B-3");
		emt.onComplete();
	}, emt -> {
		emt.onNext("C-1");
		emt.onNext("C-2");
		emt.onNext("C-3");
		emt.onComplete();
	});

flux.map(x -> {
	System.out.println("Val:" + x);
	return x;
});
/* print
Val:A-1
Val:A-2
Val:A-3
Val:B-1
Val:B-2
Val:B-3
Val:C-1
Val:C-2
Val:C-3
*/

Flux.concatDelayError

concatDelayErrorconcat的差异主要是在对错误的处理。concat方法在出现错误时,会中断后续的所有处理:

Flux.concat(s->{
	s.onNext("1");
	s.onError(new RuntimeException("ERROR"));
	s.onNext("1+");
	s.onComplete();
}, s->{
	s.onNext("2");
	s.onComplete();
}).map(x -> {
	System.out.println("Comsumer:" + x);
	return x;
}).onErrorResume(e->{
	return Flux.just(e.getMessage());
}).toStream().forEach(x -> {
	System.out.println("Result:" + x);
});
/* print
Comsumer:1
Result:1
Result:ERROR
*/

concatDelayError可以将onError视作一个独立的信号,并不会中断后续处理。

Flux.<String>concatDelayError(s->{
	s.onNext("1");
	s.onError(new RuntimeException("ERROR"));
	s.onNext("1+");
	s.onComplete();
}, s->{
	s.onNext("2");
	s.onComplete();
}).map(x -> {
	System.out.println("Comsumer:" + x);
	return x;
}).onErrorResume(e->{
	return Flux.just(e.getMessage());
}).doOnNext(x->{
	System.out.println("Result:" + x);
});
/* print
Comsumer:1
Result:1
Comsumer:1+
Result:1+
Comsumer:2
Result:2
Result:ERROR
*/

看输出的日志,即使中间有onError也会执行后续代码,并在最后附上错误信息。这可用于处理那些允许部分失败的业务。需要注意的是,当出现多个异常时,并不会以多个信号的方式发送,而是合并放在Throwable的引用异常列表中。

return Flux.<String>concatDelayError(s->{
	s.onNext("1");
	s.onError(new RuntimeException("ERROR1"));
	s.onNext("1+");
	s.onComplete();
}, s->{
	s.onNext("2");
	s.onError(new RuntimeException("ERROR2"));
	s.onNext("2+");
	s.onComplete();
}).map(x -> {
	System.out.println("Comsumer:" + x);
	return x;
}).onErrorResume(e->reactor.core.Exceptions.isMultiple(e), e->{
	String err = "";
	for(Throwable error : e.getSuppressed()) {
		err += "|" + error.getMessage();
	}
	return Flux.just(err);
}).onErrorResume(e->{
	return Flux.just(e.getMessage());
}).doOnNext(x->{
	System.out.println("Result:" + x);
});
/* print
Comsumer:1
Result:1
Comsumer:1+
Result:1+
Comsumer:2
Result:2
Comsumer:2+
Result:2+
Result:|ERROR1|ERROR2
*/

Flux.create及Flux.push

  1. 相同之处。

    createpush在方法体上一模一样,类似于用一个发布者重复发送信号直到结束,遇到错误时立刻中断。

    	Flux.<String>create/*push*/(emiter->{
    		emiter.next("1");
    		emiter.next("1+");
    		emiter.complete();
    	}).map(x -> {
    		System.out.println("Comsumer:" + x);
    		return x;
    	}).doOnNext(x->{
    		System.out.println("Result:" + x);
    	});
    	/* print
    	Comsumer:1
    	Result:1
    	Comsumer:1+
    	Result:1+
    	*/
    

    createpush都有一个另外一个重载方法接收OverflowStrategy参数,此参数用于单线程的发布订阅模式的流量处理。默认是OverflowStrategy.BUFFER表示无法处理则先缓存。细节可了解backpressure mode(背压模式)。

  2. 不同之处

    createpush主要的差别在异步线程处理上,简单粗暴的理解——create相当于一个线程安全的过程、push相当于一个非线程安全的过程。当然这样说并不准确,官方的文档也只是说create适合“多值的异步API”(This Flux factory is useful if one wants to adapt some other multi-valued async API)push适合“单线程多值的异步API”(This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API),并且2者实现的实现都是用了FluxCreate。下面的代码比较复杂——模拟了2个创建方法在多线程下的情况(如果接收到所有的信号会在map方法中输出到Result6,如果没有接收到所有的信号那么在输出Result6之前就结束了)。

    有兴趣可以把代码放到main中分别使用createpush多跑几次看看输出有何不同(最后加个.toStream()之类的方法汇运行),甚至去研究FluxCreate在接收FluxCreate.CreateMode.PUSH_PULLFluxCreate.CreateMode.PUSH_ONLY不同的处理过程。如果只是使用Flux基本上只要记住一个是线程安全、一个非线程安全就行了。

    	void sleep1() {
    		try {
    			Thread.sleep(1000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    
    	DateFormat formater = new SimpleDateFormat("HH:mm:ss");
    	AtomicBoolean b1 = new AtomicBoolean(false);
    	AtomicBoolean b2 = new AtomicBoolean(false);
    	AtomicBoolean b3 = new AtomicBoolean(false);
    	AtomicInteger i = new AtomicInteger(1);
    
    	Flux<String> flux = Flux.create/*push*/(p->{
    		new Thread(()->{
    			System.out.println("Thread A Before. Thread=" + Thread.currentThread().getName() + ", Time=" + formater.format(new Date()));
    			sleep1();
    			System.out.println("Publish A-1.Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date()));
    			p.next("A-1");
    			sleep1();
    			System.out.println("Publish A-2.Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date()));
    			p.next("A-2");
    			System.out.println("Thread A After.");
    			b1.set(true);
    			if(b1.get() && b2.get() && b3.get()) {
    				System.out.println("Thread A Complete");
    				p.complete();
    			}
    			System.out.println("Thread A End.");
    		}).start();
    		new Thread(()->{
    			System.out.println("Thread B Before. Thread=" + Thread.currentThread().getName() + ", Time=" + formater.format(new Date()));
    			sleep1();
    			System.out.println("Publish B-1.Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date()));
    			p.next("B-1");
    			sleep1();
    			System.out.println("Publish B-2.Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date()));
    			p.next("B-2");
    			System.out.println("Thread B After.");
    			b2.set(true);
    			if(b1.get() && b2.get() && b3.get()) {
    				System.out.println("Thread B Complete");
    				p.complete();
    			}
    			System.out.println("Thread B End.");
    		}).start();
    		new Thread(()->{
    			System.out.println("Thread C Before. Thread=" + Thread.currentThread().getName() + ", Time=" + formater.format(new Date()));
    			sleep1();
    			System.out.println("Publish C-1.Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date()));
    			p.next("C-1");
    			sleep1();
    			System.out.println("Publish C-2.Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date()));
    			p.next("C-2");
    			System.out.println("Thread C After.");
    			b3.set(true);
    			if(b1.get() && b2.get() && b3.get()) {
    				System.out.println("Thread C Complete");
    				p.complete();
    			}
    			System.out.println("Thread CEnd.");
    		}).start();
    		System.out.println("Thread Publisher. Thread=" + Thread.currentThread().getName() + ", Time=" + formater.format(new Date()));
    	});
    	flux.map(x -> {
    		System.out.println("Result" + i.getAndAdd(1) + ". Val=" + x + ",Thread=" + Thread.currentThread().getName() + ",Time=" + formater.format(new Date()));
    		return x;
    	});
    

Flux.defer

defer表示延迟执行publisher。方法体通过Supplier包装推迟publiser的执行。

Publisher<String> publisher = Flux.create(emt->{
	System.out.println("Publisher:" + 1);
	emt.next("1");
	System.out.println("Publisher:" + 2);
	emt.next("2");
});
return Flux.defer(()->{
	System.out.println("Create Publisher");
	return publisher;
}).map(x -> {
	System.out.println("Comsumer:" + x);
	return x;
}).doOnNext(x->{
	System.out.println("Result:" + x);
});

Flux.from

Publisher<String> publisher = Flux.create(emt->{
	System.out.println("Publisher:" + 1);
	emt.next("1");
	System.out.println("Publisher:" + 2);
	emt.next("2");
});
Flux.from(publisher).map(x -> {
	System.out.println("Comsumer:" + x);
	return x;
}).doOnNext(x->{
	System.out.println("Result:" + x);
});

直观上,deferfrom非常类似,不过from的publisher会先于Flux.from方法体执行。

Flux.fromArray

fromArray就是直接使用队列创建一个Flux,Flux.fromArray(new String[] {"1","2"})类似于Flux.just("1","2")

Flux.fromIterable

fromIterable接收一个Iterable接口的队列,Flux.fromIterable(Arrays.asList("1", "2"))类似于Flux.just("1","2")

Flux.fromStream

fromStream接收一个Stream的流数据。Flux.fromStream(Stream.<String>builder().add("1").add("2").build())类似于Flux.just("1","2")。fromStream有一个使用Supplier包装的重载,区别相当于Flux.deferFlux.from

Flux.error

直接抛出一个异常。

Flux.<String>error(new RuntimeException("ERROR")).map(x -> {
	System.out.println("Comsumer:" + x);
	return x;
}).onErrorResume(e -> Flux.just(e.getMessage())).doOnNext(x -> {
	System.out.println("Result:" + x);
});

Flux.generate

generate是一个比较特殊的构建方法,用中文描述应该叫发生器比较贴切。他类似一个FOR循环,在触发onComplete之前他会不断的发布数据给后续的订阅者。Fluxgenerate有三个重载方法,比较直观好用的是generate(beforePublish, publisher,afterPublish),另外两个大同小异。

Flux<String> generate = Flux.<Integer, Integer>generate(()->{
	return 0;
}, (pre, emt)->{
	if(10 < ++pre) { //累计执行一定次数后关闭/退出发生器
		emt.complete();
	}else {
		emt.next(pre);
	}
	return pre;
}, state->{
	System.out.println("state:" + state);
}).map(x->"" + x);

generate.map(x -> {
	System.out.println("Comsumer:" + x);
	return x;
}).onErrorResume(e -> Flux.just(e.getMessage())).doOnNext(x -> {
	System.out.println("Result:" + x);
});
/* 输出
Comsumer:1
Result:1
Comsumer:2
Result:2
Comsumer:3
Result:3
Comsumer:4
Result:4
Comsumer:5
Result:5
Comsumer:6
Result:6
Comsumer:7
Result:7
Comsumer:8
Result:8
Comsumer:9
Result:9
Comsumer:10
Result:10
state:11
*/

上面这段代码generate 会不断的产生数据向后推送,直到达成if(10 < ++pre)条件退出。与create相比generate 有个优点是他会等到每一个信号执行完毕之后才产生下一条数据并继续执行。但是缺点也明显当处理链路过长时,任何未仔细处理的异常或阻塞会导致整个执行过程卡死。publisher中无论时throw还是emt.error导致的异常,都会正确被onErrorResume处理。

Flux.interval

interval可简单理解为一个"定时任务"。通过Duration指定执行间隔,然后按间隔时间的向订阅者发布信号。

Duration duration = Duration.ofSeconds(1);
DateFormat formater = new SimpleDateFormat("mm:ss");
return Flux.interval(duration).map(step -> {
	System.out.println("Step:" + step);
	return new Date();
}).map(date -> formater.format(date))
.onErrorResume(e -> Flux.just(e.getMessage()))
.doOnNext(x -> System.out.println("time:" + x));
/* 输出
Step:0
time:10:16
Step:1
time:10:17
Step:2
time:10:18
Step:3
time:10:19
Step:4
time:10:20
...
*/

只要不是主动退出或在订阅链路中抛出异常,此订阅会一直按时间间隔执行下去。与generate差别在于interval通过Scheduler接口可以指定后续任务的线程模型。这也是Flux响应式EventLop模型的核心。默认情况下interval会使用parallel类型的线程,可以使用额外提供Scheduler指示后续使用其他线程类型来工作。下面的代码将线程模型指定为elastic

Duration duration = Duration.ofSeconds(1);
DateFormat formater = new SimpleDateFormat("HH:mm:ss");
Scheduler scheduler = Schedulers.newElastic("My-Elastic");
scheduler.schedule(()->{
	System.out.println("Schedule=========================");
	System.out.println("Thread:" + Thread.currentThread().getName());
	System.out.println("Time:" + formater.format(new Date()));
	System.out.println("   ");
});
return Flux.interval(duration, scheduler).map(step -> {
	System.out.println("Step=========================");
	System.out.println("Thread:" + Thread.currentThread().getName());
	System.out.println("step:" + step);
	System.out.println("   ");
	return new Date();
}).map(date -> formater.format(date)).onErrorResume(e -> {
	return Flux.just(e.getMessage());
}).doOnNext(x -> {
	System.out.println("Consumer=========================");
	System.out.println("Thread:" + Thread.currentThread().getName());
	System.out.println("Time:" + x);
	System.out.println("   ");
});
/* 输出
Schedule=========================
Thread:My-Elastic-2
Time:11:08:28
   
Step=========================
Thread:My-Elastic-2
step:0
   
Consumer=========================
Thread:My-Elastic-2
Time:11:08:29
   
Step=========================
Thread:My-Elastic-2
step:1
   
Consumer=========================
Thread:My-Elastic-2
Time:11:08:30
   
Step=========================
Thread:My-Elastic-2
step:2
   
Consumer=========================
Thread:My-Elastic-2
Time:11:08:31
*/

关于线程模型的创建和选择参考reactor.core.scheduler.Schedulers

Flux.merge

merge的调用方法和concat类似,但是在运行机制上存在差异。concat是"一步一步"的执行,要执行完第一个publiser后才会向后执行第二个、第三个。merge是同时执行,每个onNext都会立刻引发一个信号。

/*concat*/
Flux<String> flux = Flux.concat(
	emt -> {
		AtomicBoolean b1 = new AtomicBoolean();
		AtomicBoolean b2 = new AtomicBoolean();
		new Thread(()->{
			try {
				Thread.sleep(2000);
			} catch (InterruptedException e) {
			}
			emt.onNext("A-1");
			b1.set(true);
			if(b1.get() && b2.get()) {
				emt.onComplete();
			}
		}).start();
		new Thread(()->{
			emt.onNext("A-2");
			b2.set(true);
			if(b1.get() && b2.get()) {
				emt.onComplete();
			}
		}).start();
	}, emt -> {
		emt.onNext("B-1");
		emt.onNext("B-2");
		emt.onComplete();
	}, emt -> {
		emt.onNext("C-1");
		emt.onNext("C-2");
		emt.onComplete();
	});
flux.map(x -> {
	System.out.println("Consumer=========================");
	System.out.println("Thread:" + Thread.currentThread().getName());
	System.out.println("Time:" + formater.format(new Date()));
	System.out.println("Val:" + x);
	return x;
});
/* print: A-2输出之后,所有的信号等待A-1 2秒之后才继续执行。
Consumer=========================
Thread:Thread-1
Time:12:35:26
Val:A-2
Consumer=========================
Thread:Thread-0
Time:12:35:28
Val:A-1
Consumer=========================
Thread:Thread-0
Time:12:35:28
Val:B-1
Consumer=========================
Thread:Thread-0
Time:12:35:28
Val:B-2
Consumer=========================
Thread:Thread-0
Time:12:35:28
Val:C-1
Consumer=========================
Thread:Thread-0
Time:12:35:28
Val:C-2
*/

上面同样的代码,将concat修改为merge方法会输出

Flux<String> flux = Flux.merge(
	...
/* print,  A-2在其余信号发布2秒之后才出现
Consumer=========================
Thread:main
Time:12:38:23
Val:B-1
Consumer=========================
Thread:main
Time:12:38:23
Val:A-2
Consumer=========================
Thread:main
Time:12:38:23
Val:B-2
Consumer=========================
Thread:main
Time:12:38:23
Val:C-1
Consumer=========================
Thread:main
Time:12:38:23
Val:C-2
Consumer=========================
Thread:Thread-0
Time:12:38:25
Val:A-1
*/

Flux.mergeOrdered

mergeOrdered方法的使用和concatmerge一样。执行过程类似merge,不过额外增加了执行顺序的判断功能。其排序过程:

  1. 每一个Publisher的信号(onNext)都是一个先进先出的队列,每个publisher先被执行的onNext数据入列尾。
  2. 每一个Publisher队首元素相互对比,符合比较条件的pop出队列。比较过程默认是元素实现Comparable接口,选取比较结果"最小"的元素。可以使用重载方法提供一个Comparator自行比较。
  3. 被选取的元素pop后继续回到第一步比较。若某个Publisher的队列为空,但未结束信号源(未onComplete)则会等待,若结束则此Publisher关闭。
DateFormat formater = new SimpleDateFormat("HH:mm:ss");
Flux<String> flux = Flux.<String>mergeOrdered((a, b)->{
	System.out.println("Compare:" + a + " and " + b);
	return -1;
}, p->{
	new Thread(()->{
		try {Thread.sleep(2000);} catch (InterruptedException e) {}
		p.onNext("A-1");
		p.onComplete();
	}).start();
	p.onNext("A-2");
}, p->{
	p.onNext("B-1");
	p.onNext("B-2");
	p.onComplete();
}, p->{
	p.onNext("C-1");
	p.onNext("C-2");
	p.onComplete();
});
return flux.map(x -> {
	System.out.println("Val:" + x);
	System.out.println("Time:" + formater.format(new Date()));
	System.out.println("Step=========================");
	return x;
});
/*print 
-- A-2 元素先进入第一个publisher的队列,开始比较。
Compare:A-2 and B-1
Compare:A-2 and C-1
Val:A-2
Time:15:11:27 
Step=========================
--线程等待2秒后 A-1 元素进入第一个publisher的队列,开始第二次比较。
Compare:A-1 and B-1
Compare:A-1 and C-1
Val:A-1
Time:15:11:29
Step=========================
Compare:B-1 and C-1
Val:B-1
Time:15:11:29
Step=========================
Compare:B-2 and C-1
Val:B-2
Time:15:11:29
Step=========================
Val:C-1
Time:15:11:29
Step=========================
Val:C-2
Time:15:11:29
Step=========================
*/

Flux.mergeSequential

mergeSequential表示对输出的结果次序化。其调用结果和concat非常的类似,但是执行过程却不一样。concat会逐一执行每一个publisher并按顺序输出结果,而mergeSequential会并行执行所有的publisher然后强制编排顺序输出。

Flux<String> flux = Flux.<String>mergeSequential /*concat*/ (
	Flux.interval(Duration.ofSeconds(1)).take(5).map(x->"A-" + x.toString()), 
	Flux.<String>concat(p->{
		new Thread(()->{
			sleep();
			p.onNext("B-1");
			p.onComplete();
		}).start();
		p.onNext("B-2");
	}, p->{
		p.onNext("C-1");
		p.onComplete();
	})
);

flux.map(x -> {
	System.out.println("Val:" + x);
	System.out.println("Time:" + formater.format(new Date()));
	System.out.println("Step=========================");
	return x;
});
/* mergeSequential			concat
============================		============================ 
Val:A-0					Val:A-0
Time:16:14:22				Time:16:19:45
Step=========================		Step=========================
Val:A-1					Val:A-1
Time:16:14:23				Time:16:19:46
Step=========================		Step=========================
Val:A-2					Val:A-2
Time:16:14:24				Time:16:19:47
Step=========================		Step=========================
Val:A-3					Val:A-3
Time:16:14:25				Time:16:19:48
Step=========================		Step=========================
Val:A-4					Val:A-4
Time:16:14:26				Time:16:19:49
Step=========================		Step=========================
Val:B-2					Val:B-2
Time:16:14:26				Time:16:19:49
Step=========================		Step=========================
Val:B-1					Val:B-1
Time:16:14:26				Time:16:19:51
未等待					等待2秒
Step=========================		Step=========================
Val:C-1					Val:C-1
Time:16:14:26				Time:16:19:51
Step=========================		Step=========================
*/

需要注意使用mergeSequential时不能直接使用lambda表达式直接推理一个Publisher的实现类,否则在调用onNext时会抛出一个innerNext的空指针异常(reactor-core-3.2.5.RELEASE)。

上面的代码输出结果完全一致,两者仅因为并行模式的区别,在B-1B-2的处理时间上有差别。

Flux.mergeSequentialDelayError

类同concatDelayError,如果出现异常依然会执行完每一步,异常当作一个信号放在最后输出。多个异常会合并。

Publisher<String> publisher = Flux.concat(p->{
	p.onNext("A-1");
	p.onNext("A-2");
	p.onError(new RuntimeException("ERROR"));
	p.onComplete();
}, p->{
	p.onNext("B-1");
	p.onNext("B-2");
	p.onComplete();
});
Flux<String> flux = Flux.<String>mergeSequentialDelayError(
	Flux.just(publisher, Flux.just("C-1", "C-2")), 3, 3);
/*
Val:A-1
Time:18:08:46
Step=========================
Val:A-2
Time:18:08:46
Step=========================
Val:C-1
Time:18:08:46
Step=========================
Val:C-2
Time:18:08:46
Step=========================
Val:ERROR
Time:18:08:46
Step=========================
*/

Flux.range

Flux.range(i, x)从某个数值i开始向后连续发送n次信号,每次信号得到一个整数类型的i++。

Flux<String> flux = Flux.range(0, 10).map(i->"" + i);
flux.map(x -> {
	System.out.print(","  + x);
	return x;
});
/*print
,0,1,2,3,4,5,6,7,8,9
*/

Flux.switchOnNext

顾名思义——将开关(switch)交给下一个发布者(publiser)。switchOnNext通过发布接口来订阅发布者(Publisher<? extends Publisher<? extends T>>),所有被订阅的发布者都可以发布信号(next/onNext),但是仅最后一个发布者可关闭信号。下面的代码由于最后的Flux.just("A-1", "A-2")已经complete,所以publisher中的内容均不会输出(main运行时输出DEBUG reactor.core.publisher.Operators - onNextDropped: B-1)。

Publisher<String> publisher = Flux.create(p->{
	new Thread(()->{
		sleep2();
		p.next("B-1");
		b.set(true);
		if(b.get() && c.get()) {
			p.complete();
		}
	}).start();
	new Thread(()->{
		sleep2();
		p.next("C-1");
		c.set(true);
		if(b.get() && c.get()) {
			p.complete();
		}
	}).start();
});

Flux<String> flux = Flux.<String>switchOnNext(Flux.just(publisher, Flux.just("A-1", "A-2")));
flux.map(x -> {
	System.out.println("Result=" + x + ".Thread=" + Thread.currentThread().getName());
	return x;
});

Flux.using

using提供了一个supplier->publish->cleanup机制,就好像访问数据库一样——打开链接->读写数据->关闭链接。

Flux<String> flux = Flux.using(() -> Arrays.asList("1", "2", "3", "4", "5"), range -> p -> {
	range.forEach(i -> p.onNext(i));
	p.onComplete();
}, range -> {
	// cleanup
	System.out.println("Cleanup");
}, false);
return flux.map(x -> {
	System.out.print(x + ",");
	return x;
});
/* 1,2,3,4,5,Cleanup */

usinggenerate参数有些类似,但是运行机制完全不一样:1. using支持在publisher中执行异步过程,generate执行异步会抛出异常。2. generate整个信号过程都是单一线程并维护状态而using完全是无序无状态的。可执行下列代码了解差异。

Flux<String> generate = Flux.<String, Integer>generate(() -> 1, (i, emt) -> {
	System.out.println(i);
	final int local = i++;
	new Thread(() -> {
		emt.next("" + local);
		emt.complete();
	}).start();
	return i;
}, i -> System.out.println("End State=" + i));

Flux<String> using = Flux.using(() -> 1, i -> p -> {
	AtomicBoolean b1 = new AtomicBoolean(false);
	AtomicBoolean b2 = new AtomicBoolean(false);
	new Thread(() -> {
		sleep1();
		p.onNext("" + (i + 1));
		b1.set(true);
		if (b1.get() && b2.get()) {
			p.onComplete();
		}
	}).start();
	new Thread(() -> {
		sleep2();
		p.onNext("" + (i + 2));
		b2.set(true);
		if (b1.get() && b2.get()) {
			p.onComplete();
		}
	}).start();
}, i -> System.out.println("End State=" + i));

上面用到using的代码在WebFlux上下文环境下会抛出Unexpected item以及406 NOT_ACCEPTABLE。看源码SpringWebFlux会使用ChannelSendOperator来触发writeBody的动作,在ChannelSendOperator会处理2个状态NEW|READY_TO_WRITE,若使用lambda推断的Publisher少了一些中间处理环境一直将状态设置为FIRST_SIGNAL_RECEIVED并触发异常。此问题暂未找到官方的说明,若需解决要用另外一个Reactor包中的publisher来包装,如下:

Flux<String> flux = Flux.using(() -> Arrays.asList("1", "2", "3", "4", "5"), range -> Flux.fromIterable(range),
		range -> System.out.println("Cleanup"));
flux.map(x -> {
	System.out.print(x + ",");
	return x;
});

Flux.usingWhen

usingWhen基本上上using类似,但是他的数据初始化接口使用的是Publiser接口可用于更多的异步操作(可接收Flux\Mono),另外他更加细化了各种状态的控制接口(Complete\Error\Cancel),也有重载方法把所有的结果放到一处处理。下面的代码模拟展示了将一个字符串用字节流按行推送给订阅者的过程。

byte[] resource = new String("public static <T,D> Flux<T> usingWhen(Publisher<D> resourceSupplier,\r"
	+ "   Function<? super D,? extends Publisher<? extends T>> resourceClosure,\r"
	+ "   Function<? super D,? extends Publisher<?>> asyncComplete,\r"
	+ "   BiFunction<? super D,? super Throwable,? extends Publisher<?>> asyncError,\r"
	+ "   Function<? super D,? extends Publisher<?>> asyncCancel)").getBytes(Charset.forName("UTF-8"));
BiFunction<Integer, byte[], byte[]> aloc = (len, buf) -> {
	byte[] loc = new byte[len];
	System.arraycopy(buf, 0, loc, 0, len);
	return loc;
};
Flux<byte[]> flux = Flux.<byte[], InputStream>usingWhen(Flux.just(new ByteArrayInputStream(resource)),
	input -> p -> {
		byte[] buffer = new byte[256];
		int offset = 0;
		int bit = -1;
		try {
			while (0 < (bit = input.read())) {
				if (13 == bit) { // \r=13
					p.onNext(aloc.apply(offset, buffer));
					offset = 0;
				} else {
					buffer[offset++] = (byte) bit;
				}
			}
		} catch (IOException e) {
			p.onError(e);
		}
		p.onNext(aloc.apply(offset, buffer));
		p.onComplete();
	}, input -> p -> { //关闭流
		try {input.close();} catch (IOException e) {p.onError(e);}
		p.onComplete();
	});
flux.map(x -> {
	String line = new String(x, Charset.forName("UTF-8"));
	System.out.println(line);
	return line;
});

using一样,usingWhen在WebFlux上下文环境也会遇到Unexpected item的问题。需要使用reactor中的puhlisher包装。例如将Flux<byte[]> flux修改为:

Flux<byte[]> flux = Flux.<byte[], InputStream>usingWhen(Flux.just(new ByteArrayInputStream(resource)),
	input -> Flux.create(emt->{
		byte[] buffer = new byte[256];
		int offset = 0;
		int bit = -1;
		try {
			while (0 < (bit = input.read())) {
				if (13 == bit) {
					emt.next(aloc.apply(offset, buffer));
					offset = 0;
				} else {
					buffer[offset++] = (byte) bit;
				}
			}
		} catch (IOException e) {
			emt.error(e);
		}
		emt.next(aloc.apply(offset, buffer));
		emt.complete();
	}), input -> p -> {
		try {input.close();} catch (IOException e) {p.onError(e);}
		p.onComplete();
	});

Flux.zip

zip的作用是将多个publisher的信号合并(同步)为一个信号,多个publisher发布的信号需要一致,当发布次数不一致时,以少次数的为准——多个publiser在不同时间发布时zip都会对齐,其获取对齐数据的方式类似mergeOrdered的先进先出队列。若某个publiser队列为空且已结束信号则其他publiser的信号全部cancel。如下列代码仅输出四行数据。

Publisher<String> zip1 = Flux.<String>merge(p -> {
		new Thread(()->{
			sleep2();
			p.onNext("A");
			p.onComplete();
		}).start();
		p.onNext("B");
	}, p -> {
		p.onNext("C");
		p.onNext("D");
		p.onNext("E");
		p.onComplete();
	});
Publisher<Integer> zip2 = Flux.just(1,2,3,4,5,6);
Publisher<String> zip3 = Flux.just("+","-","*","/");
Flux<String> flux = Flux.<String, Integer, String>zip(zip1, zip2, zip3).map(t -> {
	return t.getT1() + "-" + t.getT2() + "-" + t.getT3();
});
flux.map(x -> {
	System.out.println(x);
	return x;
});
/*
B-1-+
C-2--
D-3-*
E-4-/
*/

每个publiser都有其发布信号的特性,若将上面的merge换成concat,输出会变成B-1-+A-2--C-3-*D-4-/——因为concat要等第一个publiser执行完毕才会往下执行剩余的发布者。zip有很多个重载方法可以合并2~n个publiser

文中代码运行

Reactor的Mono、Flux在WebFlux上下文环境中运行时与使用main方法或Junit线程运行有一些区别。在WebFlux中会根据不同的上下文来决定最后的数据获取方式。文中的代码均在WebFlux环境和main方法下运行过,不同之处会加以说明。main方法使用toStream阻塞获取数据。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部