Rxjava2(二)、五种观察者模式创建及背压

Android进阶系列之第三方库知识点整理。

知识点总结,整理也是学习的过程,如有错误,欢迎批评指出。

上一篇:Rxjava2(一)、基础概念及使用

image

直接开整,上一篇基础概念里面说了,rxjava2 扩展于观察者模式,我们上篇的只是简单的介绍了用Observable来创建使用,其实rxjava2给我们提供了五种观察者模式的创建方式。

image

1、Observable 和 Observer

能够发射0或n个数据,并以成功或错误事件终止,在第一篇中已经举例说明了,这里就不再详细说明。

2、Flowable 和 Subscriber

能够发射0或n个数据,并以成功或错误事件终止。 支持背压,可以控制数据源发射的速度。

我们看到ObservableFlowable这两个的区别就是后者支持背压,那么何为背压?

2.1、什么是背压

背压是一种现象,简单来说就是在异步操作中,上游发送数据速度快于下游处理数据的速度,下游来不及处理,Buffer 溢出,导致事件阻塞,从而引起的各种问题,比如事件丢失,OOM等。

rxjava1中并不支持背压,当出现事件阻塞时候,会直接抛出 MissingBackpressureException 异常,但是在rxjava2中,提供了 Flowable 来创建被观察者,通过Flowable 来处理背压问题,我们可以简单通过demo分析。

[站外图片上传中...(image-7e5758-1577706002259)]

A:我们上游模拟循环发送数据。

B:线程切换,异步操作。

C:下游每隔一秒获取数据。

我们Observable 创建,来模拟了背压这个现象,我们在上游模拟无限循环的发送数据,下游每次都休眠一秒再获取数据,这样肯定会造成我们前面提的问题,就是上游发送太他丫的快了,下游根本处理不过来,我们先看结果。

image

看日志,打印结果停留在了13就没有继续打印了?同时可以看到程序已经崩了,是因为在rxjava2中,Observable并不支持背压操作,遇到背压问题,它并不会报错,也不会抛MissingBackpressureException 异常,但是内存会一直飙高,最后导致内存不足程序直接挂掉。

[站外图片上传中...(image-41c5f6-1577706002259)]

可以看到内存一直在往上飙,针对背压这种现象,rxjava2中提出用 Flowable 来处理。

下面由浅入深,慢慢揭开Flowable 的神秘面纱。

我们先用Flowable创建一个基本的demo:

       Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                emitter.onNext("事件一");
                LogUtil.d(TAG + "--subscribe  发送事件一");
                emitter.onNext("事件二");
                LogUtil.d(TAG + "--subscribe  发送事件二");
                emitter.onNext("事件三");
                LogUtil.d(TAG + "--subscribe  发送事件三");
                emitter.onNext("事件四");
                LogUtil.d(TAG + "--subscribe  发送事件四");
                emitter.onComplete();
                LogUtil.d(TAG + "--subscribe  发送完成");
            }
        }, BackpressureStrategy.ERROR) // 这里需要传入背压策略,跟线程池里面饱和策略类似,当缓存区存满时候采取的处理策略
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread()) // 线程切换,异步操作
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        // 决定观察者能接收多少个事件,多余事件放入缓存区
                        // Flowable 默认缓存区大小为128,即最大能存放128个事件
                        s.request(3);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t.getLocalizedMessage());
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

可以看到Flowable创建和Observable基本差不多,只是在create方法中多传入BackpressureStrategy.ERROR 这么一个背压策略,这个后面会详讲。

onSubscribe 的回调中,参数变成了Subscription,我们可以通过这个参数,让观察者自己设置要接收多少个事件,如果发送的事件大于观察者设置接收的事件,多余事件将会存入Flowable缓存区中。

Flowable缓存区队列大小只能存放128个事件,如果超过,就会报异常。

结果:

[站外图片上传中...(image-af462-1577706002259)]

发送四个事件,观察者通过Subscription.request(3)设置只接收三个事件,所以下游只接收三个,剩下一个放入Flowable缓存区中。

如果我们观察者不设置Subscription.request(x),即不接收事件,被观察者仍然会发送事件,并存入缓存区中,观察者可以动态调用Subscription.request(x)方法来获取事件。

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                for (int x = 0; x <= 10; x++) {
                    LogUtil.d(TAG + "--subscribe  发送了" + x + "个事件");
                    emitter.onNext(x + "事件");
                }
            }
        }, BackpressureStrategy.ERROR) 
                // 线程切换,异步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        subscription = s;
                        // s.request(3);  这里不指定观察者接收事件个数
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t.getLocalizedMessage());
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

