RxJS-Scheduler 调度器基本概念

RxJS-Scheduler 调度器基本概念

RxJS 用久了之后就会发现Observable有一个优势是可以同时处理同步和非同步行为,但这个优势也带来了一个问题,就是我们常常会搞不清处现在的observable执行方式是同步的还是非同步的。换句话说,我们很容易搞不清楚**observable到底什么时候开始发送元素!**

1. 什么是Scheduler?

Scheduler控制何时开始订阅以及何时传递通知。它由三个部分组成。

Scheduler是一种数据结构。它知道如何根据优先级或其他条件存储和排列任务。

Scheduler是一个执行上下文。它表示任务何时何地被执行(例如,立即执行或在其他回调中执行,例如setTimeoutprocess.nextTick

Scheduler具体虚拟时钟。它通过getter方法now()提供“时间”的概念。让任务在特定的时间点被执行。

Scheduler可以定义可观察对象将在哪些执行上下文中向其观察者传递通知。

简而言之Scheduler会影响Observable开始执行及元素送达的时机。

示例如下

const observable = new Observable((observer) => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
}).pipe(
    observeOn(asyncScheduler)   //  异步执行
);

console.log('订阅之前');
observable.subscribe({
    next: value => console.log('next: ' + value),
    error: error => console.log('error: ' + error),
    complete: () => console.log('complete!')
});
console.log('订阅之后');
//  订阅之前
//  订阅之后
//  next: 1
//  next: 2
//  next: 3
//  complete!

上面这段代码原本是同步执行的,但我们用了observeOn(asyncScheduler)原本是同步执行的就变成了异步执行。

2. Scheduler类型

调度器作用null通过不传递任何调度程序,可以同步和递归地传递通知。用于定时操作或尾递归操作。queueScheduler在当前事件中的队列上进行调度。使用它进行迭代操作。asapScheduler微任务队列上的计划,在setTimeout中执行asyncScheduler使用setInterval来实现,通常是跟时间相关的operato才会用到。animationFrameScheduler将在下一次浏览器内容重新绘制之前发生的任务。可用于创建流畅的浏览器动画。利用Window.requestAnimationFrame这个API去实现的

3. 使用Scheduler

其实我们在使用各种不同的operator时,这些operator就会各自预设不同的scheduler
默认情况下,RxJS将使用最小并发原则选择默认调度器。例如一个无限的observable就会预设为queueScheduler,而timer相关的operator则预设为asyncScheduler

静态创建操作符通常将Scheduler作为参数。使用Scheduler除了前面用到的observeOn()方法外,以下这几个creation operators最后一个参数都能接收Scheduler

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer

例如from

var observable = from([1, 2, 3, 4, 5], asyncScheduler);	//	异步执行

console.log('订阅之前');
observable.subscribe({
    next: value => console.log('next: ' + value),
    error: error => console.log('error: ' + error),
    complete: () => console.log('complete!')
});
console.log('订阅之后');
//  订阅之前
//  订阅之后
//  next: 1
//  next: 2
//  ……
//  complete!

不过,最通用的方式还是observeOn()只要是observable就可以用这个方法。

4. 使用情形

4.1 queueScheduler

queueScheduler的运作方式跟默认的立即执行很像,但是当我们使用到递归的方法时,他会排列这些行为而非直接执行,一个递归的operator就是它会执行另一个operator,最好的例子就是repeat(),如果我们不给它参数的话,它会执行无限次,像下面这个例子

const observable = of(10).pipe(
    repeat(),
    take(1)
);

observable.subscribe({
    next: value => console.log('next: ' + value),
    error: error => console.log('error: ' + error),
    complete: () => console.log('complete!')
});