RxJava的线程控制主要设计到两种操作符:subscribeOn
和observeOn
subscribeOn:如果多次调用,则只有第一次调用有效;
observeOn:如果多次调用,每次有可以切换线程。
(1)默认情况下
Observable.just("A")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("aaa", "threadName:"+Thread.currentThread().getName());
}
});
打印日志:
默认情况下被观察者和观察者是运行在主线程的,如果阻塞50秒(耗时操作)
Observable.just("A")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Thread.sleep(50000);
Log.d("aaa", "threadName:"+Thread.currentThread().getName());
}
});
这样会阻塞主线程。
这时,我们就需要用到线程控制的知识了。
(2)Scheduler的种类
Schedulers.io(?):
用于IO密集型的操作,例如读写SD卡文件,查询数据库,访问网络等,具有线程缓存机制,在此调度器接收到任务后,先检查线程缓存池中,是否有空闲的线程,如果有,则复用,如果没有则创建新的线程,并加入到线程池中,如果每次都没有空闲线程使用,可以无上限的创建新线程。Schedulers.newThread(?):
在每执行一个任务时创建一个新的线程,不具有线程缓存机制,因为创建一个新的线程比复用一个线程更耗时耗力,虽然使用Schedulers.io(?)的地方,都可以使用Schedulers.newThread(?),但是,Schedulers.newThread(?)的效率没有Schedulers.io(?)高。Schedulers.computation():
用于CPU 密集型计算任务,即不会被 I/O 等操作限制性能的耗时操作,例如xml,json文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU的核数。不可以用于I/O操作,因为I/O操作的等待时间会浪费CPU。Schedulers.trampoline():
在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。Schedulers.single():
拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。Scheduler.from(@NonNull Executor executor):
指定一个线程调度器,由此调度器来控制任务的执行策略。AndroidSchedulers.mainThread():
在Android UI线程中执行任务,为Android开发定制。注:
在RxJava2中,废弃了RxJava1中的Schedulers.immediate(?)
在RxJava1中,Schedulers.immediate(?)的作用为在当前线程立即执行任务,功能等同于RxJava2中的Schedulers.trampoline(?)。
而Schedulers.trampoline(?)在RxJava1中的作用是当其它排队的任务完成后,在当前线程排队开始执行接到的任务,有点像RxJava2中的Schedulers.single(),但也不完全相同,因为Schedulers.single()不是在当前线程而是在一个线程单例中排队执行任务。
(3)subscribeOn的使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
e.onNext("A");
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
}
});
代码中添加了subscribeOn(Schedulers.io())这句代码,这样就可以从默认主线程切换到IO线程。
我们看一下打印结果
所以, 如果单纯用subscribeOn来控制线程,那么被观察者和观察者都会被切换到指定的线程。
如果添加多个, 比如
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
e.onNext("A");
}
})
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.trampoline())
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.computation())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
}
});
那么只有第一次调用subscribeOn有效果。
(4)observeOn的使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
e.onNext("A");
}
})
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
}
});
打印效果
我们发现被观察者在主线程运行,观察者在子线程运行。
结论:结合(3)总结的结论是,subscribeOn可以控制被观察者和观察者的线程,observeOn仅可以控制观察者的线程。
(5)subscribeOn和observeOn结合使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
e.onNext("A");
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.computation())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
}
});
打印效果如下:
这样观察者就从主线程切换到子线程了。
我们再来举一个稍微复杂的例子。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
e.onNext("A");
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map1----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.computation())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map2----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map3----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.single())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map4----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map1----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.computation())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map2----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map3----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.single())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map4----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
}
});
执行效果如下:
我们发现
- 多次调用Schedulers.single(),都是在同一个线程执行。
- 多次调用Schedulers.computation()、Schedulers.newThread()、Schedulers.io()都会重新新建线程。
Schedulers.from()和AndroidSchedulers.mainThread()就不介绍了。