rxjava2基本元素源码分析

无背压

代码示例

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        LogUtils.loge("Observable onSubscribe subscribe...");
        if (!emitter.isDisposed()) {
            emitter.onNext("test1");
            emitter.onComplete();
        }
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        LogUtils.loge("Observer onSubscribe ...");
    }

    @Override
    public void onNext(String s) {
        LogUtils.loge("Observer onNext str = " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        LogUtils.loge("Observer onComplete ...");
    }
});

基本元素

Observable

  1. 观察得到的-被观察者,不支持背压
  2. 通过Observable创建一个可观察的序列(create)
  3. 通过subscribe去注册一个观察者

Observer

  1. 用于接收数据----观察者
  2. 作为Observable的subscribe的方法的参数

Disposable

  1. 和Rxjava1的Susbscription的作用相当
  2. 用于取消订阅和获取当前的订阅状态

ObservableOnSubscribe

  1. 当订阅时会触发此接口调用
  2. 在Observable内部,实际作用是向观察者发射数据

Emitter

  1. 一个发送数据的接口,和Observer的方法类似
  2. 本质是对Observer和Subscriber的包装

流程分解

io.reactivex.Observable#create

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

io.reactivex.plugins.RxJavaPlugins#onAssembly(io.reactivex.Observable<T>)

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    // onObservableAssembly为里为空,所以f为空。返回的是我们传入的observable
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

io.reactivex.Observable#subscribe(io.reactivex.Observer<? super T>)

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        // 这里实际上是调用的是ObservableCreate中的subscribeActual方法
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
    } catch (Throwable e) {
    }
}

io.reactivex.plugins.RxJavaPlugins#onSubscribe(io.reactivex.Observable<T>, io.reactivex.Observer<? super T>)####

public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
    BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
    // onObservableSubscribe,所以f为空。返回的是我们传入的observer
    if (f != null) {
        return apply(f, source, observer);
    }
    return observer;
}

io.reactivex.internal.operators.observable.ObservableCreate

@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 构建Emitter
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 调用observer的onSubscribe,并传入创建的Emitter
    observer.onSubscribe(parent);

    try {
        // 调用subscribe方法
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

@Override
public boolean isDisposed() {
    return emitter.isDisposed();
}

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}

io.reactivex.internal.disposables.DisposableHelper 单例类

public enum DisposableHelper implements Disposable {

    DISPOSED
    ;

    public static boolean isDisposed(Disposable d) {
        // 判断需要dispose的对象是否是已经dispose的
        return d == DISPOSED;
    }

    public static boolean dispose(AtomicReference<Disposable> field) {
        / 取到当前的Disposable的对象
        Disposable current = field.get();
        // 得到已经disposed的对象
        Disposable d = DISPOSED;
        if (current != d) {
            // 将AtomicReference中Disposable标识为disposed状态
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

    @Override
    public boolean isDisposed() {
        return true;
    }
}

有背压

代码示例

    Flowable.create((FlowableOnSubscribe<String>) emitter -> {
        LogUtils.loge("FlowableOnSubscribe subscribe");
        if (!emitter.isCancelled()) {
            emitter.onNext("test11");
            emitter.onComplete();
        }
    }, BackpressureStrategy.DROP).subscribe(new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription s) {
            // 需要主动发起请求否则会请求不到数据
            s.request(Integer.MAX_VALUE);
            LogUtils.loge("Subscriber onSubscribe");
        }

        @Override
        public void onNext(String s) {
            LogUtils.loge("Subscriber onNext s = " + s);
        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {
            LogUtils.loge("Subscriber onComplete");
        }
    });

基本元素

Flowable

  1. 易流动的--------被观察者,支持背压
  2. 通过Flowable创建一个可观察的序列(create方法)
  3. 通过subscribe去注册一个观察者

Subscriber

  1. 一个单独接口,和Observer方法类似
  2. 作为Flowable的subscribe方法的一个参数

Subscription

  1. 订阅,和Rxjava1有所不同
  2. 支持背压,有用于背压的request方法

FlowableOnSubscribe

  1. 当订阅时会触发此接口调用
  2. 在Flowable内部,实际作用是向观察者发射数据

OnSubscribe

Emitter