动态获取

findViewById(R.id.bt_get_event).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (subscription != null) {
                    LogUtil.d(TAG + "--onClick");
                    subscription.request(4);
                }
            }
        });

可以看到我们观察者一开始并没有指定接收多少个事件,而是通过外接点击事件,来动态设置接收事件个数,我们看结果,当点击触发后,我们收到了最先存入队列的四个事件。

结果:

image

2.2、背压策略

我们前面提到,Flowable 默认的缓存区队列大小为128,即只能存放上游发送的128个事件,如果上游发送的事件超过128,就需要我们指定相应的背压策略来做不同的处理,BackpressureStrategy为我们提供了五种背压策略。

[站外图片上传中...(image-287d84-1577706002259)]

整理如下:

策略 作用
MISSING 当缓存区大小存满(128),被观察者仍然继续发送下一个事件时,抛出异常MissingBackpressureException , 提示缓存区满了
ERROR 当缓存区大小存满(128)(默认缓存区大小128),被观察者仍然继续发送下一个事件时,直接抛出异常MissingBackpressureException
BUFFER 当缓存区大小存满(128),被观察者仍然继续发送下一个事件时,缓存区大小设置无限大, 即被观察者可无限发送事件,但实际上是存放在缓存区
DROP 当缓存区大小存满,被观察者仍然继续发送下一个事件时, 超过缓存区大?。?28)的事件会被全部丢弃
LATEST 当缓存区大小存满,被观察者仍然继续发送下一个事件时,只保存最新/最后发送的事件, 其他超过缓存区大?。?28)的事件会被全部丢弃

2.2.1、BackpressureStrategy.MISSING

当缓存区大小存满(128),被观察者仍然继续发送下一个事件时,抛出异常MissingBackpressureException , 提示缓存区满了

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // 发送129个事件,模拟超出缓存区
                for (int x = 0; x < 129; x++) {
                    emitter.onNext(x + "事件");
                    LogUtil.d(TAG + "--subscribe  发送了" + x + "个事件");
                }
            }
        }, BackpressureStrategy.MISSING) // 使用BackpressureStrategy.MISSING背压策略
                // 线程切换,异步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

我们使用BackpressureStrategy.MISSING背压策略,观察者接收request(Integer.MAX_VALUE),此值也为推荐值。

结果:

image

我们看到,当发送了128个事件后,再发送第129个事件时候,抛了MissingBackpressureException异常,而且我们设置了观察者接收也未接收到数据,说明是先存入缓存区队列,再发送,当缓存区中抛异常后,就停止了onNext()事件,我们可以验证一下,当设置被观察者发送128事件。

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // *******  发送128个事件  ********
                for (int x = 0; x < 128; x++) {
                    emitter.onNext(x + "事件");
                    LogUtil.d(TAG + "--subscribe  发送了" + x + "个事件");
                }
            }
        }, BackpressureStrategy.MISSING)
                // 线程切换,异步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

就是在上面demo的基础上,改了发送的事件个数,上游发送128个事件,刚好为缓存区大小,并不抛异常。

结果:

image

我们看到程序没有抛异常,并且正常打印了缓存区中的128个数据(从0开始),可以印证两点

1、缓存区大小确实为128

2、先存入缓存区后再获?。ㄈ绻斐?,onNext直接不调用)

2.2.2、BackpressureStrategy.ERROR

当缓存区大小存满(128)(默认缓存区大小128),被观察者仍然继续发送下一个事件时,直接抛出异常MissingBackpressureException

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // 发送129个事件,模拟超出缓存区
                for (int x = 0; x < 129; x++) {
                    emitter.onNext(x + "事件");
                    LogUtil.d(TAG + "--subscribe  发送了" + x + "个事件");
                }
            }
        }, BackpressureStrategy.ERROR)
                // 线程切换,异步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

使用 BackpressureStrategy.ERROR 背压策略

结果:

[站外图片上传中...(image-b907e7-1577706002259)]

跟Missing一样,直接抛了MissingBackpressureException异常且下游未接收到数据,同理,如果上游发送数据小于等于128,正常发送和接收。

