其实在几年前因为 Angular 的原因接触过响应式编程,而这些年的一些项目经验,让我在再次回顾响应式编程的时候又有了新的理解。

# 什么是响应式编程

响应式编程基于观察者模式,是一种面向数据流和变化传播的声明式编程方式。

# 异步数据流

响应式编程常常用在异步数据流,通过订阅某个数据流,可以对数据进行一系列流式处理,例如过滤、计算、转换、合流等,配合函数式编程可以实现很多优秀的场景。

除了天然异步的前端、客户端等 GUI 开发以外,响应式编程在大数据处理中也同样拥有高并发、分布式、依赖解耦等优势,在这种同步阻塞转异步的并发场景下会有较大的性能提升,淘宝业务架构就是使用响应式的架构。

# 响应式编程在前端领域

在前端领域,常见的异步编程场景包括事件处理、用户输入、HTTP 响应等。对于这类型的数据流,可以使用响应式编程的方式来进行设计。

不少开发者基于响应式编程设计了一些工具库,包括 Rxjs、Mobx、Cycle.js 等。其中,Rxjs 提供了基于可观察对象(Observable)的 functional reactive programming 服务,Mobx 提供了基于状态管理的 transparent functional reactive programming 服务,而 Cycle.js 则是一个响应式前端框架。

我们可以结合具体场景来介绍下使用,这里会以 Rxjs 来说明。

# HTTP 请求与重试

基于响应式编程,我们可以很简单地实现一个请求的获取和自动重试:

import { ajax } from "rxjs/ajax";
import { map, retry, catchError } from "rxjs/operators";

const apiData = ajax("/api/data").pipe(
  // 可以在 catchError 之前使用 retry 操作符。它会订阅到原始的来源可观察对象,此处为重新发起 HTTP 请求
  retry(3), // 失败前会重试最多 3 次
  map((res) => {
    if (!res.response) {
      throw new Error("Value expected!");
    }
    return res.response;
  }),
  catchError((err) => of([]))
);

apiData.subscribe({
  next(x) {
    console.log("data: ", x);
  },
  error(err) {
    console.log("errors already caught... will not run");
  },
});

# 用户输入

对应用户的一些交互,也可以通过订阅的方式来获取需要的信息:

const observable = Rx.Observable.fromEvent(input, "input") // 监听 input 元素的 input 事件
  .map((e) => e.target.value) // 一旦发生,把事件对象 e 映射成 input 元素的值
  .filter((value) => value.length >= 1) // 接着过滤掉值长度小于 1 的
  .distinctUntilChanged() // 如果该值和过去最新的值相等,则忽略
  .subscribe(
    // subscribe 拿到数据
    (x) => console.log(x),
    (err) => console.error(err)
  );
// 订阅
observable.subscribe((x) => console.log(x));

在用户频繁交互的场景,数据的流式处理可以让我们很方便地进行节流和防抖。除此之外,模块间的调用和事件通信同样可以通过这种方式来进行处理。

# 比较其他技术

接触响应式编程这个概念的时候,大多数人都会对它产生困惑,也比较容易与 Promise、事件订阅这些设计混淆。我们来一起看看。

# Promise

Promise 相信大家也都很熟悉,在这里拿出来比较,其实更多是将 Rxjs 中的 Observable 与之比较。这两个其实很不一样:

  • Promise 会发生状态扭转,状态扭转不可逆;而 Observable 是无状态的,数据流可以源源不断,可用于随着时间的推移获取多个值
  • Promise 在定义时就会被执行;而 Observable 只有在被订阅时才会执行
  • Promise 不支持取消;而 Observable 可通过取消订阅取消正在进行的工作

# 事件

同样是基于观察者模式,相信很多人都对事件和响应式编程之间的关系比较迷惑。而根据具体的设计实现,事件和响应式编程模式可以达到高度相似。

一个比较显著的区别在于,由于响应式编程是面向数据流和变化传播的模式,意味着我们可以对数据流进行配置处理,使其在把事件传给事件处理器之前先进行转换。同样由于流式处理,响应式编程可以把包含一堆异步/事件的组合从开头到结尾用流的操作符清晰表示,而原始事件回调只能表示一堆相邻节点的关系,对于数据流动方向和过程都可以进一步掌握。

同时,结合响应式编程的合流、缓存等能力,我们可以收获更多。

# 响应式编程提供了怎样的服务

前面说了很多,相信大家对响应式编程的概念和使用有一定的理解了。现在,我们一起来看看它还能给我们带来怎样的服务。

# 热观察与冷观察

在 Rxjs 中,有热观察和冷观察的概念。其中的区别:

  • Hot Observable,可以理解为现场直播,我们进场的时候只能看到即时的内容
  • Cold Observable,可以理解为点播(电影),我们打开的时候会从头播放
let liveStreaming$ = Rx.Observable.interval(1000).take(5);

liveStreaming$.subscribe(
	data => console.log('subscriber from first second')
	err => console.log(err),
	() => console.log('completed')
)

setTimeout(() => {
	liveStreaming$.subscribe(
		data => console.log('subscriber from 2nd second')
		err => console.log(err),
		() => console.log('completed')
	)
}, 2000)
// 事实上两个订阅者接收到的值都是 0,1,2,3,4,此处为冷观察

Rxjs 中 Observable 默认为冷观察,而通过publish()connect()可以将冷的 Observable 转变成热的:

let publisher$ = Rx.Observable.interval(1000).take(5).publish();

publisher$.subscribe(
	data => console.log('subscriber from first minute'