  1. 一个发射数据的接口,和Observer的方法类似
  2. 本质是对Observer和Subscriber的包装

流程分解

io.reactivex.Flowable#create

public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    // 这里返回的就是FlowableCreate
    return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}

io.reactivex.Flowable#subscribe(org.reactivestreams.Subscriber<? super T>)

public final void subscribe(Subscriber<? super T> s) {
    subscribe(new StrictSubscriber<T>(s));
}

io.reactivex.Flowable#subscribe(io.reactivex.FlowableSubscriber<? super T>)

public final void subscribe(FlowableSubscriber<? super T> s) {
    Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
    // 调用的是FlowableCreate的subscribeActual方法
    subscribeActual(z);
}

io.reactivex.internal.operators.flowable.FlowableCreate

@Override
public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;
    // 根据背压策略构建emitter
    switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
    }
    // 调用   Subscriber的onSubscribe
    t.onSubscribe(emitter);
    try {
        // 调用subscribe方法
        source.subscribe(emitter);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        emitter.onError(ex);
    }
}

io.reactivex.internal.operators.flowable.FlowableCreate#DropAsyncEmitter

static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

    private static final long serialVersionUID = 8360058422307496563L;

    DropAsyncEmitter(Subscriber<? super T> downstream) {
        super(downstream);
    }
}

io.reactivex.internal.operators.flowable.FlowableCreate#NoOverflowBaseAsyncEmitter

abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

    private static final long serialVersionUID = 4127754106204442833L;

    NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
        super(downstream);
    }

    @Override
    public final void onNext(T t) {
        if (isCancelled()) {
            return;
        }
        
        // get的值不为0才会调用onNext方法
        // 只有调用了request(n)的时候,这里才不会为0
        if (get() != 0) {
            downstream.onNext(t);
            BackpressureHelper.produced(this, 1);
        } else {
            onOverflow();
        }
    }

    abstract void onOverflow();
}

io.reactivex.internal.operators.flowable.FlowableCreate#BaseEmitter

abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription {
    private static final long serialVersionUID = 7326289992464377023L;

    final Subscriber<? super T> downstream;

    final SequentialDisposable serial;

    BaseEmitter(Subscriber<? super T> downstream) {
        this.downstream = downstream;
        this.serial = new SequentialDisposable();
    }

    @Override
    public final void cancel() {
        serial.dispose();
        onUnsubscribed();
    }

    @Override
    public final boolean isCancelled() {
        return serial.isDisposed();
    }

    @Override
    public final void request(long n) {
        if (SubscriptionHelper.validate(n)) {
            BackpressureHelper.add(this, n);
        }
    }
}

io.reactivex.internal.subscriptions.SubscriptionHelper#validate(long)

public static boolean validate(long n) {
    /* 如果传入的值小于等于0的时候,就直接返回了, 这就是为啥不调用这个request方法,onNext方法是不走的
    */
    if (n <= 0) {
        RxJavaPlugins.onError(new IllegalArgumentException("n > 0 required but it was " + n));
        return false;
    }
    return true;
}

io.reactivex.internal.util.BackpressureHelper#add

public static long add(AtomicLong requested, long n) {
    for (;;) {
        long r = requested.get();
        if (r == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long u = addCap(r, n);
        // requested的这里会被设置成最大值
        if (requested.compareAndSet(r, u)) {
            return r;
        }
    }
}

io.reactivex.internal.util.BackpressureHelper#produced

public static long produced(AtomicLong requested, long n) {
    for (;;) {
        long current = requested.get();
        if (current == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long update = current - n;
        if (update < 0L) {
            RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
            update = 0L;
        }
        if (requested.compareAndSet(current, update)) {
            return update;
        }
    }
}

源码阅读总结

不调用request,onNext不被执行的原因分析

Subscriber->onSubscribe

s.request(Integer.MAX_VALUE);

FlowableCreate.BaseEmitter#request ->

io.reactivex.internal.util.BackpressureHelper#add

将设置的值更新到BaseEmitter中,BaseEmitter继承自AtomicLong


FlowableOnSubscribe-> subscribe
emitter.onNext(a);

io.reactivex.internal.operators.flowable.FlowableCreate.NoOverflowBaseAsyncEmitter#onNext

获取AtomicLong的值,不为0的时候,才会调用Subscriber的onNext方法

总之,Flowable是复用强制拉取,解决背压策略的

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容