大白话讲解RxJava原理

本文首发于我的csdn博客:https://blog.csdn.net/sinat_23092639/article/details/104356256

RxJava全局观赏

RxJava已经诞生了好多年了,对应变化风云诡谲的移动互联网行业来说,已经属于老框架了。虽然学习成本不低,但是熟练之后真叫人打从心里说声爽。那它到底爽在哪里呢,我个人总结为以下几点:

1.不管复杂的业务也可以用一条链连到底,将每个逻辑操作步骤划分到每一个操作符中
2.线程切换一个操作符搞定,完全省去了繁琐的回调,多次线程切换从代码的角度看流程依然十分直观
3.多条业务流程可以拼接一起,多线程可以合并操作。

简单一句就是:删繁就简三秋树。当然也有人反映不好用,这种也要根据不同的业务技术场景做取舍,如人饮水冷暖自知,见仁见智,这里就不赘述。

RxJava原理流程总览

在为RxJava删繁就简的能力惊叹之余,好奇的我们肯定控制不住探索其原理的欲望?!拔ü词讲僮鞣涂梢砸宦纷叩降??为啥线程可以链式切换呢?等等...”

我最近春节在家,终于有机会可以好好系统探索下RxJava的原理源码,下载了RxJava1.x的源码仔细斟酌一番,拨开代码的重重迷雾,慢慢的抓住了它简约而不简单的设计原理。

为什么说是简约而不简单呢?简约是因为它 的原理并不复杂,不高深,不简单是因为它能把一切复杂都隐藏在流畅的链式中。

话不多说,进入主题。

经过分析,我觉得用手机包装流水线来形容RxJava总的工作流程还是比较恰当的(大家觉得有不妥的欢迎指出讨论~):
1.先搭建生产流水线
2.启动流水线包装
3.用户一层层拆开包装最后拿到手机(最后的结果)。

这里各个比喻对应的代码对象:

Observable:流水线的某一道工序
OnSubscribe:一道工序中的工人
OnSubscribe的call方法:包装Subscriber
subscribe方法:启动流水线
Subscriber:一层包装盒
Subscriber的onNext:用户拆开包装

具体阐述下以上比喻的意思:

1.先搭建生产流水线

其实大部分操作符,都是新建一个Observable对象,然后将上游的Observable对象包装起来,传入一个新的OnSubscribe,比如:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
    }

public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
        return unsafeCreate(new OnSubscribeFilter<T>(this, predicate));
    }

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }


 public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
    }

最后都是调用了create方法创建Observable,把当前Observable传入给新的Observable持有,以保持链式(有点类似链表持有上一个节点的指针)。为什么要这样呢,因为RxJava是的链式是基于代理模式做的,也就是说基于一层一层Observable的包装。

那包装的是什么呢?就是OnSubscribe,那OnSubscribe包装的意义是什么呢?其实就是包装如何包装Subscriber的逻辑。

比如map,传入的是OnSubscribeMap(OnSubscribe的基类),它的call代码如下,

 @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

当map这个操作符创建出来的Observable被调用subscribe被调用的时候,就会该OnSubscribeMap的call方法,可以看代码发现这里创建了一个MapSubscriber对象,然后调用上游的Observable的unsafeSubscribe方法,传入该MapSubscriber对象作为参数。

所以当你开心地用RxJava一个个操作符把链写得老长的时候,里面的逻辑就是不断一层层包装Observable,每个Observable持有一个自己的OnSubscribe,具体类型由对应的操作符确定。

这就是我说的第一个流程搭建流水线,总的来说就是从上往下不断创建Observable,并连成链,即后一个Observable持有上游Observable的引用。

Observable之所以说是流水线的某一道工序,是因为它是这条链最基本的串联元素,而OnSubscribe之所以说是一道工序中的工人,是因为它决定了Subscriber是如何被包装的。

2.启动流水线包装

启动的开关正是链尾的subscribe方法??聪翺bservable的subscribe方法:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

subscribe(Subscriber<? super T> subscriber, Observable<T> observable)方法,方法比较长,最重要的就是

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

RxJava1.x中,RxJavaHooks.onObservableStart其实没有做什么操作,返回的就是原来的observable对象的onSubscribe,所以这里就是调用observable对象的onSubscribe的call方法,传入subscriber对象。

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
    }

OnSubscribe是一个继承Action1的接口,Action1是一个只有call方法的接口,所以这里call的逻辑由具体的OnSubscribe对象确定。