2.2.3、BackpressureStrategy.BUFFER

当缓存区大小存满(128),被观察者仍然继续发送下一个事件时,缓存区大小设置无限大, 即被观察者可无限发送事件,但实际上是存放在缓存区。

       Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // 发送129个事件,模拟超出缓存区
                for (int x = 0; x < 129; x++) {
                    emitter.onNext(x + "事件");
                    LogUtil.d(TAG + "--subscribe  发送了" + x + "个事件");
                }
            }
        }, BackpressureStrategy.BUFFER)
                // 线程切换,异步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

使用 BackpressureStrategy.BUFFER 背压策略

更改缓存区大小,不做限制。

结果:

image

可以看到,我们发送的129个事件全部发送且接收到了。

2.2.4、BackpressureStrategy.DROP

当缓存区大小存满,被观察者仍然继续发送下一个事件时, 超过缓存区大?。?28)的事件会被全部丢弃

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // 发送129个事件,模拟超出缓存区
                for (int x = 0; x < 129; x++) {
                    emitter.onNext(x + "事件");
                    LogUtil.d(TAG + "--subscribe  发送了" + x + "个事件");
                }
            }
        }, BackpressureStrategy.DROP)
                // 线程切换,异步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

使用 BackpressureStrategy.DROP 背压策略

丢掉大于缓存区的事件。

结果:

[站外图片上传中...(image-ea3cc7-1577706002259)]

结果很明了,并没有抛异常同时也正常打印了,但是超过缓存区的那个事件被抛弃,并没有获取到。

2.2.5、BackpressureStrategy.LATEST

当缓存区大小存满,被观察者仍然继续发送下一个事件时,只保存最新/最后发送的事件, 其他超过缓存区大小(128)的事件会被全部丢弃

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // 发送150个事件
                for (int x = 0; x < 150; x++) {
                    emitter.onNext(x + "事件");
                    LogUtil.d(TAG + "--subscribe  发送了" + x + "个事件");
                }
            }
        }, BackpressureStrategy.LATEST)
                // 线程切换,异步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

使用 BackpressureStrategy.LATEST 背压策略

发送了150个事件

当超出128时,会保存最新的一个事件,即会接收129个事件。

结果:

image

我们可以看到,观察者端接收到129个数据,分别为缓存区内数据,加上最新/最后一条数据,中间数据均被丢弃。

2.3、同步情况下Flowable

前面说过,背压前提是异步操作下,在同步下,我们并不会有背压一说,因为在同一个线程,发送数据后总是要等下游处理了才会发送第二条数据,不会存在缓冲区,如下:

       Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                LogUtil.d(TAG + "--subscribe  发送事件一");
                emitter.onNext("事件一");
                LogUtil.d(TAG + "--subscribe  发送事件二");
                emitter.onNext("事件二");
                LogUtil.d(TAG + "--subscribe  发送事件三");
                emitter.onNext("事件三");
                LogUtil.d(TAG + "--subscribe  发送完成");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                LogUtil.d(TAG + "--onSubscribe");
                s.request(3);
            }

            @Override
            public void onNext(String s) {
                LogUtil.d(TAG + "--onNext  接收到:" + s);
            }

            @Override
            public void onError(Throwable t) {
                LogUtil.d(TAG + "--onError  error=" + t);
            }

            @Override
            public void onComplete() {
                LogUtil.d(TAG + "--onComplete");
            }
        });

结果:

image

可以看到,事件都是顺序执行,发送一条接收一条,然后再执行下一条。

但是,我们可能会遇到这个一个情况,当上游发送了四条数据,但是下游只接收三条?我们改一下demo如下:

       Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                LogUtil.d(TAG + "--subscribe  发送事件一");
                emitter.onNext("事件一");
                LogUtil.d(TAG + "--subscribe  发送事件二");
                emitter.onNext("事件二");
                LogUtil.d(TAG + "--subscribe  发送事件三");
                emitter.onNext("事件三");
                LogUtil.d(TAG + "--subscribe  发送事件四");
                emitter.onNext("事件四");
                LogUtil.d(TAG + "--subscribe  发送完成");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                LogUtil.d(TAG + "--onSubscribe");
                s.request(3);

            }

            @Override
            public void onNext(String s) {
                LogUtil.d(TAG + "--onNext  接收到:" + s);
            }

            @Override
            public void onError(Throwable t) {
                LogUtil.d(TAG + "--onError  error=" + t);
            }

            @Override
            public void onComplete() {
                LogUtil.d(TAG + "--onComplete");
            }
        });

