RxJava很优势的一个方面就是他的线程切换,基本是依靠ObserveOn和SubscribeOn这两个操作符来完成的。
先来看看什么是ObserveOn和SubscribeOn,官方对他们的定义是这样的:
- ObserveOn
specify the Scheduler on which an observer will observe this Observable
指定一个观察者在哪个调度器上观察这个Observable
- SubscribeOn
specify the Scheduler on which an Observable will operate
指定Observable自身在哪个调度器上执行
By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.
As shown in this illustration, the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.
从上面的图例中可以看出,SubscribeOn这个操作符指定的是Observable自身在哪个调度器上执行,而且跟调用的位置没有关系。而ObservableOn则是指定一个观察者在哪个调度器上观察这个Observable,当每次调用了ObservableOn这个操作符时,之后都会在选择的调度器上进行观察,直到再次调用ObservableOn切换了调度器。
那么,如果多次调用SubscribeOn,会有什么效果呢?写个例子测试一下就知道了。
Observable.just(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map-1:"+Thread.currentThread().getName());
return integer;
}
})
.subscribeOn(Schedulers.newThread())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map-2:"+Thread.currentThread().getName());
return integer;
}
})
.subscribeOn(Schedulers.io())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map-3:"+Thread.currentThread().getName());
return integer;
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "subscribe:"+Thread.currentThread().getName());
}
});
在例子用,我多次调用了subscribeOn操作符,并且在每个map操作符中打印了当前线程的名称。
从打印的日志中可以看出,只有第一次调用SubscribeOn时选择的调度器.subscribeOn(Schedulers.newThread())
有作用,而后来选择的都没有作用。这说明了SubscribeOn这个操作符,与调用的位置无关,而且只有第一次调用时会指定Observable自己在哪个调度器执行。
其实有一种情况特殊,就是在DoOnSubscribe操作符之后调用,可以使DoOnSubscribe在指定的调度器中执行。
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.i(TAG, "create:" + Thread.currentThread().getName());
observableEmitter.onNext(1);
observableEmitter.onComplete();
}
});
observable.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map:" + Thread.currentThread().getName());
return integer;
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.i(TAG, "doOnSubscribe:" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "subscribe:" + Thread.currentThread().getName());
}
});
由此可见,SubscribeOn不仅可以指定Observable自身的调度器,也可以指定DoOnSubscribe执行的调度器。
我们知道了多次调用SubscribeOn并不会起作用,那么多次调用ObservableOn呢?还是同样的例子,将所有SubscribeOn换为ObservableOn试试。
Observable.just(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map-1:"+Thread.currentThread().getName());
return integer;
}
})
.observeOn(Schedulers.newThread())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map-2:"+Thread.currentThread().getName());
return integer;
}
})
.observeOn(Schedulers.io())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map-3:"+Thread.currentThread().getName());
return integer;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "subscribe:"+Thread.currentThread().getName());
}
});
从打印的日志中可以看出,每次调用了ObservableOn操作符之后,之后的Map和Subscribe操作符都会发生在指定的调度器中,实现了线程的切换。
平时我们见到最多的使用场景,可能就是官方图例上描述的那样,配合Retrofit从网络拿回数据、在io线程或子线程执行某些耗时操作,比如一些变换操作,然后再切换到主线程去更新UI,可以用RxJava的线程切换很方便的实现。但是这只是很简单的应用场景之一,RxJava能做的远不止这些,我仍在不断的摸索中。