还记得还是那个面说的map操作生成的OnSubscribeMap对象的call方法逻辑么?它的call方法中创建了一个MapSubscriber对象,然后调用上游的Observable的unsafeSubscribe方法,并传入该MapSubscriber对象作为参数。

这里要注意的是,在创建了一个MapSubscriber对象的时候,会传入当前Observable调用的subscribe方法的参数Subscriber对象,保存该对象的引用actual ,以保持链式:

public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

所以假如链尾的Observable是map操作符创建的,则subscribe执行的时候,会执行Observable对象中的OnSubscribeMap对象的call方法,生成一个MapSubscriber对象并持有我们代码中链的最后传入的 Subscriber对象,然后让上游的Observable对象调用subscribe方法,并传入这个MapSubscriber对象。所以这里就是从下往上递归调用Observable对象的subscribe方法,从而生成一条Subscriber对象的链(也可以理解为一层层包装)。

在这里,经过subscribe方法的启动,已经开始加工包装,最后生产出了一条Subscriber对象的链,即我们的手机包装盒。

3.用户拆开手机包装盒

这个流程,可以用杨宗纬的《洋葱》一段经典歌词来阐述:“一层一层一层地剥开我的心~~”

上一步操作从下到上生成了Subscriber对象的链,链的尾部就是最上游的Observable中的:

Observable.create(object : OnSubscribe<Int> {
            override fun call(t: Subscriber<in Int>) {
                t.onStart()
                t.onNext(1)
            }
        })

这里第二行的的t: Subscriber<in Int>,现在第四行调用了 t.onNext(1),又以之前的map操作符生成的MapSubscriber对象为例:

public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

这里先使用mapper.call(t)对传进去的Subscriber对象进行了变换,即map操作中指定的变换方法,这个下一节再谈。先注意这里最后调用了 actual.onNext(result);,而actual就是Subscriber链的下一个Subscriber对象,而除了map以外,大部分的Subscriber对象的onNext方法也有这样的逻辑,所以可以知道,这里Subscriber链在递归调用,也可以看作一层一层一层地打开,就仿佛是拆开手机包装盒。

流程总结

看前面的叙述各位可能还是有点雾里看花,总结一下:前面三个小节各对应一个流程,从RxJava调用代码来说,就是先从上到下把各个变换的Observable连成链(拼装流水线)然后在最后subscribe的时候,又从下到上通过每个Observable的OnSubscribe从最下的Subscriber对象开始连成链(流水线开始工作包装Subscriber),直到顶端,当顶端的Subscriber对象调用了onNext方法的时候,又从上往下调用Subscriber链的onNext(用户一层层拆开包装盒),里面执行了每个操作的变换逻辑。

举个例子进一步说明以上流程:

 Observable.create(object : OnSubscribe<Int> {
            override fun call(t: Subscriber<in Int>) {
                t.onStart()
                t.onNext(1)
            }
        })
            .map(object : Func1<Int, String> {

                override fun call(t: Int): String {
                    Log.d(TAG, Thread.currentThread().name)
                    return t.toString()
                }
            })
            .map(object : Func1<String, Book> {

                override fun call(t: String): Book {
                    Log.d(TAG, Thread.currentThread().name)
                    return Book(t)
                }

            })

            .subscribe(object : Subscriber<Book>() {
                override fun onStart() {

                }

                override fun onNext(t: Book) {
                    Log.d(TAG, Thread.currentThread().name)
                    Log.d(TAG, t.toString())
                }

                override fun onComplete() {
                }

                override fun onError(t: Throwable) {
                    Log.d(TAG, t.message)
                }
            })

为了简单,这里只使用了map操作符。

以下是一个简单的流程图:

在这里插入图片描述

RxJava操作符原理解析

如果上面的总流程分析能理解的话,那么下面的操作符的理解就不难了。

普通的变换操作符

这里举map的例子。这里的变换处于从上往下递归执行Subscriber链onNext阶段(用户拆手机包装盒)
前面提到map中生成的MapSubscriber对象的onNext方法:

public void onNext(T t) {
            R result;
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            //调用下游的Subscriber的onNext方法
            actual.onNext(result);
        }

注意到第四行代码 result = mapper.call(t);,这里的mapper其实就是我们写的map操作的变换方法:

.map(object : Func1<String, Book> {

                override fun call(t: String): Book {
                    Log.d(TAG, Thread.currentThread().name)
                    return Book(t)
                }

            })