可以看到,被观察者发送了四个事件,但是观察者只接收了三条。

结果:

[站外图片上传中...(image-2790ee-1577706002259)]

可以看到,同样抛了MissingBackpressureException异常

这里可以使用BUFFER的背压策略来处理,但是我们为了说明观察者反向控制被观察者,我们采用如下方案:

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // 通过emitter.requested()获取观察者设置的接收的事件数目
                long requested = emitter.requested();
                LogUtil.d(TAG + "--subscribe 观察者设置接收的事件数目:" + requested);

                for (int x = 0; x < requested; x++) {
                    LogUtil.d(TAG + "--subscribe  发送事件" + x);
                    emitter.onNext("发送事件" + x);
                }
                LogUtil.d(TAG + "--subscribe  发送完成");
                emitter.onComplete();
            }
        }, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                LogUtil.d(TAG + "--onSubscribe");
                // 设置观察者接收事件数目为3
                s.request(3);

            }

            @Override
            public void onNext(String s) {
                LogUtil.d(TAG + "--onNext  接收到:" + s);
            }

            @Override
            public void onError(Throwable t) {
                LogUtil.e(TAG + "--onError  error=" + t);
            }

            @Override
            public void onComplete() {
                LogUtil.d(TAG + "--onComplete");
            }
        });

我们在subscribe中通过emitter.requested()获取观察者中设置的接收事件数目,来动态的发送数据,这样就避免了上下游数据不同步问题。

结果:

image

2.4、使用操作符时背压处理

我们前面都是通过create来创建Flowable,可以在Create第二个参数中传入相应的背压策略,Flowable所有的操作符都支持背压,但是通过操作符创建的背压策略默认为BackpressureStrategy.ERROR,我们可以通过

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

三种方式来指定相应的背压策略。

        Flowable.interval(1, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.io())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        subscription = s;
                        s.request(Long.MAX_VALUE); //默认可以接收Long.MAX_VALUE个事件
                    }

                    @Override
                    public void onNext(Long aLong) {
                        LogUtil.i(TAG + "--onNext  aLong=" + aLong);
                        try {
                            // 延时一秒接收
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.e(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.i(TAG + "--onComplete");
                    }
                });

这里我们通过 interval来创建Flowable,可以看到下游每一毫秒发送一条数据,下游一秒处理一条,上游明显快于下游,处理不过来数据放入缓存池中,当缓存池中队列满时,就会抛异常,因为其默认的背压策略为BackpressureStrategy.ERROR

结果:

image

我们可以通过onBackpressureXXX其指定相应的背压策略。

image

结果:

[站外图片上传中...(image-b45029-1577706002259)]

当我们指定背压策略为BUFFER后,可以看到并没有异常抛出,程序一直在打印输出。

3、Single和SingleObserver

只发射单个数据或错误事件。

        Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(SingleEmitter<String> emitter) throws Exception {
                // 只能发送onSuccess或者onError,发射多条数据,只接受第一条
                emitter.onSuccess("Success");
                emitter.onError(new NullPointerException(""));
            }
        }).subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                LogUtil.d(TAG + "--onSubscribe");
            }

            @Override
            public void onSuccess(String s) {
                LogUtil.d(TAG + "--onSuccess  s=" + s);
            }

            @Override
            public void onError(Throwable e) {
                LogUtil.e(TAG + "--onError  error=" + e.getMessage());
            }
        });

SingleEmitter发射器只能发送一条onSuccess或者onError数据,如果发射器发射多条数据,观察者只能接收到第一条数据。

结果:

image

4、Completable和CompletableObserver

不发射数据,只处理 onComplete 和 onError 事件。

[图片上传失败...(image-34cb09-1577706002259)]

方法onCompleteonError只可调用一个,同时调用,第一个生效。

5、Maybe和MaybeObserver

能够发射0或者1个数据,要么成功,要么失败。有点类似于Optional。

[站外图片上传中...(image-63ca30-1577706002259)]

onSuccess方法一次订阅只能发送一次。

方法onCompleteonError只可调用一个,同时调用,第一个生效。

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351