rxjs

v6.5.5 新增

稳定性: 稳定

RxJS 是一个通过使用可观察序列来编写异步和基于事件的程序的库。 它提供了一个核心类型,可观察对象,卫星类型(观察者、调度器、主体)和受 Array 方法启发的运算符(map、filter、reduce、every 等),以允许将异步事件作为集合处理。

使用前请先阅读中文文档理解概念,这里只介绍一些常用方法。

例子

1"ui";
2let { fromEvent } = require("rxjs");
3ui.layout(
4    <vertical padding="16">
5        <button id="y" text="普通按钮" w="auto" />
6        <vertical id="box"></vertical>
7    </vertical>
8);
9//从按钮的点击事件创建一个可观察对象
10let ob = fromEvent(ui.y, "click");
11let box = ui.box;
12//订阅这个对象
13ob.subscribe(() => {
14    ui.inflate(<text text="1234"></text>, box, true);
15});

很多时候我们不希望这个按钮触发的太快,使用纯 js 就需要添加额外的变量用于判断,使用 rxjs 只需要添加一个throttleTime操作符

1"ui";
2let { fromEvent, throttleTime } = require("rxjs");
3ui.layout(
4    <vertical padding="16">
5        <button id="y" text="普通按钮" w="auto" />
6        <vertical id="box"></vertical>
7    </vertical>
8);
9//从按钮的点击事件创建一个可观察对象
10let ob = fromEvent(ui.y, "click").pipe(throttleTime(1000));
11let box = ui.box;
12//订阅这个对象
13ob.subscribe(() => {
14    ui.inflate(<text text="1234"></text>, box, true);
15});

可以将大部分采用回调、事件、Promise 的 api 转换成 Observable

1//回调
2let { bindCallback } = require("rxjs");
3
4let httpGet = bindCallback(http.get);
5
6httpGet("https://m.baidu.com", {}).subscribe({
7    next: (res, err) => {
8        console.log("body:", res.body.string().length);
9    },
10    complete() {
11        console.log("done");
12    },
13});
14//事件
15fromEvent(events, "exit").subscribe({
16    next() {
17        console.log("event on exit");
18    },
19});
20//Promise
21from(Promise.delay(1000)).subscribe({
22    next: () => {
23        console.log("Promise res");
24    },
25    complete() {
26        console.log("done");
27    },
28});

创建操作符

  • of(...args) 将参数转换为可观察的序列。
1let { of } = require("rxjs");
2of(10, 20, 30).subscribe({
3    next: (value) => console.log("next:", value),
4    error: (err) => console.log("error:", err),
5    complete: () => console.log("the end"),
6});
7// Outputs
8// next: 10
9// next: 20
10// next: 30
11// the end
  • from(input,scheduler?) 从数组、类数组对象、Promise、可迭代对象或类 Observable 对象创建 Observable。
1let { from } = require("rxjs");
2const array = [10, 20, 30];
3const result = from(array);
4result.subscribe((x) => console.log(x));
5// Logs:
6// 10
7// 20
8// 30
  • fromEvent(target,eventName,options?) 创建一个 Observable,它发出来自给定事件目标的特定类型的事件。
  • intervallink(period=0,period=asyncScheduler) 创建一个 Observable,该 Observable 在指定的时间间隔内每隔指定的时间间隔发出序列号

调度器

rxjs 中内置了几种调度器,其中最常用的是 asyncScheduler,这是多数处理异步操作符使用的默认调度器,在 autox 环境中,只支持 asyncScheduler 调度器。

autox 特有的调度器

v6.5.6 新增 由于 autox 中存在比较复杂的多线程环境,处理 ui 和阻塞操作时经常需要切换线程,因此为此库添加了几个特殊的调度器简化这些操作 需要使用以下方式导入

1let { ioScheduler, uiScheduler, mainScheduler, workScheduler, newSingleScheduler } = require("rxjs/ext");
  • uiScheduler 在 ui 线程中运行
  • mainScheduler 在脚本主线程中运行,若是 ui 脚本则和uiScheduler一致
  • newSingleScheduler() 这是一个函数,创建一个独立的线程作为调度器,使用完毕后需要调用recycle回收资源
1let { from } = require("rxjs");
2let { newSingleScheduler } = require("rxjs/ext");
3let t = newSingleScheduler();
4from([1, 2, 3], t).subscribe({
5    next: (v) => {
6        console.log(v);
7        console.log(threads.currentThread());
8    },
9    complete() {
10        t.recycle();
11    },
12});
  • workScheduler 在一个默认的线程池中运行,用于处理密集计算操作。 注意: 此调度器是不安全的,由于并发问题,只能配合 fromEvent 这样永远不会'结束'的 Observable 来使用,下面这个示例就不会按预期执行
1let { from } = require("rxjs");
2let { workScheduler } = require("rxjs/ext");
3from([1, 2, 3], workScheduler).subscribe((v) => {
4    log(v); //可能看到0-3个输出,且是乱序的
5});

原因在于 Observable 执行 complete 或 error 后再调用 next 产生的值将被忽略,就算 next 调用在 complete 前面,通过此调度器可能会导致 next 真正执行时在 complete 后面,此外某些操作符在这个调度器下也会工作异常。

  • ioSchedulerworkScheduler类似,区别在于每次触发会生成一个新线程来运行,比较耗费资源,适用于 io 操作等长时间阻塞任务

例子

1"ui";
2let { fromEvent, scan, map, observeOn, throttleTime } = require("rxjs");
3let { ioScheduler, uiScheduler, mainScheduler, workScheduler, newSingleScheduler } = require("rxjs/ext");
4ui.layout(
5    <vertical padding="16">
6        <button id="y" text="普通按钮" w="auto" />
7        <vertical id="box"></vertical>
8    </vertical>
9);
10//从按钮的点击事件创建一个可观察对象
11let ob = fromEvent(ui.y, "click");
12let box = ui.box;
13//订阅这个对象
14ob.pipe(
15    scan((a) => a + 1, 0),
16    //转到线程池调度器
17    observeOn(workScheduler),
18    map((v) => {
19        //模拟一些阻塞耗时任务
20        sleep(1000);
21        return v;
22    }),
23    //回到ui调度器
24    observeOn(uiScheduler)
25).subscribe((v) => {
26    ui.inflate(<text text={"已计算: 第" + v + "次"}></text>, box, true);
27});