这里面的Func1回调接口,所以经过这样call方法的调用,就实现了map的操作变换,然后执行 actual.onNext(result);,即将变换后的结果交给下游的Subscriber的onNext方法。

如果理解了上面的流程图,是不是理解map易如反掌呢?

线程切换操作符

线程切换主要两个操作符:subscribeOn和observeOn

线程切换是我觉得RxJava最牛逼的地方,不过了解了原理之后觉得也不复杂高深,主要还是在上面的总流程中的对应节点使用了常见的切换线程方式。

subscribeOn

作用:
将subscribe Observer的执行放在对应的线程。

subscribeOn最终会执行到:

public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
    }

注意最后执行了:

return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));

根据前面的分析,这里就是创建新的Observable对象,并传入一个OnSubscribe实例对象,这里是OperatorSubscribeOn对象。

根据上面的说明,这里要看call方法:

public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();

        SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
        subscriber.add(parent);
        subscriber.add(inner);

        inner.schedule(parent);
    }

可以看到第四行SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);,所以它创建的就是Subscriber对象就是SubscribeOnSubscriber,注意这里第二行final Worker inner = scheduler.createWorker();和最后一行 inner.schedule(parent);,

这里的方法调用栈比较长就不赘述,直接说下,这里worker里面就是执行线程切换的,里面封装线程池或者Handler,
通过schedule方法就可以将SubscribeOnSubscriber包装成一个Runnable放入线程池中执行,执行的方法是SubscribeOnSubscriber的call方法。

而SubscribeOnSubscriber的call方法:

public void call() {
            Observable<T> src = source;
            source = null;
            t = Thread.currentThread();
            src.unsafeSubscribe(this);
        }

和其他的Subscriber一样,也是传入上游Observable的subscribe方法中。

回忆上面讲的总流程,在第二个流程从下往上包装Subscriber链(加工包装)的时候,subscribeOn就是将从它当前这个节点开始将后面的一系列的Subscriber的成链以及从上往下执行各个Subscriber对象的onNext放到指定的线程执行。

常见的一种描述subscribeOn作用的说法:“将该subscribeOn语句的上游放在对应的线程中”,其实并不准确,因为如果只使用了subscribeOn而没有使用observeOn的话,整条链的变换过程都会执行在subscribeOn指定的线程的。RxJava官方的解释才是准确的

Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler}.

在刚才的示例代码中加入subscribeOn:

Observable.create(object : OnSubscribe<Int> {
            override fun call(t: Subscriber<in Int>) {
                t.onStart()
                t.onNext(1)
            }
        })
            .map(object : Func1<Int, String> {

                override fun call(t: Int): String {
                    Log.d(TAG, Thread.currentThread().name)
                    return t.toString()
                }
            })
            //这里切换线程
            .subscribeOn(Schedulers.io())
            .map(object : Func1<String, Book> {

                override fun call(t: String): Book {
                    Log.d(TAG, Thread.currentThread().name)
                    return Book(t)
                }

            })

            .subscribe(object : Subscriber<Book>() {
                override fun onStart() {

                }

                override fun onNext(t: Book) {
                    Log.d(TAG, Thread.currentThread().name)
                    Log.d(TAG, t.toString())
                }

                override fun onComplete() {
                }

                override fun onError(t: Throwable) {
                    Log.d(TAG, t.message)
                }
            })

用刚才的流程图来表示的话,subscribeOn切换线程差不多是这样子的:

在这里插入图片描述

红色部分为放入指定线程的逻辑。

observeOn

observeOn最终会走到:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }

这里使用了lift方法:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

这里和map本质还是一样的,创建一个新的额Observable并传入一个新的OnSubscribe对象(OnSubscribeLift),那主要就是要看这个OnSubscribeLift的call做了什么:

Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);

call最主要的就是这几行,和map基本差不多,就是使用operator对传入从下游传入的Subscribeder进行转换,所以关键看OperatorObserveOn的call做了什么转换:

 ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;

主要看这里,OperatorObserveOn中创建了一个ObserveOnSubscriber,最后返回。

注意这里和OperatorSubscribeOn的不同,OperatorSubscribeOn是在call方法就把新建的Subscriber对象包装为Runnbale放入线程池中执行,将上游Observable对他的subscribe调用放到了指定线程。

