打造属于自己的RxBus

RxBus

通过RxJava实现Rxbus。

相信大家已经非常熟悉EventBus了。最近正在学习Rxjava,如果在项目中已经使用了Rxjava,使用RxBus来代替EventBus应该是不错的选择。

RxJava最核心的两个东西是Observables(被观察者,事件源)和Subscribers(观察者)。Observables发出一系列事件,Subscribers处理这些事件。

RxBus工作原理

直接看代码

Note that it is important to subscribe to the exact same rxBus instance that was used to post the events

采用单例模式来保证rxBus对象一致

public class RxBus {

    private static RxBus rxBus;
    private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());

    private RxBus() {
    }

    public static RxBus getInstance() {
        if (rxBus == null) {
            synchronized (RxBus.class) {
                if (rxBus == null) {
                    rxBus = new RxBus();
                }
            }
        }
        return rxBus;
    }


    public void send(Object o) {
        _bus.onNext(o);
    }

    public Observable<Object> toObserverable() {
        return _bus;
    }
}

Activity中发送事件

public void sendTap(View view){   
         RxBus.getInstance().send(new TapEvent());
}

public void sendOther(View view){
        RxBus.getInstance().send(new OtherEvent());
}

Fragment中接收事件

 RxBus.getInstance().toObserverable()
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        if (o instanceof TapEvent) {
                            textView.setText("tap");
                        } else if (o instanceof OtherEvent) {
                            textView.setText("other");
                        }
                    }
                });

效果


效果图

以上就是使用Rxjava简单实现RxBus的功能,当然这还远远不够

RxBus升级

在具体使用过程中总会碰到各种各样的问题

场景1
我在上一个项目中实现了无限轮播的功能,并且希望轮播图在用户滑动、不可见、以及程序在后台休眠时都停止滚动,这时候就希望EventBus及时的传递这3种状态,为此我需要写slideEvent、visibleEvent、aliveEvent3个类,虽然他们都需要传递一个简单的Boolen值。

解决方案
创建一个Event“管家”
类似key-value的方式,每个事件都有自己的唯一的Code,接收事件时根据Code返回对应的content

public class Events<T> {

    //所有事件的CODE
    public static final int TAP = 1; //点击事件
    public static final int OTHER = 21; //其它事件
    
    //枚举
    @IntDef({TAP, OTHER})
    @Retention(RetentionPolicy.SOURCE)
    public @interface EventCode {}


    public @Events.EventCode int code;
    public T content;

    public static <O> Events<O> setContent(O t) {
        Events<O> events = new Events<>();
        events.content = t;
        return events;
    }

    public <T> T getContent() {
        return (T) content;
    }

}

场景2
怎么又内存泄漏了?

每个人在开发过程中,或多或少都会碰到内存泄漏的的问题,我一直有一个天真的想法,RxJava那么牛逼,是不是能无声无息地就能解决内存泄漏的问题了,答案是否定的。

我看了不少有关RxJava的文章,都会提到
一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

你可以

@Overrideprotected void onDestroy() { 
        super.onDestroy(); 
        if(!rxSubscription.isUnsubscribed()) {     
            rxSubscription.unsubscribe();
         }
}

又或者

使用CompositeSubscription把 Subscription 收集到一起,方便 Activity(基类) 销毁时取消订阅,防止内存泄漏。

前者可以在任一生命周期阶段取消订阅,缺点是每个acivity/fragment都要重写方法。

后者可以写在BaseActivity(大家都不会陌生),每个activity都能用,缺点是不够灵活。

以上两种方法似乎都欠缺点意思,所幸Rx家族“人丁兴旺“”,早已想好了解决方案
RxLifecycle

一、bindToLifecycle()方法
在子类使用Observable中的compose操作符,调用,完成Observable发布的事件和当前的组件绑定,实现生命周期同步。从而实现当前组件生命周期结束时,自动取消对Observable订阅。

 Observable.interval(1, TimeUnit.SECONDS)
        .compose(this.bindToLifecycle())
            .subscribe(new Action1<Long>() { 
                @Override
                public void call(Long num) {
                    Log.i(TAG, "  " +num);
                }
            });

二、bindUntilEvent() 方法
使用ActivityEvent类,其中的CREATE、START、 RESUME、PAUSE、STOP、 DESTROY分别对应生命周期内的方法。使用bindUntilEvent指定在哪个生命周期方法调用时取消订阅。

public enum ActivityEvent {

    CREATE,
    START,
    RESUME,
    PAUSE,
    STOP,
    DESTROY

}


public enum FragmentEvent {

    ATTACH,
    CREATE,
    CREATE_VIEW,
    START,
    RESUME,
    PAUSE,
    STOP,
    DESTROY_VIEW,
    DESTROY,
    DETACH

}

组装零件

public class RxBus {

    private static RxBus rxBus;
    private final Subject<Events<?>, Events<?>> _bus = new SerializedSubject<>(PublishSubject.<Events<?>>create());

    private RxBus(){}

