前言
自从去年8月底《浅谈RxJava与2.0的新特性》,已经过去快一年。笔者也没想到此文竟有读者等笔者填坑快一年了,不禁汗颜。所以笔者打算写关于一个 RxJava2 的系列文章,既为填坑,也为回报读者对我的支持。本文为第一篇。
读本系列文章,你可能有如下收获:
- 了解其设计原理,代码实现
- 掌握操作符的正确使用姿势,避免采坑
- 强化 Rx 编程思想,写出更 Rx 的代码
- 跟读精彩的源码,强化编程功底
废话不多说,进入正题。
Reactive Streams
之前在《浅谈RxJava与2.0的新特性》我们提到过, RxJava2 遵循 Reactive Streams 的编程规范, 或者更精确的说,是 RxJava2 中的 Flowable 相关的类。因此我们只分析 Flowable 相关的实现与使用,剩下的 Observable、 Completable、Single、 Maybe
这些不会再提及,相信读者朋友们可以举一反三。
Reactive Streams 中明确规范了如下4点:
- process a potentially unbounded number of elements
- in sequence,
- asynchronously passing elements between components,
- with mandatory non-blocking backpressure.
后面笔者会用 RS 代替全称。
请跟随本系列文章慢慢看 Flowable 是出色的完成上述的要求。
阅读源码的正确姿势
Rx2 在源码中加入了一些注解,这些注解对运行没有任何实际作用,仅仅是用作标识备注,有助于开发者了解某个操作符的正确使用姿势,同时也有利于阅读源码时整理思路。这些注解位于io.reactivex.annotations
包名下,这里着重介绍一个。
BackpressureSupport
BackpressureSupport 是用作标识这个操作符对背压的支持类型,有以下几种:
- PASS_THROUGH:表示这个操作符仅仅传递背压类型,不做任何改变。例如
defer
操作符,这个操作符支持的背压类型取决于Callable
产生的Publisher
。 - FULL:表示这个操作符支持完全的背压,协调上下游关系
- SPECIAL:表示这个操作符支持的背压类型由方法上的文档说明
- UNBOUNDED_IN:表示这个操作符会向上游请求 Long.MAX_VALUE,并协调下游
- ERROR:表示如果下游没有足够数量的request,上游发射了超额的数据,这个操作符会抛出一个
MissingBackpressureException
- NONE:表示不处理背压
上面这些字面解释看起来还是很绕的,尤其是对于没有阅读过相关源码的读者。我们也不必一次性全部弄明白,后续会慢慢讲清楚所有。
走进create源码
前文中有提到过,Rx2 收回了create
方法的权限,使开发者自定义的create
也能够正确的支持背压。而实现的方式就是通过额外提供一个BackpressureStrategy
参数。也因此,create
的方法注解中 BackpressureSupport
是 SPECIAL
。
FlowableCreate
抛开Rx2提供的 plugin 不谈,本质上就是用 create 传进来的2个参数创建了FlowableCreate
这个类。
根据传入的BackpressureStrategy
生成不同的Emitter
对象。并遵循一致的编程约定,先调用 onSubscribe,随后将Emitter
传递给FlowableOnSubscribe
用来发射数据。
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
BackpressureStrategy
Rx2 大量的类通过继承AtomicLong
来表示计算个数与发射个数,请求一个+1,发射一个-1。并且在 Rx2 中 Long.MAX_VALUE 有特殊含义,表示无限的数据。即,如果 request(Long.MAX_VALUE)
,即使发射了数据也不会减少自身的数值。
这里所有的 Emitter 都继承了基类BaseEmitter
,并提供一些公共方法如setDisposable/setCancellable/requested/serialize
等。然后根据各自的背压策略,实现相应的逻辑,下面分别介绍。
MISSING
MISSING即没有背压,我们看 onNext 函数会发现,每调用一次就会传递给下游的 subscriber.onNext
,空指针则onError
。
@Override
public void onNext(T t) {
if (isCancelled()) {
return;
}
if (t != null) {
actual.onNext(t);
} else {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (;;) {
long r = get();
if (r == 0L || compareAndSet(r, r - 1)) {
return;
}
}
}
上面的代码是2.1.2版本的源码,笔者认为这里有一处 BUG 。即在自减的时候,没有检查 Long.MAX_VALUE 的情况,导致在request(Long.MAX_VALUE)
后,发射数据时依然会不断自减,这是与一致的设计思路相悖的。反观下面 DROP 与 BUFFER 相关的 Emitter 处理时,则直接调用了BackpressureHelper.produced(this, 1)
,在里面会有 Long.MAX_VALUE 的判断。
虽然有点小 BUG,但是实际中除了在requested()
函数中会出错外,不会影响正常的执行流。且一般开发者也不会使用Emitter.requested()
函数。
虽然 MISSING 不支持背压,但是没关系,我们可以通过操作符来弥补。
这些操作符结合使用 MISSING 的 create 方法,使得原本不支持背压的 Flowable 支持背压。
当然我们也大可不必这样麻烦,既然要使用 buffer、drop 或者 latest,使用下面的策略即可。除非我们需要那些操作符提供的额外功能。
ERROR
ERROR则和最开始的BackpressureSupport.ERROR
表现一致。
下面这块代码是NoOverflowBaseAsyncEmitter
,会被 ERROR 和 DROP 对应的Emitter
继承。逻辑也很简单,即请求数如果还大于0,则向下发射并将请求数减1,否则走onOverflow()
方法。
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() != 0) {
actual.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
而 ERROR 对应的实现则很简单了,不在赘述。
@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
DROP
DROP即直接丢弃超额的数据,体现在代码中就非常简单。
@Override
void onOverflow() {
// nothing to do
}
BUFFER 与 LATEST
这俩之所以放到了一起,是因为 BUFFER 与 LATEST 本质上都是缓存了数据,细节上的区别就是,BUFFER 是缓存了所有数据,而 LATEST 只保留了最近的一个 onNext 数据。
体现在代码中这两者最主要的区别就是一个用了队列来缓存,一个用了AtomicReference 来维持最后一个未被消费的数据。
就挑 BUFFER 来说, onNext 就是将数据扔进队列,而后尝试消费数据即调用drain()
。onError 与 onComplete 则是将 结束标识置为 true ,并保留异常,然后依然也是在drain()
中消费该消息。
在/onNext/onError/onComplete/onRequested时,都会调用drain()
来消费队列中的数据。
@Override
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
queue.offer(t);
drain();
}
drain
中做的事就比较复杂了,为了保证线程安全,首先通过一个AtomicInteger
来确保只有一个线程可以进入for(;;)
循环。
在 for 循环中,不断的消费队列中的数据,如果队列为空则检查结束标识是否为 true,是的话则发射 onComplete 或者 onError 。
void drain() {
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
final Subscriber<? super T> a = actual;
final SpscLinkedArrayQueue<T> q = queue;
for (;;) {
long r = get();
long e = 0L;
while (e != r) {
if (isCancelled()) {
q.clear();
return;
}
boolean d = done;
T o = q.poll();
boolean empty = o == null;
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
if (empty) {
break;
}
a.onNext(o);
e++;
}
if (e == r) {
if (isCancelled()) {
q.clear();
return;
}
boolean d = done;
boolean empty = q.isEmpty();
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
}
if (e != 0) {
BackpressureHelper.produced(this, e);
}
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
这里请大家留意一个编程的套路,在绝大多数队列消费的场景里, Rx2 中都是使用了下面的方式。这也是我们可以积累使用的。通过这种方式可以保证for循环里的代码是单线程执行的,且如果执行期间有一次或多次新的调用drain()
,会导致重新走一遍包含注释处的代码,确保数据可以正确的消费发射。
void drain() {
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
for (;;) {
// 消费队列, 发射数据
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
小结
笔者在介绍过程中已经省略了很多细枝末节,不免显得知识有些分散,结合源码阅读效果更佳。没想到一个小小的 create 也包含这么多的玄机。
我相信通过阅读这篇文章,读者们写 create 的时候应该可以做到结合实际场景选择正确的BackpressureStrategy
。
有了 create 便从此开启 Rx2 万里长征第一步。下一篇,我们将会介绍 Rx2 的线程调度相关的操作符及其实现,敬请期待。