RxJS

2018/10/31 09:22

RxJS is the JavaScript implementation of the ReactiveX programming model.

Recommended online front-end playground, JS Bin: https://jsbin.com/poraxawohe/edit?html,console

Core concepts:

  • Observable
  • Observer
  • Operator

 

What is an Observer?

Start with the code:

let foo = Rx.Observable.create(observer => {
  console.log('Hello');
  observer.next(42);
});

let observer = x => console.log(x);
foo.subscribe(observer);

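The second variable in the code is the observer. That's right: an observer is the function that receives a value whenever the Observable "returns" one. The observer parameter on the first line is essentially the callback passed in through foo.subscribe, only lightly wrapped. How is it wrapped? See the code: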

let foo = Rx.Observable.create(observer => {
  try {
    console.log('Hello');
    observer.next(42);
    observer.complete();
    observer.next(10); // ignored: nothing is delivered after complete()
  } catch(e) { observer.error(e) }
  
});

let observer = {
  next(value) { console.log(value) },
  complete() { console.log('completed') },
  error(err) { console.error(err) }
}
foo.subscribe(observer);

Here the observer is defined as an object, and this is the complete form of an observer. Passing a callback to observable.subscribe is equivalent to passing { next: callback }.
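The wrapping described above can be modelled without the library. The following is a minimal plain-JavaScript sketch, not RxJS's actual implementation; `toObserver` and `createObservable` are hypothetical names introduced only for illustration. It shows a bare callback being normalised to { next: callback } and a value sent after complete() being dropped.

```javascript
// Hypothetical helper: normalise whatever was passed to subscribe() into a full observer.
function toObserver(observerOrNext) {
  const noop = () => {};
  const partial = typeof observerOrNext === 'function'
    ? { next: observerOrNext }            // a bare callback becomes { next: callback }
    : (observerOrNext || {});
  return {
    next: partial.next ? partial.next.bind(partial) : noop,
    error: partial.error ? partial.error.bind(partial) : noop,
    complete: partial.complete ? partial.complete.bind(partial) : noop,
  };
}

// Hypothetical stand-in for Rx.Observable.create: wraps the observer so that
// nothing is delivered after complete() or error().
function createObservable(producer) {
  return {
    subscribe(observerOrNext) {
      const target = toObserver(observerOrNext);
      let stopped = false;
      const safeObserver = {
        next(value) { if (!stopped) target.next(value); },
        error(err) { if (!stopped) { stopped = true; target.error(err); } },
        complete() { if (!stopped) { stopped = true; target.complete(); } },
      };
      producer(safeObserver);
    },
  };
}

const log = [];
const foo = createObservable(observer => {
  observer.next(42);
  observer.complete();
  observer.next(10); // dropped, because the stream has already completed
});

foo.subscribe(value => log.push(value));   // callback form
foo.subscribe({                            // full observer form
  next(value) { log.push(value); },
  complete() { log.push('completed'); },
});
console.log(log); // logs 42, 42, 'completed'
```

Both subscriptions receive 42 and neither receives 10; only the object form is notified of completion, because the callback form supplies no complete handler.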

 

Reference: https://zhuanlan.zhihu.com/p/25383159 (by 民工叔叔)

 

 

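Rx operators

1. fromEvent: turns DOM events into a stream

2. combineLatest: combines the latest values of several streams

3. pluck: picks a property, possibly nested, out of each emitted object, e.g. Rx.Observable.fromEvent(length,"keyup").pluck('target','value');

4. zip: emits a new value only once every stream has produced a new value, pairing them up

5. from: Rx.Observable.from([1,2,3,4]); this sequence is emitted all at once, so its latest value is 4.

6. map: Rx.Observable.fromEvent(width,"keyup").map(ev => ev.target.value);

7. mapTo: for when you only need to know that something happened, e.g. Rx.Observable.fromEvent(width,"keyup").mapTo(2);

8. of: Rx.Observable.of({id:1,value:30});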

<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width">
  <title>JS Bin</title>
</head>
<body>
  <div> <input type="number" id="length" /></div>
  <div> <input type="number" id="width" /></div>
  <div id="area"></div>
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
</body>
</html>

const area = document.getElementById("area");
const length = document.getElementById("length");
//const length$ =  Rx.Observable.fromEvent(length,"keyup").pluck('target','value');
//const length$ =  Rx.Observable.from([1,2,3,4]);
const length$ = Rx.Observable.of({id:1,value:30});

const width = document.getElementById("width");
//const width$ =  Rx.Observable.fromEvent(width,"keyup").pluck('target','value');
//const width$ =  Rx.Observable.fromEvent(width,"keyup").map(ev => ev.target.value);
//const width$ =  Rx.Observable.fromEvent(width,"keyup").map(_ => 2);
const width$ =  Rx.Observable.fromEvent(width,"keyup").mapTo(2);


const area$ = Rx.Observable.combineLatest(length$,width$,(l,w) => {return l.value*w;});
//const area$ = Rx.Observable.zip(length$,width$,(l,w) => {return l*w;});
area$.subscribe(val => area.innerHTML = val);

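Properties of an Observable

Three states: next, error, complete

Special cases: a stream may never complete; Never (emits nothing and never completes, mostly used in tests); Empty (completes without emitting, mostly used in tests); Throw (goes straight to the error state)

9. do: a utility operator that acts as a bridge to the outside world (side effects); it behaves like a temporary subscribe.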

let logLabel = 'Current value: ';
const interval$ = Rx.Observable.interval(100)
      .filter(val => val % 2 === 0)
      .do(v => {
        // side effect only: the value passes through unchanged
        console.log(logLabel + v);
        logLabel = 'Current ';
      })
      .take(3);

10. scan: running accumulation

y is the current value (val);
x is the value accumulated so far

const interval$ = Rx.Observable.interval(100)
      .filter(val => val % 2 ===0)
      .scan((x,y) =>{
        return x+y
      })
      .take(4);

0----1----2----3----4----5----6----
0---------2---------4---------6----
0---------2---------6---------12----

11. reduce: emits only the final accumulated value, so there is exactly one output

let logLabel = 'Current value: ';
const interval$ = Rx.Observable.interval(100)
      .filter(val => val % 2 ===0)
      .take(4)   
      .reduce((x,y) =>{
        return x+y
      });

interval$.subscribe(
  val => console.log(val),
  err => console.log(err),
  () => console.log("I am complete")
);
// Outputs 12, because 0 + 2 + 4 + 6

let logLabel = 'Current value: ';
const interval$ = Rx.Observable.interval(100)
      .filter(val => val % 2 ===0)
      .take(4)   
      .reduce((x,y) =>{
        return [...x,y]
      },[]);

//output [0,2,4,6]
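The difference between scan and reduce can be checked on a plain array, with no timers involved. This is only an analogy under stated assumptions: `scanArray` is a hypothetical helper, not an RxJS function, and the `ticks` array stands in for the first few values interval(100) would emit.

```javascript
// Hypothetical array-based stand-in for scan: returns every intermediate
// accumulated value. With no seed, the first value passes through unchanged.
function scanArray(values, accumulate) {
  const out = [];
  values.forEach((y, i) => {
    const x = i === 0 ? y : accumulate(out[i - 1], y);
    out.push(x);
  });
  return out;
}

const ticks = [0, 1, 2, 3, 4, 5, 6];                     // stand-in for interval(100)
const evens = ticks.filter(val => val % 2 === 0);        // [0, 2, 4, 6], like filter + take(4)

const running = scanArray(evens, (x, y) => x + y);       // every running total
const total = evens.reduce((x, y) => x + y);             // only the final total
const collected = evens.reduce((x, y) => [...x, y], []); // seeded with an empty array

console.log(running);   // [0, 2, 6, 12]
console.log(total);     // 12
console.log(collected); // [0, 2, 4, 6]
```

scan yields one output per input, while reduce yields a single output, which in the stream version can only arrive once the source completes. That is why take(4) is placed before reduce in the examples above.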

12. filter: keeps only the values that pass the test, e.g. .filter(val => val % 2 === 0)

let logLabel = 'Current value: ';
const interval$ = Rx.Observable.interval(100)
      .filter(val => val % 2 === 0)
      .do(v => {
        console.log(logLabel + v);
        logLabel = 'Current ';
      })
      .take(3);

13. take: takes the first n values of the stream, then completes

let logLabel = 'Current value: ';
const interval$ = Rx.Observable.interval(100)
      .filter(val => val % 2 === 0)
      .do(v => {
        console.log(logLabel + v);
        logLabel = 'Current ';
      })
      .take(3);

14. first/last: take only the first or only the last value

const interval$ = Rx.Observable.interval(100)
      .filter(val => val % 2 ===0)
      .first()

// Infinite sequence: last() never emits here, because interval never completes
const interval$ = Rx.Observable.interval(100)
      .filter(val => val % 2 ===0)
      .last();

15. skip: skips the first n values

const interval$ = Rx.Observable.interval(100)
      .filter(val => val % 2 ===0)
      .skip(2)
      .take(3);

16. interval: emits 0, 1, 2, ... at a fixed period given in milliseconds

const interval$ = Rx.Observable.interval(100)
      .filter(val => val % 2 ===0)
      .scan((x,y) =>{
        return x+y
      })
      .take(4);

17. timer: waits for an initial delay, then emits at a fixed period (here 100 ms, then every 1000 ms)

const timer$ = Rx.Observable.timer(100,1000);
timer$.subscribe(
  val => console.log(val),
  err => console.log(err),
  () => console.log("I am complete")
);

18. debounce: takes a function that returns a stream; that stream decides how long to wait

const length = document.getElementById("length");
const length$ =  Rx.Observable.fromEvent(length,
                  "keyup").pluck('target','value')
                  .debounce(() => Rx.Observable.interval(300));

length$.subscribe(val => console.log(val));

19. debounceTime: emits a value only after the given quiet period (in milliseconds) has passed with no newer value

const length = document.getElementById("length");
const length$ =  Rx.Observable.fromEvent(length,
                  "keyup").pluck('target','value')
                  .debounceTime(300);

length$.subscribe(val => console.log(val));
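To make the timing concrete, here is a small model of debouncing over timestamped events. It is a sketch under assumptions, not RxJS code: `debounceEvents` is a hypothetical helper, the timestamps are invented, and an event that arrives exactly when the wait expires is arbitrarily treated as "late enough".

```javascript
// Hypothetical model: events are { time, value } objects sorted by time (ms).
// A value is emitted at time + wait unless a newer event arrives before then.
function debounceEvents(events, wait) {
  const out = [];
  events.forEach((event, i) => {
    const next = events[i + 1];
    if (!next || next.time - event.time >= wait) {
      out.push({ time: event.time + wait, value: event.value });
    }
  });
  return out;
}

// Invented keyup timeline: three quick keystrokes, a pause, then one more.
const keyups = [
  { time: 0, value: '1' },
  { time: 100, value: '12' },
  { time: 250, value: '123' },
  { time: 900, value: '1234' },
];

const debounced = debounceEvents(keyups, 300);
console.log(debounced);
// [{ time: 550, value: '123' }, { time: 1200, value: '1234' }]
```

Only the last value of each burst survives, which is why debouncing suits search-as-you-type inputs.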

20. distinct: removes every duplicate, so no value appears twice in the whole sequence

const length = document.getElementById("length");
const length$ =  Rx.Observable.fromEvent(length,
                  "keyup").pluck('target','value')
                  .distinct()

length$.subscribe(val => console.log(val));

21. distinctUntilChanged: drops a value only when it equals the one immediately before it

const length = document.getElementById("length");
const length$ =  Rx.Observable.fromEvent(length,
                  "keyup").pluck('target','value')
                  .distinctUntilChanged()

length$.subscribe(val => console.log(val));
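The two operators differ only in what a value is compared against. A plain-array sketch makes this visible; `distinctArray` and `distinctUntilChangedArray` are hypothetical helpers written for this illustration, not RxJS functions, and `typed` is invented input.

```javascript
// Hypothetical array model of distinct: compare against everything seen so far.
function distinctArray(values) {
  const seen = new Set();
  return values.filter(v => {
    if (seen.has(v)) return false;
    seen.add(v);
    return true;
  });
}

// Hypothetical array model of distinctUntilChanged: compare against the previous value only.
function distinctUntilChangedArray(values) {
  return values.filter((v, i) => i === 0 || v !== values[i - 1]);
}

const typed = ['1', '1', '2', '1', '2', '2'];

const uniqueOverall = distinctArray(typed);
const uniqueInARow = distinctUntilChangedArray(typed);
console.log(uniqueOverall); // ['1', '2']
console.log(uniqueInARow);  // ['1', '2', '1', '2']
```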

22. merge: interleaves the streams in time order; the order within each stream is unchanged

const length = document.getElementById("length");
const length$ =  Rx.Observable.fromEvent(length,
                  "keyup").pluck('target','value')

const width = document.getElementById("width");
const width$ =  Rx.Observable.fromEvent(width,"keyup").pluck('target','value');

const merged$ = Rx.Observable.merge(length$,width$);

merged$.subscribe(val => console.log(val));

23. concat: joins streams end to end; the next stream starts only after the previous one completes

const length = document.getElementById("length");
const length$ =  Rx.Observable.fromEvent(length,
                  "keyup").pluck('target','value')

const width = document.getElementById("width");
const width$ =  Rx.Observable.fromEvent(width,"keyup").pluck('target','value');

// Note: length$ comes from fromEvent and never completes, so width$ is never subscribed here
const merged$ = Rx.Observable.concat(length$,width$);

merged$.subscribe(val => console.log(val));
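The ordering difference between merge and concat can be sketched with timestamped arrays. This is a simplified model, not RxJS code: `mergeModel` and `concatModel` are hypothetical helpers, the data is invented, and each array is assumed to behave like a finite stream that completes after its last value and only starts producing once it is subscribed.

```javascript
// Invented data: two finite streams with interleaved timestamps (ms).
const firstStream = [{ time: 10, value: 'a1' }, { time: 30, value: 'a2' }];
const secondStream = [{ time: 20, value: 'b1' }, { time: 40, value: 'b2' }];

// Hypothetical model of merge: all streams run at once, output follows time order.
function mergeModel(...streams) {
  return [].concat(...streams)
    .sort((x, y) => x.time - y.time)
    .map(event => event.value);
}

// Hypothetical model of concat: each stream runs to completion before the next starts.
function concatModel(...streams) {
  return [].concat(...streams).map(event => event.value);
}

const mergedValues = mergeModel(firstStream, secondStream);
const concatenatedValues = concatModel(firstStream, secondStream);
console.log(mergedValues);       // ['a1', 'b1', 'a2', 'b2']
console.log(concatenatedValues); // ['a1', 'a2', 'b1', 'b2']
```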

24. startWith: supplies an initial value

const first$ = Rx.Observable.from([1,2,3,4]).startWith(0);

const length = document.getElementById("length");
const length$ =  Rx.Observable.fromEvent(length,
                  "keyup").pluck('target','value')

const width = document.getElementById("width");
const width$ =  Rx.Observable.fromEvent(width,"keyup").pluck('target','value');

const merged$ = Rx.Observable.concat(first$,width$);

merged$.subscribe(val => console.log(val));

25. combineLatest: emits whenever any stream changes, once every stream has emitted at least one value

const first$ = Rx.Observable.from([1,2,3,4]).startWith(0);

const length = document.getElementById("length");
const length$ =  Rx.Observable.fromEvent(length,
                  "keyup").pluck('target','value')

const width = document.getElementById("width");
const width$ =  Rx.Observable.fromEvent(width,"keyup").pluck('target','value');

const merged$ = Rx.Observable.combineLatest(length$,width$,(l,w)=>l*w);

merged$.subscribe(val => console.log(val));

26. withLatestFrom: one stream is the primary; a value is emitted only when that first stream changes

const first$ = Rx.Observable.from([1,2,3,4]).startWith(0);

const length = document.getElementById("length");
const length$ =  Rx.Observable.fromEvent(length,
                  "keyup").pluck('target','value')

const width = document.getElementById("width");
const width$ =  Rx.Observable.fromEvent(width,"keyup").pluck('target','value');

const merged$ = length$.withLatestFrom(width$);

merged$.subscribe(val => console.log(val));

27. zip: pairs values one to one; the first value of one stream must pair with the first value of the other

const first$ = Rx.Observable.from([1,2,3,4]).startWith(0);

const length = document.getElementById("length");
const length$ =  Rx.Observable.fromEvent(length,
                  "keyup").pluck('target','value')

const width = document.getElementById("width");
const width$ =  Rx.Observable.fromEvent(width,"keyup").pluck('target','value');

const merged$ = Rx.Observable.zip(length$,width$,(l,w)=>l*w);

merged$.subscribe(val => console.log(val));
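combineLatest, withLatestFrom and zip are easy to confuse, so it helps to run all three over one timeline. The sketch below is a plain-JavaScript model under assumptions, not RxJS code: the three `...Model` functions are hypothetical, and `timeline` is an invented sequence of events tagged with the input they came from.

```javascript
// Invented timeline: two values typed into "length", then two into "width", then one more "length".
const timeline = [
  { from: 'length', value: 2 },
  { from: 'length', value: 3 },
  { from: 'width', value: 10 },
  { from: 'width', value: 20 },
  { from: 'length', value: 4 },
];

// Hypothetical model of combineLatest(length$, width$, (l, w) => l * w)
function combineLatestModel(events) {
  const latest = new Map();
  const out = [];
  for (const e of events) {
    latest.set(e.from, e.value);
    if (latest.size === 2) out.push(latest.get('length') * latest.get('width'));
  }
  return out;
}

// Hypothetical model of length$.withLatestFrom(width$): only "length" events trigger output.
function withLatestFromModel(events) {
  let latestWidth;
  let hasWidth = false;
  const out = [];
  for (const e of events) {
    if (e.from === 'width') { latestWidth = e.value; hasWidth = true; }
    else if (hasWidth) out.push([e.value, latestWidth]);
  }
  return out;
}

// Hypothetical model of zip(length$, width$, (l, w) => l * w): values pair up by position.
function zipModel(events) {
  const lengthQueue = [];
  const widthQueue = [];
  const out = [];
  for (const e of events) {
    (e.from === 'length' ? lengthQueue : widthQueue).push(e.value);
    if (lengthQueue.length > 0 && widthQueue.length > 0) {
      out.push(lengthQueue.shift() * widthQueue.shift());
    }
  }
  return out;
}

const combined = combineLatestModel(timeline);
const primaryOnly = withLatestFromModel(timeline);
const zipped = zipModel(timeline);
console.log(combined);    // [30, 60, 80]
console.log(primaryOnly); // [[4, 20]]
console.log(zipped);      // [20, 60]
```

In this model zip pairs the first length (2) with the first width (10) even though 3 had already been typed, whereas combineLatest always uses the most recent value of each input.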

 