而OperatorObserveOn是将ObserveOnSubscriber对象作为参数传入了上游的OnSubscribe的call方法,然后整个从下往上的包装Subscribe还是在原来的线程中执行,那这里关键点就是看ObserveOnSubscriber的onNext做了什么操作:

if (!queue.offer(NotificationLite.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();

重点就是这几行。第一行的t是onNext返回的Subscriber对象,NotificationLite.next这里正常情况下返回的还是t,而queue是一个队列,这里将t入列,然后执行了schedule(),该方法是将当前的ObserveOnSubscriber对象包装为Runnable,放入线程池中,然后在指定线程执行其call方法主要代码如下:

...
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;
...
for (;;) {
...
                    Object v = q.poll();
                    boolean empty = v == null;

                    if (checkTerminated(done, empty, localChild, q)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    localChild.onNext(NotificationLite.<T>getValue(v));
...
}

可以看到这里就是循环从队列中取出元素,然后再传入下游的Subscriber的onNext方法。

总结observeOn的操作:

在从下往上包装Subscriber链的时候(用户拆开手机包装盒),在调用observeOn的地方插入一个ObserveOnSubscriber对象,该对象可以在之后从上往下递归调用Subscriber链的时候,将该ObserveOnSubscriber对象之后的所有的onNext方法放到指定线程中执行。

现在在实例中添加observeOn:

Observable.create(object : OnSubscribe<Int> {
            override fun call(t: Subscriber<in Int>) {
                t.onStart()
                t.onNext(1)
            }
        })
            .map(object : Func1<Int, String> {

                override fun call(t: Int): String {
                    Log.d(TAG, Thread.currentThread().name)
                    return t.toString()
                }
            })
            .subscribeOn(Schedulers.io())
            //将当前调用之后的onNext放入指定线程
            .observeOn(Schedulers.main())
            .map(object : Func1<String, Book> {

                override fun call(t: String): Book {
                    Log.d(TAG, Thread.currentThread().name)
                    return Book(t)
                }

            })

            .subscribe(object : Subscriber<Book>() {
                override fun onStart() {

                }

                override fun onNext(t: Book) {
                    Log.d(TAG, Thread.currentThread().name)
                    Log.d(TAG, t.toString())
                }

                override fun onComplete() {
                }

                override fun onError(t: Throwable) {
                    Log.d(TAG, t.message)
                }
            })

用流程图来表示:

在这里插入图片描述

红色为subscribeOn指定线程执行部分,绿色为observeOn指定线程执行部分。

动手实战

纸上得来终觉浅,绝知此事要躬行。为了加强理解,我根据RxJava源码自己写了一个demo级别的RxJava源码,使用Kotlin写的,简称RxKotlin(当然现在确实已经有知名的RxKotlin开源库了~) ,流程和RxJava一致,并且有简单的操作符(当然细节就不一样啦),各个类命名也和RxJava保持一致。如果各位觉得RxJava的源码不好理解,也可以参考我的demo。

为什么要看RxJava的源码呢?除了满足自己的探索欲望之外,通过学习RxJava源码,就可以学习到如何运用RxJava的设计思想,通过封装代码编写自己的响应式编程框架,以后我们就可以写出自己的RxPay、RxLogin之类的了。

Github地址:https://github.com/yishuinanfeng/RxKotlinSample

彩蛋:

最后留个小习题,通过本文的学习,应该就可以能回答这个问题了:

如果一次RxJava链式调用中多次使用subscribeOn,为什么只有第一个subscribeOn生效呢呢?

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容

  • 在正文开始之前的最后,放上GitHub链接和引入依赖的gradle代码: Github: https://gith...
    苏苏说zz阅读 677评论 0 2
  • 来自于:CSDNblog.csdn.net/caihongdao123/article/details/51897...
    于加泽阅读 1,275评论 0 5
  • 在正文开始之前的最后,放上 GitHub 链接和引入依赖的 gradle 代码: Github: https://...
    松江野人阅读 5,886评论 0 1
  • 星期天,我和爸爸一起回老家帮爷爷收麦子。金黄色麦田一望无际,几台大型联合收割机来回穿梭忙碌,一车车的麦粒被运走,天...
    美人如玉剑如虹_a962阅读 84评论 0 0
  • 香帅讲过中国的股市,大致是中国股民为国企输血,虽然多次改革,股民还是没能摆脱被割的命运,原因是我们的中央货币财政体...
    时汝佳阅读 170评论 0 0