介绍
RxJS是一个异步编程的库,同时它通过observable序列来实现基于事件的编程。它提供了一个核心的类型:Observable,几个辅助类型(Observer,Schedulers,Subjects),受到Array的扩展操作(map,filter,reduce,every等等)启发,允许直接处理异步事件的集合。
ReactiveX结合了Observer模式、Iterator模式和函数式编程和集合来构建一个管理事件序列的理想方式。
在RxJS中管理异步事件的基本概念如下:
- Observable:代表了一个调用未来值或事件的集合的概念
- Observer:代表了一个知道如何监听Observable传递过来的值的回调集合
- Subscription:代表了一个可执行的Observable,主要是用于取消执行
- Operators:是一个纯函数,允许处理集合与函数式编程风格的操作,比如map、filter、concat、flatMap等
- Subject:相当于一个EventEmitter,它的唯一的方法是广播一个值或事件给多个Observer
- Schedulers:是一个集中式调度程序来控制并发性,允许我们在setTimeout或者requestAnimationFrame上进行协调计算
第一个例子
正常注册一个事件监听函数:
var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));
使用RxJS,你可以创建一个observable来代替:
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.subscrible(() => console.log('Clicked!'));
纯粹
使得RxJS变得如此强大的原因是它使用了纯函数,这意味着你的代码很少会发生错误。
正常你不会创建一个纯函数,代码的其他部分可能扰乱你的状态。
var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked $(++count) times`));
RxJS将隔离你的状态
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} items`));
scan操作符类似于arrays的reduce操作符。它需要一个回调函数作为一个参数,函数返回的值将作为下次调用时的参数。
Flow
RxJS有一系列的操作符来帮你监控事件将如何流动。
这是一个每秒最多点击一次的程序:
var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', () => {
if (Date.now() - lastClick >= rate){
console.log(`Clicked ${++count} times`);
lastClick = Date.now();
}
});
使用RxJS:
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.throttleTime(1000)
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`));
另外的控制符还有:filter, delay, debounceTime, take, takeUntil, distinct, distinctUntilChanged等。
值
你可以使用你的observables来转换值。
这是一个每次点击添加x坐标的程序:
var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', (event) => {
if (Date.now() - lastClick >= rate){
count += event.clientX;
console.log(count);
lastClick = Date.now();
}
})
使用Rxjs:
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.throttleTime(1000)
.map(event => event.clientX)
.scan((count, clientX) => count + clientX, 0)
.subscribe(count => console.log(count));
另外的producing操作符:pluck、pairwise、sample等
Observable
Observables是一个延迟Push(关于Push的概念见后面)操作数据的集合。它们遵从下表:
Single | Multiple | |
---|---|---|
Pull | Function | Iterator |
Push | Promise | Observable |
举个例子。下面是一个Observable,当执行subscribed,它将会立即push 1、 2、 3(同步),然后过去一秒后push 4
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
为了调用Observable,然后看这些值,我们需要对这些数据进行订阅(subscribe)
var observable = Rx.Observable.create(function (observer){
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
})
});
console.log('just before subscribe');
observerble.subscribe({
next: x => console.log(`got value` + x),
error: err => console.error('somthing wrong occurred: ' +err),
complete: () => console.log('done')
});
console.log('just after subscribe');
执行结果如下:
just before subscribe
got value 1
got value 2
got value 3
just after sbuscribe
got value 4
done
Pull和Push
Pull和Push是关于数据提供者和数据消费者交互的两个不同的协议。
什么是Pull?在Pull系统中,当Consumer收到Producer的数据时,它会自己判断是否接收该数据,Producer自己并不知道数据将交给哪个Consumer。
所有的JavaScript函数都是一个Pull系统。函数是一个数据提供者,调用函数的代码是一个consuming(消费者),它将函数返回的值"pulling"出来。
ES2015介绍了generator functions and iterators (function*),它们是另外一种Pull系统。iterator.next() 是Consumer,它从iterator(Producer)中"pulling"出多个值
Producer | Consumer | |
---|---|---|
Pull | 被动:当需要时产生数据 | 主动:决定是否接收数据 |
Push | 主动:自己决定将数据传给谁 | 被动:响应式接收数据 |
什么是Push?在Push系统中,Producer决定将数据发往哪个Consumer。Consumer并不知道它自己的值来自哪个Producer
Promise是最常见的一个Push系统。一个Promise(Producer)分发一个结果值给注册的接口(Consumer),与函数不同的是,Promise当遇到值被"push"给callback时,他会保证它传递的对象是正确的。
RxJS介绍了Observables,它是一个新的Push系统。Observable是一个提供多值的Producer,将它们"pushing"给Observers(Consumer)
- Function:计算并同步调用一个值
- generator:计算并同步调用多个值
- Promise:计算后可能(不可能)返回一个值
- Observable:计算然后同步或异步返回一个或多个值
Observable as generalizations of functions
与主流相反,Observable不像EventEmitters,也不像Promise。在某些情况下,Observable的行为可能像EventEmitters,比如当使用RxJS的Subjects进行多途径传播时,但是大部分的情况它们都是不一样的。
考虑下面的情况:
function foo(){
console.log('Hello');
return 42;
}
var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y)
我们期望出现下面的结果:
"Hello"
42
"Hello"
42
当使用Observables时:
var foo = Rx.Observable.create(function (observer){
console.log('Hello');
observer.next(42);
});
foo.subscribe(function (x){
console.log(x);
});
foo.subscribe(function (y){
console.log(y);
})
它们有着同样地输出:
"Hello"
42
"Hello"
42
之所以出现这种情况是因为function和Observables都是延迟(lazy)计算的。如果你不调用function,console.log('Hello')这段代码是不会执行的。Observables是同样的,如果你不执行(subscribe)它,代码也不会执行?!癱alling”和"subscribing"都是一个独立的操作:两个function分别导致两个结果,两个Observale subscribes trigger也会分别导致两个结果。这与EventEmitters截然相反,EventEmitters会共享结果,并且它执行的时候也不会顾忌到底是否有subscribers存在,Observables不会是共享结果,并且也是延迟执行。
Subscribing一个Observable就像调用一个函数一样
一些人要求Observables是异步的,这是不正确的。看下面这个例子:
console.log('before');
console.log(foo.call());
console.log('after');
你将会看到这样的输出:
"before"
"Hello"
42
"after"
使用Observables
console.log('before');
foo.subscribe(function(x) {
console.log(x);
});
console.log('after');
输出是:
"before"
"Hello"
42
"after"
这证明了foo的订阅是一个完完全全的异步,就像一个函数一样。
Observables可以同步或异步地传递一个值
Observable和function的不同是什么呢?随之时间的流逝,Observables可以“返回”多个值,函数是不可以的。你不可以这么做:
function foo(){
console.log('Hello');
return 42;
return 100; // 不会执行到这儿
}
函数只能返回一次,Observables可以做到返回多次:
var foo = Rx.Observable.create(function (observer){
console.log('Hello');
observer.next(42);
observer.next(100); // "return another value"
observer.next(200); // "return" yet another
});
console.log('before');
foo.subscribe(function (x){
console.log(x);
});
console.log('after');
同步输出:
"before"
"Hello"
42
100
200
"after"
你也可以异步返回:
var foo = Rx.Observable.create(function (observer){
console.log('Hello');
observer.next(42);
observer.next(100);
observer.next(200);
setTimeout(() => {
observer.next(300); // 异步
}, 1000);
});
console.log('before');
foo.subscribe(function(x){
console.log(x);
});
console.log('after');
输出:
"before"
"Hello"
42
100
200
"after"
300
结论:
- func.call()表示“同步给我一个数据”
- observable.subscribe()表示“给我任何数量的值,同步或者异步”
解析一个Observable
Observables使用Rx.Observable.create或者一个构造器创建(create),使用Observer来监听(subscribed),执行(execute)是通过投递一个next/error/complete来通知其他的Observer,然后按照各自的意愿(disposed)来执行。在一个Observable实例中,这四个方面都是通过编码实现的,但是这些可能与其他的类型相关,比如Obsrever和Subscription。
Observable的核心点:
- Creating Observables
- Subscribing to Observables
- Executing the Observable
- Disposing Observables
创建一个Observables
Rx.Observable.create是Observable构造器的一个别名,他需要一个参数:一个subscribe函数
下面的例子创建一个Observable,它的作用是每秒钟输出字符串'hi':
var observable = Rx.Observable.create(function subscrite(observer){
var id = setInterval(() => {
observer.next('hi')
}, 1000);
});
Observables可以使用create创建,但是我们经常使用creation operators,诸如from,interval等。
在上面的例子中,subscribe函数是描述Observable最重要的一部分,让我们来看看subscribing是什么意思。
subscribing to Observables
Observable的observable可以被订阅(subscribed),就像这样:
observable.subscribe(x => console.log(x));
observable.scribe和Observable.create(function subscribe(observer) {...})中的subscribe有着相同的名字并不是巧合。在库中,它们是不同的,但是在实际的用途中你可以在逻辑上把他们想成相同的。
同样的Observable被多个Observers监听时,它们是不共享的。
Subscribing一个Observable像调用一个函数一样,当一个数据被传递时提供一个回调
这个addEventListener/removeEventListener这样的API完全不一样。observable.subscribe作为一个给定的观察者,在Observable中并没有像listener一样被注册。Observable甚至不需要维护一系列的Observers。
Executing observables
代码Observable.create(function subscribe(observer) {...})代表了一个"Observable execution",它将仅仅在每个Observer的subscribes的延迟计算中。随着时间的推移,将产生多个结果,同步或者异步。
Observable可以传递的有三种类型:
- "Next" notification:传递一个数值,诸如Number、String、Object等
- “Error” notification:传递一个js异常
- "Complete" notification:什么值都不传递
Next notifications是最重要的也是最常见的类型:它们表示一个实际数据被送到Observer。在Observable Execute执行期间Error和Complete最多会发生一次。
下面的语法是在Observable Grammar or Contract中最好的表达:
next*(error|complete)?
在一个Observable Execute中,0或多个Next notifications可能被传递。如果有error或者Complete被传递,剩下的next将不会被传递。
下面是Observable execute传递3个Next notifications的例子:
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
})
下面的例子中,Next notification 4不会被传递:
var observable = Rx.Observable.create(function subscribe(observer){
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
observer.next(4); // 不会被执行
})
用tru/catch代码快包裹起来是个好主意:
var observable = Rx.Observable.create(function subscribe(observer) {
try {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
} catch (err) {
observer.error(err); // delivers an error if it caught one
}
});
处理(Disposing)Observable Executions
Observable Executing的个数可能是无限个,Observer中应该处理有限个next,所以我们需要一个API来停止execution。因为execution在每个Observer中都是独立的,一旦Observer完成接收值,它必须有一个方法来停止executing。
当 observable.subscribe 被调用,Observer将被附加到一个新创建的Observable execution中,这次调用将返回一个对象,即Subscription:
var subscription = observable.subscribe(x => console.log(x));
Subscription代表了一个进行中的executing,它有一个最小的API允许你取消execution??梢栽谡饫镌亩粮嘤泄赜?Subscription type here 的东西。使用 subscription.unsubscribe() 你可以取消正在进行的execution:
var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();
当我们使用create()创建一个Observable时,我们必须定义execution怎么处理资源。你可以通过返回一个自定义的 unsubscribe 函数来实现该步骤。
var observable = Rx.Observable.create(function subscribe(observer){
var intervalID = setInterval(() => {
observer.next('hi')
});
return function unsubscribe(){
clearInterval(intervalID);
}
})
然后这样来调用:
function subscribe(observer) {
var intervalID = setInterval(() => {
observer.next('hi');
}, 1000);
return function unsubscribe() {
clearInterval(intervalID);
};
}
var unsubscribe = subscribe({next: (x) => console.log(x)});
// Later:
unsubscribe(); // dispose the resources
Observer
什么是Observer?一个Observer是Observable传递过来的数据的customer。Observers是一个简单的一些列的回调,next、error、和 complete用来传递数据。下面的例子展现了一个典型的Observer对象:
var observer = {
next: x => console.log('Observable got a next value: ' + x),
error: err => console.log('Observable got and error: ' + err),
complete: () => console.log('Observable got a complete notification')
};
为了使用Observalbe,提供了一个subscribe:
observable.subscribe(observer)
你也可以提供部分回调:
var observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
};
当你订阅(subscribing)一个Observable时,你也许仅仅只提供一个函数作为参数:
observable.subscribe(x => console.log('Observer got a next value: ' + x));
在observable.subscribe的内部,他将使用第一个回调创建一个Observer对象作为一个next handler。所有的callback类型都可能被提供:
observable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);
Subscription
什么是Subscription?一个Subscription代表了一个一次性的资源,通常表示的是一个Observable execution。一个Subscription有一个重要的方法,unsubscribe,它不需要参数,仅仅是处理subscription的资源。在之前的RxJS版本中,Subscription被称作"Disposable"。
var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe();
一个Subscription实质上是一个unsubscribe()函数,用来释放资源或者取消一个Observable executions。
Subscriptions也可以放在一起,这样会导致使用一个unsubscribe()将取消多个Observable executions。你可以这么做:
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);
var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubscription);
setTimeout(() => {
// Unsubscribes BOTH subscription and childSubscription
subscription.unsubscribe();
}, 1000);
当执行时,我们将看到如下输出:
second: 0
first: 0
second: 1
first: 1
second: 2
Subscriptions有一个remove(otherSubscription)方法,用来移除关联的Subscirption
Subject
什么是Subject?一个RxJS Subject是一个特殊类型的Observable,它允许值可以多路广播给多个Observers。普通的Observables是单路广播(每个subscribed Observer拥有自己独立的Observable execution),Subjects是多路广播。
一个Subject像一个Observable,但是可以多路广播给Observers。Subjects像Eventmitters:它们维持许多注册过的监听器。
每个subject是一个Observable。给定一个Subject,你可以通过提供一个Observer来订阅(subscribe)它,然后开始正常的接收值。从Observer的角度来看,他不能告知Observer的Observable execution到底是来自一个不同的单路传播的Observable,还是来自Subject。
在Subject的内部,subscribe并没有调用一个新的execute去传递数据。它只是简单的注册Observers列表中的一个Observer,类似于addListener的使用。
每个subject是一个Observer。他是拥有next(v),error(e)和complete()方法的对象。为了给Subject一个新值,只需要调用 next(theValue),他讲多路传播给注册过的Observer。
在下面的例子中,我们在Subject中注册了两个Observers,我们传递一些值给Subject:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);
输出:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
因为Subject同时也是一个Observer,这意味着你应该提供一个Subject作为Observable的subscribe的参数,像这样:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject); // You can subscribe providing a Subject
执行如下:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
在上面的处理中,我们本质上仅仅是通过Subject将一个单路广播的Observable execution变为多路广播的。这个演示展示了Subjects是怎样将单路广播变为多路广播的。
这里有几个特殊的Subject类型:BehaviorSubject、ReplaySubject和AsyncSubject。
Multicasted Observables
一个"multicasted Observable"的实现是通过Subject的多个订阅(subscribers)来实现的,然而一个"unicast Observable"仅仅只通知一个单一的Observer。
在后台,multicast是这样操作的:Observers订阅(subscribe)一个相关的Subject,Subject订阅一个Observable源。
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log('observerAa: ' + v)
});
muticasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
// This is, under the hood, `source.subscribe(subject)`:
muticasted.connect();
Reference counting
手动的调用connect()来处理Subscription是很麻烦的。通常,我们希望当第一个Observer到达时,能够自动connect,当最后一个Observer被移除时,自动取消shared execution。
看看下面这些订阅发生时的列表:
- 第一个Observer订阅multicasted Observable
- multicasted observable连接
- next value 0被传递给第一个Observer
- 第二个Observer订阅multicasted Observable
- next value 1被传递给第一个Observer
- next value 1被传递给第二个Observer
- 第一个Observer解除监听
- next value2被传递给第二个Observer
- 第二个Observer解除监听
- 与multicasted observable连接的Observable解除连接
看下面的代码:
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: v => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
subscrption1.unscribe();
}, 1200);
// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);
如果我们希望避免一遍遍地调用connect(),我们可以使用ConnectableObservable的refCount()方法(reference counting),它返回一个Observable来跟踪有多少个订阅者(subscribers)。当订阅者从0增加到1时,它将自动调用connect(),只有当订阅者从1变为0时,它才会disconnect。
看下面的例子:
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscrption1, subscription2, subscriptionConnect;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v);
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
执行结果:
observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed
refCount()方法仅存在ConnectableObservable中,它返回一个Observable,而不是另外的ConnectableObservable。
BehaviorSubject
Subjects的一种变形是BehaviorSubject,它有一个"the current value" 的概念。它存储了consumer最后一次执行时的value,每当一个Observer订阅时,它都会立即从BehaviorSubject接收一个"current value"。
例子:
var subject = new Rx.BehaviorSubject(0); // 0 is the inital value
subject.subscribe({
next: v => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: v = console.log('observerB: ' + v)
});
subject.next(3);
输出:
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
ReplaySubject
功能和它的名字一样:
var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers
subject.subscribe({
next: v => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: v => console.log('observerB: ' + v)
});
subject.next(5);
输出:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
你还可以指定一个以毫秒为单位的窗口事时间,除了buffer size之外,决定记录的值可以重复(时间内)。
var subject = new Rx.ReplaySubject(100, 500);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
var i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: v => console.log('observerB: ' + v)
});
}, 1000)
下面的输出中,第二个Observer在最后500ms内得到的数值为3、 4、 5:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...
AsyncSubject
AsyncSubject表示只有最后一个Observable execution的值会被发送给observers,仅仅发生在执行完成时
var subject = new Rx.AsyncSubject();
subject.subscrbe({
next: v => console.log('onbserverA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
输出:
observerA: 5
observerB: 5
AsyncSubject类似于一个last() operator,他等待complete通知来传递一个唯一的值。
Opeartors
RxJS最有用的一部分就是operators,即使Observable是最基础的。Operators最基本的要点是允许复杂的代码变得简单化。
什么是Operators?
Opeartors是Obsrevable的方法,就像map(),filter(),merge()等。当它被调用时,它们并不改变已经存在的Observable,而是返回一个基于第一个Observable上新的Observable。
一个Operator本质上是一个纯函数,它接收一个Observable,基于其上返回一个新的Observable。在下面的例子中,我们创建了一个自定义的operator方法:
function multiplayByTen(input){
var output = Rx.Observable.create(function subscribe(observer){
input.subscribe({
next: v => observer.next(10 * v),
error: err => observer.error(err),
complete: () => observer.complete()
});
});
return output;
}
var input = Rx.Observable.from([1, 2, 3 ,4]);
var output = multiplayByTen(input);
output.subscribe(x => console.log(x));
输出为:
10
20
30
40
注意订阅(subscribe)的输出将导致输入的Observable可观测的变化。我们称这个为"operator subscription chain"。
Instance opeartors versus static operators
什么是instance operator?最常见的情况是当你引用一个opeartors时,我们假定实现了一个operator,它是Observable实例的一个方法。例如,如果multiplayByTen operator变成一个官方的operator,它看起来会是这样:
Rx.Observable.prototype.multiplyByTen = function multiplyByTen(){
var input = this;
return Rx.subscrible.create(function subscribe(observer){
input.subccribe({
next: (v) => observer.next(10 * v),
error: (err) => observer.error(err),
complete: () => observer.complete()
});
});
}
Instance operators是一个实例运算符,我们使用它来推断可观测的输入。
注意,input observable不再是一个函数参数:
var observable = Rx.Observable.from([1, 2, 3, 4]).multiplyByTen();
observable.subscribe(x => console.log(x));
什么是static operator?除了instance operators之外,static operators是直接附加在Observable类上的方法。一个static operator使用内部的this进行操作,但是并不完全依赖它的参数。
static operators是附着在Observable类上的纯函数,通常用于创建Observable
最常见的static operators类型是Create Operators,他不是将一个Observable改变成另外一个Observable,它们简单的获得一个non-Observable参数,比如number,然后create一个新的Observable。
一个典型的例子是使用interval函数。它获取一个数值(不是一个Observable)作为输入参数,然后输出一个Observable:
var observable = Rx.Observable.interval(1000 /* number of milliseconds */)
创建一个creation operaor的另外一个例子是create,就是我们之前一直在使用的例子。 all static creation operators here
然而,static operators也许和普通的creation性质不同。一些Combination Operator也许是静态的,比如merge、combineLatest、concat等。将这些作为静态是有意义的,因为它们把multiple Observales作为输入,不只是一个,比如:
var observable1 = Rx.Observable.interval(1000);
var observable2 = Rx.Observable.interval(400);
var merged = Rx.Observable.merge(observable1, observable2);
Marble diagrams
为了解释operators是如何工作的,光是文本解释是不够的。许多operators和时间有关,它们可能会延迟执行,例如,throttle等。图标往往能够比文字更多表达清楚。Marble Diagrams能够可视化的表现出operators是如何工作的,包括输入的Observable(s),operator和它的参数,以及输出的Observable
在一个marble diagram中,随着时间的流逝,它会描述值("marbles")在Observable execution上传递。
你可以在下面看到marble diagram的解析:
- 时间从左往右流动,代表input Observable的execution
- 这些代表Observable传递传来的值
- 这个竖线表示"complete" notification,它表明Observable已经成功完成了。
- 这个方框表示input Observable的operator(上图)产生出的output Observable(下图)。方框内的文字表示转变的属性。
- 这个Observable是调用operator产生的
- 这个X代表output Observable发出的错误,说明因为某些原因而异常终止。
在这个网站的站点,我们会广泛的使用marble diagrams去解释operators是如何工作的。它们也许在其他的地方也很有用,比如单元测试等。
选择一个operator
你需要为你的程序选择一个适当的operator吗?先从下面的列表选择一个:
- 我已经有了一个Observable
- 我想改变每个传递的值
- 我想选择每个传递值的属性
- 你应该使用pluck
- 我想查看每个被传递的值,但是不影响它们
- 你应该使用do
- 我想过滤某些值
- 基于一个自定义的逻辑
- 你应该使用filter
- 基于一个自定义的逻辑
更多内容参考官网:Choose an operator
operators的分类
Scheduler
什么是Scheduler?当一个subscription开始工作或者notifications被传递,scheduler就会开始调图。它包含三个组件。
- 一个Scheduler是一个数据结构(data structure)。它知道如何基于优先级或者其它标准进行存储,执行队列任务
- 一个Scheduler是一个执行上下文(execution context)。它表示task在哪个地方,什么时候执行()
- 一个Scheduler是一个(虚拟(virtual))时钟。它基于scheduler上的getter方法now(),提出了一个"时间(time)"的概念。任务被安排在一个特殊的调度器中,它会遵守给它的时间。
看下面例子中,我们使用之前已经写过的例子,同步传递数值1、2、 3,然后使用observerOn操作符来指定异步调度:
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
})
.observerOn(Rx.Scheduler.async);
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
});
console.log('just after subscribe');
输出:
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
注意got value这个语句实在 just after subscribe只有打印输出的,这和我们看到的代码顺序不一致。这时因为 observerOn(Rx.Scheduler.async)在Observable.create和最后一个Observer之间引入了一个代理的Observer。让我们重新为一些标识符取名,以便让他们之间有着明显的差别:
var observable = Rx.Observable.create(function (proxyObserver) {
proxyObserver.next(1);
proxyObserver.next(2);
proxyObserver.next(3);
proxyObserver.complete();
})
.observeOn(Rx.Scheduler.async);
var finalObserver = {
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};
console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');
proxyObserver在observeOn(Rx.Scheduler.async)中创建,它的next(val)方法大概像下面这样:
var proxyObserver = {
next: (val) => {
Rx.Scheduler.async.schedule(
(x) => finalObserver.next(x),
0 /* delay */,
val /* will be the x for the function above */
);
},
// ...
}
这有点儿像setTimeout或者setInterval是异步调度操作,即使给定的delay为0。按照惯例,在JS中,setTimeout(fn, 0)知道运行fn函数的时机最早是下一次循环队列初。这也说明了为什么 got value 1是最后运行的。
可以给Scheduler的schedule传递一个延时(delay)参数,它可以让Scheduler内部的时钟去延时到指定时间。Scheduler的时钟和真实的时钟没有任何关系。它更类似于延时,而不是运行指定的时间。
Scheduler类型
异步Scheduler只是RxJS提供的一种Scheduler。通过使用Scheduler的静态方法可以创建下面的类型
Scheduler | Purpose |
---|---|
null | 不使用Scheduler, notifications将会被同步和递归地交付给Observer。使用这个来进行常量操作或者尾部递归操作 |
Rx.Scheduler.queue | Schedules on a queue in the current event frame (trampoline scheduler). Use this for iteration operations. |
Rx.Scheduler.asap | Schedules on the micro task queue, which uses the fastest transport mechanism available, either Node.js' process.nextTick() or Web Worker MessageChannel or setTimeout or others. Use this for asynchronous conversions. |
Rx.Scheduler.async | Schedules work with setInterval. Use this for time-based operations. |