我所理解的RxJava——上手其实很简单(二)

前言

欢迎继续收看《我所理解的RxJava--上手其实很简单(二)》,上周出了第一篇,各位程序猿大大的反应还不错,生平第一篇博文能获得大家的肯定,内心肯定是非常开心的,也坚定了我继续写下去的信念,总之,先谢谢大家送出的鱼丸...哦不,是收藏。好了,题外话不多说,进入这一篇的主题,本文主要给大家补充一下上一篇遗留的Subject知识,没看过上一篇的同学、忘了上一章写什么的同学、还有其他同学,请进入时光机:我所理解的RxJava——上手其实很简单(一)温习一遍,俗话说,“书读百遍,奇异自见”,看多一遍是一遍,多多益善嘛。温习完的,请回来继续听讲。


关于Subject

关于Subject,官方文档的解释是这样的:Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。从官方解释中,我提取出三个要点:

  1. 它可以充当Observable;
  2. 它可以充当Observer;
  3. 它是Observable和Observer之间的桥梁;

接下来对这三个要点解释一下,但在解释之前,要先介绍一下Subject的种类, Subject是一个抽象类,不能通过new来实例化Subject,所以Subject有四个实现类,分别为AsyncSubjectBehaviorSubject、PublishSubjectReplaySubject,每个实现类都有特定的“技能”,下面结合代码来介绍一下它们各自的“技能”。注意,所有的实现类都由create()方法实例化,无需new,所有的实现类调用onCompleted()onError(),它的Observer将不再接收数据;


