RxJS 是 ReactiveX 编程理念的 JavaScript 版本。
推荐前端在线编程工具jsbin:https://jsbin.com/poraxawohe/edit?html,console
核心概念:
Observable
Observer
Operator
Observer是什么
先看代码:
// Create an Observable whose producer logs 'Hello' and then pushes 42 to the observer.
let foo = Rx.Observable.create(observer => {
console.log('Hello');
observer.next(42);
});
// A plain callback used as the observer: it logs whatever value it is handed.
let observer = x => console.log(x);
// Subscribing hands the callback to the Observable.
foo.subscribe(observer);
代码中的第二个变量就是observer. 没错, observer就是当Observable"返回"值的时候接受那个值的函数!第一行中的observer其实就是通过foo.subscribe传入的callback. 只不过稍加封装了。 怎么封装的? 看代码:
// Same producer as the first example, but wrapped in try/catch so that a thrown
// exception is forwarded to the observer through observer.error().
let foo = Rx.Observable.create(observer => {
try {
console.log('Hello');
observer.next(42);
observer.complete();
// NOTE(review): next() is called after complete() — presumably to show that
// values sent after completion are not delivered; confirm against the RxJS docs.
observer.next(10);
} catch(e) { observer.error(e) }
});
// Full observer form: an object with one handler per notification type.
// Passing a bare callback to subscribe() is the same as passing { next: callback }.
let observer = {
  // Receives each value pushed by the Observable and logs it.
  next(value) { console.log(value) },
  // Runs when the Observable signals completion.
  // (The original snippet was missing this method's closing brace, which made
  // the whole object literal a syntax error.)
  complete() { console.log('completed') },
  // Receives an error forwarded by the producer and logs it to stderr.
  error(err) { console.error(err) }
}
// Subscribe using the object-form observer defined just above.
foo.subscribe(observer);
你看到observer被定义成了一个对象, 其实这才是完整的observer. 传入一个callback到observable.subscribe相当于传入了{ next: callback }。
参考资料:https://zhuanlan.zhihu.com/p/25383159 ---来自民工叔叔
Rx操作符
1、fromEvent:转化成一个流
2、combineLatest:合并最新的流
3、pluck:取出某个对象中的某个属性---Rx.Observable.fromEvent(length,"keyup").pluck('target','value');
4、zip:成对修改才会发送新的流
5、from:----Rx.Observable.from([1,2,3,4]);这个序列非常快,最新的值是4.
6、map:----Rx.Observable.fromEvent(width,"keyup").map(ev => ev.target.value);
7、mapTo:只需要知道发生了就好--Rx.Observable.fromEvent(width,"keyup").mapTo(2);
8、of:----Rx.Observable.of({id:1,value:30});
<!DOCTYPE html>
<!-- Demo page: two numeric inputs (length, width) and a div that displays the area. -->
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width">
<title>JS Bin</title>
</head>
<body>
<div> <input type="number" id="length" /></div>
<div> <input type="number" id="width" /></div>
<div id="area"></div>
<!-- Loads the RxJS 5.0.3 global build from unpkg; the scripts below use it as `Rx`. -->
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
</body>
</html>
// Wires the demo page: area = length * width, recomputed through combineLatest.
const area = document.getElementById("area");
const length = document.getElementById("length");
// Alternative length sources kept for experimentation:
//const length$ = Rx.Observable.fromEvent(length,"keyup").pluck('target','value');
//const length$ = Rx.Observable.from([1,2,3,4]);
// Active source: a single object whose `value` field (30) serves as the length.
const length$ = Rx.Observable.of({id:1,value:30});
const width = document.getElementById("width");
// Alternative width sources kept for experimentation:
//const width$ = Rx.Observable.fromEvent(width,"keyup").pluck('target','value');
//const width$ = Rx.Observable.fromEvent(width,"keyup").map(ev => ev.target.value);
//const width$ = Rx.Observable.fromEvent(width,"keyup").map(_ => 2);
// Active source: every keyup on the width input is mapped to the constant 2.
const width$ = Rx.Observable.fromEvent(width,"keyup").mapTo(2);
// Multiply the latest length object's `value` by the latest width.
const area$ = Rx.Observable.combineLatest(length$,width$,(l,w) => {return l.value*w;});
// zip variant; note it multiplies `l` directly, so it expects plain values rather than the { value } object.
//const area$ = Rx.Observable.zip(length$,width$,(l,w) => {return l*w;});
// Render each computed area into the #area element.
area$.subscribe(val => area.innerHTML = val);
Observable的性质
三种状态: next, error, complete
特殊的:永不结束,Never(多用于测试),Empty(结束但不发射,多用于测试),Throw(进入error 状态)
9、do:工具操作符---作为与外部交互的桥梁,临时subscribe的作用。
// do() demo: used purely for side effects — it logs each even tick and then
// rewrites logLabel, so the first log line uses a different prefix from the rest.
let logLabel = '当前值是';
const interval$ = Rx.Observable.interval(100)
.filter(val => val % 2 ===0)
.do(v => {
console.log(logLabel + v);
logLabel='当前 ';
})
.take(3);
10、scan ---累加
y对应的是val的值;
x对应的是每次累加后的值
// scan() demo: running sum over the even ticks.
// x is the accumulated total so far, y is the incoming value.
const interval$ = Rx.Observable.interval(100)
.filter(val => val % 2 ===0)
.scan((x,y) =>{
return x+y
})
.take(4);
0----1----2----3----4----5----6----
0---------2---------4---------6----
0---------2---------6---------12----
11、reduce : 用最后的累加值只会有一个输出
// reduce() demo: sums the first four even ticks and emits only the final total.
// NOTE(review): logLabel is declared but not used in this snippet.
let logLabel = '当前值是';
const interval$ = Rx.Observable.interval(100)
.filter(val => val % 2 ===0)
.take(4)
.reduce((x,y) =>{
return x+y
});
// Subscribe with next / error / complete callbacks.
interval$.subscribe(
val => console.log(val),
err => console.log(err),
() => console.log("I am complete")
);
// Outputs 12, because 0+2+4+6.
// reduce() with an explicit seed ([]): collects the first four even ticks into one array.
// NOTE(review): logLabel is declared but not used in this snippet.
let logLabel = '当前值是';
const interval$ = Rx.Observable.interval(100)
.filter(val => val % 2 ===0)
.take(4)
.reduce((x,y) =>{
return [...x,y]
},[]);
//output [0,2,4,6]
12、filter: .filter(val => val % 2 ===0)
// filter() demo: only ticks where val % 2 === 0 pass through to do()/take().
let logLabel = '当前值是';
const interval$ = Rx.Observable.interval(100)
.filter(val => val % 2 ===0)
.do(v => {
console.log(logLabel + v);
logLabel='当前 ';
})
.take(3);
13、take: 取前几个流
// take() demo: take(3) keeps only the first three values that reach it.
let logLabel = '当前值是';
const interval$ = Rx.Observable.interval(100)
.filter(val => val % 2 ===0)
.do(v => {
console.log(logLabel + v);
logLabel='当前 ';
})
.take(3);
14、first/last
// first() demo: takes the first even tick.
const interval$ = Rx.Observable.interval(100)
.filter(val => val % 2 ===0)
.first()
// Infinite sequence.
// NOTE(review): presumably last() never emits here because interval() never completes — confirm.
const interval$ = Rx.Observable.interval(100)
.filter(val => val % 2 ===0)
.last();
15、skip:
// skip() demo: skip(2) is applied to the even ticks, then take(3) limits what follows.
const interval$ = Rx.Observable.interval(100)
.filter(val => val % 2 ===0)
.skip(2)
.take(3);
16、Interval
// interval() demo: interval(100) is the source; even ticks are summed with scan()
// and the first four running totals are taken.
const interval$ = Rx.Observable.interval(100)
.filter(val => val % 2 ===0)
.scan((x,y) =>{
return x+y
})
.take(4);
17、Timer
// timer() demo with two arguments (100, 1000).
// NOTE(review): presumably an initial delay of 100 ms followed by a 1000 ms period — confirm against the RxJS docs.
const timer$ = Rx.Observable.timer(100,1000);
// Subscribe with next / error / complete callbacks.
timer$.subscribe(
val => console.log(val),
err => console.log(err),
() => console.log("I am complete")
);
18、debounce:接收一个返回流的函数,由这个流来决定延迟多久再发射
// debounce() demo: keyup values from the length input are debounced using a
// duration stream (interval(300)) supplied by the selector function.
const length = document.getElementById("length");
const length$ = Rx.Observable.fromEvent(length,
"keyup").pluck('target','value')
.debounce(() => Rx.Observable.interval(300));
length$.subscribe(val => console.log(val));
19、debounceTime: 延迟多久发
// debounceTime() demo: same as the debounce example, but the delay is given
// directly as a number (300).
const length = document.getElementById("length");
const length$ = Rx.Observable.fromEvent(length,
"keyup").pluck('target','value')
.debounceTime(300);
length$.subscribe(val => console.log(val));
20、distinct: 让整个序列中没有重复元素
// distinct() demo: input values pass through distinct() before being logged,
// so no value is repeated anywhere in the sequence.
const length = document.getElementById("length");
const length$ = Rx.Observable.fromEvent(length,
"keyup").pluck('target','value')
.distinct()
length$.subscribe(val => console.log(val));
21、distinctUntilChanged: 过滤掉与前一个元素重复的值(只和紧邻的上一个值比较)
// distinctUntilChanged() demo: input values pass through distinctUntilChanged()
// before being logged.
const length = document.getElementById("length");
const length$ = Rx.Observable.fromEvent(length,
"keyup").pluck('target','value')
.distinctUntilChanged()
length$.subscribe(val => console.log(val));
22、merge:时间顺序,顺序不变
// merge() demo: keyup values from both inputs are merged into one stream and logged.
const length = document.getElementById("length");
const length$ = Rx.Observable.fromEvent(length,
"keyup").pluck('target','value')
const width = document.getElementById("width");
const width$ = Rx.Observable.fromEvent(width,"keyup").pluck('target','value');
const merged$ = Rx.Observable.merge(length$,width$);
merged$.subscribe(val => console.log(val));
23、concat: 对接,上一个流执行完,才会执行下一个流
// concat() demo: length$ is placed before width$.
// NOTE(review): length$ is a keyup stream; if it never completes, width$ would
// presumably never be subscribed — confirm.
const length = document.getElementById("length");
const length$ = Rx.Observable.fromEvent(length,
"keyup").pluck('target','value')
const width = document.getElementById("width");
const width$ = Rx.Observable.fromEvent(width,"keyup").pluck('target','value');
const merged$ = Rx.Observable.concat(length$,width$);
merged$.subscribe(val => console.log(val));
24、startWith:用于赋初始值
// startWith() demo: first$ is [1,2,3,4] with 0 supplied as the initial value,
// then concatenated in front of the width keyup stream.
const first$ = Rx.Observable.from([1,2,3,4]).startWith(0);
// NOTE(review): length / length$ are declared but not used in this snippet.
const length = document.getElementById("length");
const length$ = Rx.Observable.fromEvent(length,
"keyup").pluck('target','value')
const width = document.getElementById("width");
const width$ = Rx.Observable.fromEvent(width,"keyup").pluck('target','value');
const merged$ = Rx.Observable.concat(first$,width$);
merged$.subscribe(val => console.log(val));
25、combineLatest:任何一个流改变的时候都会发送
// combineLatest() demo: the latest length and width input values are multiplied.
// NOTE(review): first$ is declared but not used in this snippet.
const first$ = Rx.Observable.from([1,2,3,4]).startWith(0);
const length = document.getElementById("length");
const length$ = Rx.Observable.fromEvent(length,
"keyup").pluck('target','value')
const width = document.getElementById("width");
const width$ = Rx.Observable.fromEvent(width,"keyup").pluck('target','value');
const merged$ = Rx.Observable.combineLatest(length$,width$,(l,w)=>l*w);
merged$.subscribe(val => console.log(val));
26、withLatestFrom:以一个流为主,只有第一个流改变的时候才会发射流
// withLatestFrom() demo: length$ is the primary stream and is paired with width$.
// No projection function is passed, so the subscriber logs whatever withLatestFrom emits by default.
// NOTE(review): first$ is declared but not used in this snippet.
const first$ = Rx.Observable.from([1,2,3,4]).startWith(0);
const length = document.getElementById("length");
const length$ = Rx.Observable.fromEvent(length,
"keyup").pluck('target','value')
const width = document.getElementById("width");
const width$ = Rx.Observable.fromEvent(width,"keyup").pluck('target','value');
const merged$ = length$.withLatestFrom(width$);
merged$.subscribe(val => console.log(val));
27、zip:一一对应,第一个必须和第一个对应
// zip() demo: length and width input values are paired one-to-one and multiplied.
// NOTE(review): first$ is declared but not used in this snippet.
const first$ = Rx.Observable.from([1,2,3,4]).startWith(0);
const length = document.getElementById("length");
const length$ = Rx.Observable.fromEvent(length,
"keyup").pluck('target','value')
const width = document.getElementById("width");
const width$ = Rx.Observable.fromEvent(width,"keyup").pluck('target','value');
const merged$ = Rx.Observable.zip(length$,width$,(l,w)=>l*w);
merged$.subscribe(val => console.log(val));