在介绍RACMulticastConnection之前,先说明一个Reactive Cocoa中其他几个用到的类。
RACReplaySubject
RACSubject
RACSubject继承RACSignal,同时也实现了RACSubscriber协议,可以看成是一个可以手动控制发送next、completed和error的信号。
RACReplaySubject继承了RACSubject,它会保存已发送的值的(固定上限数量),每次有新的订阅者,它都会将这些值重新发送给这些新的订阅者,对于error和completed也是同样的。RACReplaySubject重新实现了subscriber协议,使得这个类有了一些不同的行为。
它有一个valuesReceived属性来存储接收到的值,每次有新的订阅者时,便将valuesReceived数组中的所有值发送给订阅者。
RACMulticastConnection
Multicast意为“多播”,multicast connection的意图是将一次对信号的订阅分享给多个订阅者。常见的使用情形就是:涉及的信号包含了side effect或者不希望它被多次调用。
RACMulticastConnection对外暴露的接口和属性只有三个:
- signal 表示这个多播信号
-
- (RACDisposable *)connect
,通过订阅signal来与它进行连接。只有调用了connect方法,信号才被订阅,在此之前signal不回接收任何值 -
- (RACSignal *)autoconnect
,当返回的signal被订阅时,立刻connect多播信号,当返回的信号没有订阅者时,自动清除多播信号的订阅。
注意,不要主动创建RACMulticastConnection。使用RACSignal的publish或multicast方法来创建。
RACMulticastConnection的初始化方法是私有的,其实现方式如下:
- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
NSCParameterAssert(source != nil);
NSCParameterAssert(subject != nil);
self = [super init];
if (self == nil) return nil;
_sourceSignal = source;//原信号
_serialDisposable = [[RACSerialDisposable alloc] init];
_signal = subject;//对外暴露的多播信号
return self;
}
关于connect的源代码其实也很简单:
- (RACDisposable *)connect {
BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);
if (shouldConnect) {
self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];//关键
}
return self.serialDisposable;
}
其实它主要做的就是让对外暴露的多播信号订阅原信号。RACMulticastConnection只是提供了实现多播的原型功能,我们真正使用到的还是RACSignal定义的相关的多播操作。
- (RACMulticastConnection *)publish {
RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
RACMulticastConnection *connection = [self multicast:subject];
return connection;
}
- (RACMulticastConnection *)multicast:(RACSubject *)subject {
[subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
return connection;
}
publish和multicast都会创建返回一个多播连接,将原来信号转换成一个RACSubject信号.本质上,RACMulticastConnection只是为普通RACSignal与RACSubject、RACReplaySubject架起一座桥梁。
那么,多播的特性:多个订阅者共享一个订阅行为是如何实现的呢?在RACDynamicSignal的subscriber协议中,对subscribe的实现有这么一段:
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
可以看出,每次订阅发生时,都会执行一次订阅的block。而RACSubject的subscribe协议方法的实现如下:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
return [RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
// Since newer subscribers are generally shorter-lived, search
// starting from the end of the list.
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];
if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}];
}
它只是将subscriber添加到了队列当中,并没有再执行订阅的block。所以本质,仍是有多个订阅者,但是订阅的副作用只发生一次,所以表现出共享订阅者的行为。