起源

RxJS 官网上有一篇非常好的文章,如果你想理解 RxJS, 这篇文章需要仔细反复的看几遍。其中有个重要的观点就是 Observables as generalizations of functions。我觉得这是非常深刻的一点。它表达的是:

可以把 Observable 看成是 function 的更加通用的版本。 函数的特点是调用一次就返回一个值,而 Observable 也是可以调用(subscribe) 的函数,但是它比函数更加通用的地方在于,随着时间的变化,能多次返回多个值。

当然,这也能通过 callback 实现。

把这篇文章中的一个简单的例子改成用 callback 实现:

import { Observable } from 'rxjs';

const foo = Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // "return" another value
  observer.next(200); // "return" yet another
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

callback 版本就是:

const foo = function (fn) {
  console.log('Hello');
  fn(42);
  fn(100); // "return" another value
  fn(200); // "return" yet another
}

console.log('before');
foo(console.log);
console.log('after');

这两个版本的执行结果是完全一样的。这是个很简单的场景,当然即使非常复杂的异步场景,也能替换成 callback 的。

有了 callback 还是会有 Promise, generator, async/await 等等,因为要更优雅的解决问题。那用 RxJS 能带来什么好处?弄清楚这个问题是很重要的,因为这决定了你是否要使用 RxJS(以及何时要用)

用 RxJS 能带来什么好处

官网的介绍 说:

RxJS is a library for composing asynchronous and event-based programs by using observable sequences.

Think of RxJS as Lodash for events.

ReactiveX combines the Observer pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.

嗯,用来处理 asynchronous 和 event 好像有优势。

其实要判断是不是有优势很简单,实现同样的功能,看谁的代码更少,可读性更高,可重用性更好,可维护性更好…所以接下来我会举很多例子,这些例子大部分来自 RxJS 和 Angular 的官方文档,也有一些是我从网络上找来的例子,我会尝试用 Ramda 实现同样的功能,然后比较代码。(为什么用 Ramda 文章末尾会说)

  1. Here’s how you can add the current mouse x position for every click
const { fromEvent } = rxjs;
const { throttleTime, map, scan } = rxjs.operators;

const button = document.querySelector('button');
fromEvent(button, 'click').pipe(
  throttleTime(1000),
  map(event => event.clientX),
  scan((count, clientX) => count + clientX, 0)
)
.subscribe(count => console.log(count));
let count = 0;
const throttle = require('lodash.throttle');
const button = document.querySelector('button');
button.addEventListener('click', throttle((event) => {
    count += event.clientX;
    console.log(count)
}, 1000);

后面这个版本明显代码更简洁易读,但是有个缺点,引入了 count 这个变量,就不太纯了。但是我觉得 RxJS 的版本用 scan(类似于 JS 里的 reduce) 更加难读了。

  1. Type-ahead suggestions
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, filter, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

const searchBox = document.getElementById('search-box');

const typeahead = fromEvent(searchBox, 'input').pipe(
  map((e: KeyboardEvent) => e.target.value),
  filter(text => text.length > 2),
  debounceTime(1000),
  distinctUntilChanged(),
  switchMap(() => ajax('/api/endpoint'))
);

typeahead.subscribe(data => {
 // Handle the data from the API
});
import axios from 'axios';
const debounce = require('lodash.debounce');

const searchBox = document.getElementById('search-box');
let lastInputValue = ''

searchBox.addEventListener('input',  debounce((e) => {
    const inputValue = e.target.value
    if((inputValue.length <= 2) || (inputValue === lastInputValue)) {
      return
    } else {
      lastInputValue = inputValue
      axios('/api/endpoint', {inputValue})
      .then(data => {
          // Handle the data from the API
      })
    }

}, 1000);

这里对比来看,RxJS 的版本更加好,更加声明式。不用 RxJS 的版本比较命令式,但是也还行,基本一眼能看出代码想要做什么。

  1. Exponential backoff
import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {
 return pipe(
   retryWhen(attempts => zip(range(1, maxTries), attempts)
     .pipe(
       map(([i]) => i * i),
       mergeMap(i =>  timer(i * ms))
     )
   )
 );
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}