You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

84 lines
2.2 KiB

//
// RACMulticastConnection.m
// ReactiveObjC
//
// Created by Josh Abernathy on 4/11/12.
// Copyright (c) 2012 GitHub, Inc. All rights reserved.
//
#import "RACMulticastConnection.h"
#import "RACMulticastConnection+Private.h"
#import "RACDisposable.h"
#import "RACSerialDisposable.h"
#import "RACSubject.h"
#import <libkern/OSAtomic.h>
@interface RACMulticastConnection () {
    // The subject that is subscribed to `sourceSignal` by `-connect`.
    // NOTE(review): presumably this backs the public `signal` property
    // declared in the header - confirm there.
    RACSubject *_signal;

    // One-shot connection flag: `0` before the first connection, `1` after.
    //
    // A caller that wants to connect must try to atomically swap this value
    // from `0` to `1`:
    //
    //  - If the swap succeeds, that caller is responsible for subscribing
    //    `_signal` to `sourceSignal` and storing the returned disposable in
    //    `serialDisposable`.
    //  - If the swap fails, `sourceSignal` has already been connected and the
    //    caller has nothing further to do.
    volatile int32_t _hasConnected;
}

// The signal that `_signal` gets subscribed to upon connection.
@property (nonatomic, strong, readonly) RACSignal *sourceSignal;

// Holds the disposable for the subscription to `sourceSignal`. This is the
// object handed back to callers of `-connect`.
@property (strong) RACSerialDisposable *serialDisposable;

@end
@implementation RACMulticastConnection

#pragma mark Lifecycle

// Initializes a connection that subscribes `subject` to `source` when
// connected.
//
// source  - The signal to subscribe to upon connection. Must not be nil.
// subject - The subject that will be subscribed to `source`. Must not be nil.
//
// Returns the initialized connection, or nil if `-[super init]` returned nil.
- (instancetype)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    NSCParameterAssert(source != nil);
    NSCParameterAssert(subject != nil);

    self = [super init];
    // Writing an ivar through a nil `self` dereferences a null pointer, so
    // bail out before touching any state.
    if (self == nil) return nil;

    _sourceSignal = source;
    _serialDisposable = [[RACSerialDisposable alloc] init];
    _signal = subject;

    return self;
}

#pragma mark Connecting

// Subscribes `_signal` to `sourceSignal` if no caller has done so yet.
//
// Only the caller that wins the atomic `0` -> `1` swap of `_hasConnected`
// performs the subscription; every other call (concurrent or later) skips it.
// Nothing in this file resets the flag, so the subscription happens at most
// once per connection object.
//
// Returns `serialDisposable`, which holds the disposable of the subscription
// to `sourceSignal`.
- (RACDisposable *)connect {
    BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);

    if (shouldConnect) {
        self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
    }

    return self.serialDisposable;
}

// Creates a signal that connects on subscription and disposes of the
// connection once it has no remaining subscriptions.
//
// Each subscription to the returned signal subscribes the new subscriber to
// `self.signal` and then calls `-connect`. Disposing a subscription disposes
// of that subscriber's subscription to `self.signal`; when the count of live
// subscriptions drops to zero, the disposable returned by `-connect` is
// disposed as well.
//
// The subscription block captures `self`, so the returned signal keeps this
// connection alive.
//
// Returns a new signal named "[<signal name>] -autoconnect".
- (RACSignal *)autoconnect {
    // Shared by every subscription to the returned signal; only modified
    // through the atomic increment/decrement calls below.
    __block volatile int32_t subscriberCount = 0;

    return [[RACSignal
        createSignal:^(id<RACSubscriber> subscriber) {
            OSAtomicIncrement32Barrier(&subscriberCount);

            // Subscribe before connecting, so the subscriber is already
            // attached to `self.signal` when the source gets subscribed to.
            RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber];
            RACDisposable *connectionDisposable = [self connect];

            return [RACDisposable disposableWithBlock:^{
                [subscriptionDisposable dispose];

                // The last subscription to go away tears down the connection.
                if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) {
                    [connectionDisposable dispose];
                }
            }];
        }]
        setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
}

@end