Subject的分类解析

  • AsyncSubject
    Observer会接收AsyncSubject的```onComplete()``之前的最后一个数据,如果因异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。示例代码如下:
        AsyncSubject<String> asyncSubject = AsyncSubject.create();
        asyncSubject.onNext("asyncSubject1");
        asyncSubject.onNext("asyncSubject2");
        asyncSubject.onNext("asyncSubject3");  
        asyncSubject.onCompleted();
        asyncSubject.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

                LogUtil.log("asyncSubject onCompleted");  //输出 asyncSubject onCompleted
            }

            @Override
            public void onError(Throwable e) {

                LogUtil.log("asyncSubject onError");  //不输出(异常才会输出)
            }

            @Override
            public void onNext(String s) {

                LogUtil.log("asyncSubject:"+s);  //输出asyncSubject:asyncSubject3
            }
        });

以上代码,Observer只会接收asyncSubject的onCompleted()被调用前的最后一个数据,即“asyncSubject3”,如果不调用onCompleted(),Subscriber将不接收任何数据。

  • BehaviorSubject
    Observer会接收到BehaviorSubject被订阅之前的最后一个数据,再接收其他发射过来的数据,如果BehaviorSubject被订阅之前没有发送任何数据,则会发送一个默认数据。(注意跟AsyncSubject的区别,AsyncSubject要手动调用onCompleted(),且它的Observer会接收到onCompleted()前发送的最后一个数据,之后不会再接收数据,而BehaviorSubject不需手动调用onCompleted(),它的Observer接收的是BehaviorSubject被订阅前发送的最后一个数据,两个的分界点不一样,且之后还会继续接收数据。)示例代码如下:
    BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");
    behaviorSubject.onNext("behaviorSubject1");
    behaviorSubject.onNext("behaviorSubject2");
        behaviorSubject.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

                LogUtil.log("behaviorSubject:complete");
            }

            @Override
            public void onError(Throwable e) {

                LogUtil.log("behaviorSubject:error");
            }

            @Override
            public void onNext(String s) {

                LogUtil.log("behaviorSubject:"+s);
            }
        });

        behaviorSubject.onNext("behaviorSubject3");
        behaviorSubject.onNext("behaviorSubject4");

以上代码,Observer会接收到behaviorSubject2、behaviorSubject3、behaviorSubject4,如果在behaviorSubject.subscribe()之前不发送behaviorSubject1、behaviorSubject2,则Observer会先接收到default,再接收behaviorSubject3、behaviorSubject4。

  • PublishSubject
    PublishSubject比较容易理解,相对比其他Subject常用,它的Observer只会接收到PublishSubject被订阅之后发送的数据。示例代码如下:
    PublishSubject<String> publishSubject = PublishSubject.create();
    publishSubject.onNext("publishSubject1");
    publishSubject.onNext("publishSubject2");
    publishSubject.subscribe(new Observer<String>() {
           @Override
           public void onCompleted() {
               
           }
    
           @Override
           public void onError(Throwable e) {
    
           }
    
           @Override
           public void onNext(String s) {
               LogUtil.log("publishSubject observer1:"+s);
           }
       });
    publishSubject.onNext("publishSubject3");
    publishSubject.onNext("publishSubject4");
    

以上代码,Observer只会接收到"behaviorSubject3"、"behaviorSubject4"。

  • ReplaySubject
    ReplaySubject会发射所有数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据。示例代码如下:
ReplaySubject<String>replaySubject = ReplaySubject.create(); //创建默认初始缓存容量大小为16的ReplaySubject,当数据条目超过16会重新分配内存空间,使用这种方式,不论ReplaySubject何时被订阅,Observer都能接收到数据
//replaySubject = ReplaySubject.create(100);//创建指定初始缓存容量大小为100的ReplaySubject
//replaySubject = ReplaySubject.createWithSize(2);//只缓存订阅前最后发送的2条数据 
 //replaySubject=ReplaySubject.createWithTime(1,TimeUnit.SECONDS,Schedulers.computation());  //replaySubject被订阅前的前1秒内发送的数据才能被接收     
replaySubject.onNext("replaySubject:pre1");
replaySubject.onNext("replaySubject:pre2");
replaySubject.onNext("replaySubject:pre3");
replaySubject.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
                LogUtil.log("replaySubject:" + s);
        }
    });
replaySubject.onNext("replaySubject:after1");
replaySubject.onNext("replaySubject:after2");

以上代码,由于情况比较多,注释也已解释的相当清楚,就不对输出结果一一表述了,有疑问的自行copy代码去测试一下。至此,四种Subject类型已经介绍完毕,但是需要注意,如果你把 Subject 当作一个 Subscriber 使用,不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。要避免此类问题,官方提出了“串行化”,你可以将 Subject 转换为一个 SerializedSubject ,类似于这样:

SerializedSubject<String, Integer> ser = new SerializedSubject(publishSubject);

要点解答

接下来,我们继续前面提出的问题,为什么说Subject既可充当Observable,又可充当Observer,是它们两个之间的桥梁呢?经过前面的例子,也许有些人已经大概理解了,不理解的且听我细细道来。首先,从理论上讲,Subject继承了Observable,又实现了Observer接口,所以说它既是Observable又是Observer,完全合理。从实际应用上讲,Subject也能实现Observable和Observer相同的功能,口说无凭,我们还是通过代码来证实比较有说服力。

  • 创建Observable并发射数据:
    Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

                subscriber.onNext("I'm Observable");
                subscriber.onCompleted();
            }
        });

用Subject实现为:

PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("as Observable");
publishSubject.onCompleted();
  • 创建Observer订阅Observable并接收数据:
mObservable.subscribe(new Observer<String>() {
        @Override
         public void onCompleted() {
                
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {
  
                //接收数据
        }
});

用Subject实现为:

     publishSubject.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

            }
        });

也许有人会问,不是说Subject也可以作为Observer,不能把Subject当作Observer传入subscribe()中吗?回答是:当然可以!就象这样:

PublishSubject<String> publishSubject = PublishSubject.create();
Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {

            subscriber.onNext("as Observer"); 
            subscriber.onCompleted();
        }
}).subscribe(publishSubject);

有没有发现问题?publishSubject没有重写onNext()方法啊,在哪接收的数据?这就是前面说的“桥梁”的问题了,尽管把Subject作为Observer传入subscribe(),但接收数据还是要通过Observer来接收,借用Subject来连接Observable和Observer,整体代码如下:

PublishSubject<String> publishSubject = PublishSubject.create();
     Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

                subscriber.onNext("as Bridge");
                subscriber.onCompleted();
            }
        }).subscribe(publishSubject);

        publishSubject.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

                LogUtil.log("subject:"+s); //接收到 as Bridge
            }
        });
    ```
没错,这很桥梁!
***
##总结
关于Subject,到此就介绍完了。也许你会跟我一样困惑,为什么又要多个Subject出来,除了有几个特定功能之外,其他所有的一切,Observable和Observer也都有,而且写法上也没有原来的简便。确实如此,对于几个特定功能,我也还想不到有什么应用场景,至少我还没发现有什么场景必须得用Subject来实现不可,那么问题又来了,我为什么要花这么大篇幅来介绍Subject,理由有三。其一,既然官方推出Subject,必有其道理,还没遇到不代表以后不会遇到,更不能代表你不会遇到这样的应用场景;其二,“一千个读者有一千个哈姆雷特”,我所看到的并不是全部,也许你会发掘出更有意思的东西可不是?其三,我可不想当你看完我所有关于RxJava的文章,自信已上手RxJava,当有人跟你提起Subject的时候,你一脸茫然不知道Subject是什么东西,岂不哀哉?所以呢,介绍一下Subject还是很有意义的,最起码学了比没学好,“养兵千日用兵一时”,知识不嫌多,突然哪天就用上了呢。对于Subject的理解,有异议的欢迎底下评论,一起交流进步。下一篇文章,进入RxJava操作符的使用讲解。
***
本猿已转行,勿念。
最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容