【RxJS】RxJS手册

2023/11/14 16:02:02

ReactiveX 是一种响应式编程的实现方案,RxJS 是这种方案在 JavaScript 语言中的实现。

使用 RxJS 可以更优雅地处理异步数据流。

示例

示例 1:Observable

Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。

import { Observable } from "rxjs";

// observer$尾部的$是表示当前这个变量是个 observer
const observer$ = new Observable((subscriber) => {
  setTimeout(() => {
    console.log("inner next 1");
    subscriber.next(1);
  }, 500);
  setTimeout(() => {
    console.log("inner next complate");
    subscriber.complete();
  }, 1000);
});

setTimeout(() => {
  // 启动流
  console.log("s1 subscribe");
  const subscription1 = observer$.subscribe({
    complete: () => console.log("s1", "done"),
    next: (value) => console.log("s1", value),
    error: () => console.log("s1", "error"),
  });
  const subscription2 = observer$.subscribe(console.log);
  if (Math.random() > 0.5) {
    subscription1.unsubscribe();
    console.log("s1 unsubscribe");
  }
}, 1000);

调用 observer$.subscribe() 方法时, Observable(fn) 中的 fn 才真正被执行。

observer$.subscribe(console.log) 时相当于只监听了 next() 事件。

示例 2:Subject

Subject 是一种特殊类型的 Observable,它可以多播给多个观察者。

Subject 还像是 EventEmitters,维护着多个监听器的注册表。

可以把 Subject 看成是一个事件总线。

import { Subject } from "rxjs";

const subject = new Subject(); // 0是初始值

subject.next(0);
subject.next(1);

const subjection1$ = subject.subscribe((v) => console.log("A: " + v));

subject.next(2);
subject.next(3);

const subjection2$ = subject.subscribe((v) => console.log("B: " + v));

subject.next(4);

subjection1$.unsubscribe();
subjection2$.unsubscribe();

// output
// A: 2
// A: 3
// A: 4
// B: 4

无法监听到调用 observer$.subscribe() 订阅之前发送的消息。

示例 3:ReplaySubject 可监听历史消息

import { ReplaySubject } from "rxjs";

const subject = new ReplaySubject(); // 0是初始值

subject.next(0);
subject.next(1);

subject.subscribe((v) => console.log("A: " + v));

subject.next(2);
subject.next(3);

subject.subscribe((v) => console.log("B: " + v));

subject.next(4);

// output
// A: 0
// A: 1
// A: 2
// A: 3
// B: 0
// B: 1
// B: 2
// B: 3
// A: 4
// B: 4

示例 4:BehaviorSubject 可监听最新一次历史消息

import { BehaviorSubject } from "rxjs";

const subject = new BehaviorSubject(); // 0是初始值

subject.next(0);
subject.next(1);

subject.subscribe((v) => console.log("A: " + v));

subject.next(2);
subject.next(3);

subject.subscribe((v) => console.log("B: " + v));

subject.next(4);

// output
// A: 1
// A: 2
// A: 3
// B: 3
// A: 4
// B: 4

内置操作符

操作符分类open in new window

参考

RxJS 中文文档open in new window