    public static RxBus getInstance(){
        if (rxBus == null){
            synchronized (RxBus.class){
                if (rxBus == null){
                    rxBus = new RxBus();
                }
            }
        }
        return rxBus;
    }

    public void send(Events<?> o) {
        _bus.onNext(o);
    }

    public void send(@Events.EventCode int code, Object content){
        Events<Object> event = new Events<>();
        event.code = code;
        event.content = content;
        send(event);
    }

    public Observable<Events<?>> toObservable() {
        return _bus;
    }

    public static SubscriberBuilder with(FragmentLifecycleProvider provider){
        return new SubscriberBuilder(provider);
    }

    public static SubscriberBuilder with(ActivityLifecycleProvider provider){
        return new SubscriberBuilder(provider);
    }


    public static class SubscriberBuilder{

        private FragmentLifecycleProvider mFragLifecycleProvider;
        private ActivityLifecycleProvider mActLifecycleProvider;
        private FragmentEvent mFragmentEndEvent;
        private ActivityEvent mActivityEndEvent;
        private int event;
        private Action1<? super Events<?>> onNext;
        private Action1<Throwable> onError;

        public SubscriberBuilder(FragmentLifecycleProvider provider) {
            this.mFragLifecycleProvider = provider;
        }

        public SubscriberBuilder(ActivityLifecycleProvider provider){
            this.mActLifecycleProvider = provider;
        }

        public SubscriberBuilder setEvent(@Events.EventCode int event){
            this.event = event;
            return this;
        }

        public SubscriberBuilder setEndEvent(FragmentEvent event){
            this.mFragmentEndEvent = event;
            return this;
        }

        public SubscriberBuilder setEndEvent(ActivityEvent event){
            this.mActivityEndEvent = event;
            return this;
        }

        public SubscriberBuilder onNext(Action1<? super Events<?>> action){
            this.onNext = action;
            return this;
        }

        public SubscriberBuilder onError(Action1<Throwable> action){
            this.onError = action;
            return this;
        }


        public void create(){
            _create();
        }

        public Subscription _create(){
            if (mFragLifecycleProvider!=null){
                return RxBus.getInstance().toObservable()
                        .compose(mFragmentEndEvent == null ? mFragLifecycleProvider.bindToLifecycle() :mFragLifecycleProvider.<Events<?>>bindUntilEvent(mFragmentEndEvent)) // 绑定生命周期
                        .filter(new Func1<Events<?>, Boolean>() {
                            @Override
                            public Boolean call(Events<?> events) {
                                return events.code == event;
                            }
                        })   //过滤 根据code判断返回事件
                        .subscribe(onNext, onError == null ? new Action1<Throwable>() {
                            @Override
                            public void call(Throwable throwable) {
                                throwable.printStackTrace();
                            }
                        } : onError);
            }
            if (mActLifecycleProvider!=null){
                return RxBus.getInstance().toObservable()
                        .compose(mActivityEndEvent == null ? mActLifecycleProvider.bindToLifecycle() :mActLifecycleProvider.<Events<?>>bindUntilEvent(mActivityEndEvent))
                        .filter(new Func1<Events<?>, Boolean>() {
                            @Override
                            public Boolean call(Events<?> events) {
                                return events.code == event;
                            }
                        })
                        .subscribe(onNext, onError == null ? (Action1<Throwable>) new Action1<Throwable>() {
                            @Override
                            public void call(Throwable throwable) {
                                throwable.printStackTrace();
                            }
                        } : onError);
            }
            return null;
        }
    }
}

新BUS上路

依然使用前面的例子

Activity中发送事件

 public void sendTap(View view){
        RxBus.getInstance().send(Events.TAP, "Tap传了一个String");
    }

    public void sendOther(View view){
        RxBus.getInstance().send(Events.OTHER, null);
//        RxBus.getInstance().send(Events.OTHER, new OtherEvent("Cloud", 25));
    }

Fragment中接收事件

fragment需要继承RxLifecycle对应组件

public class BlankFragment extends RxFragment {}
 
  RxBus.with(this)
                .setEvent(Events.TAP)
//                .setEndEvent(FragmentEvent.DESTROY_VIEW) //不设置默认与fragment生命周期同步
                .onNext(new Action1<Events<?>>() {
                    @Override
                    public void call(Events<?> events) {
                        String content = events.getContent();
                        textView.setText(content);
                    }
                })
                .create();

        RxBus.with(this)
                .setEvent(Events.OTHER)
                .setEndEvent(FragmentEvent.DESTROY_VIEW) //不设置默认与fragment生命周期同步
                .onNext(new Action1<Events<?>>() {
                    @Override
                    public void call(Events<?> events) {
                        OtherEvent event = events.getContent();
                        textView.setText("Name: "  + event.getName() + ",Age: "+ event.getAge());
                    }
                })
                .onError(new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        textView.setText(throwable.toString());
                    }
                }) // 异常处理,默认捕获异常,不做处理,程序不会crash。
                .create();

效果

![normal.gif](http://upload-images.jianshu.io/upload_images/1896628-f4eb87351f734b10.gif?imageMogr2/auto-orient/strip)

完整代码,请移步

参考资料

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容