123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- //
- // RACCommand.m
- // ReactiveCocoa
- //
- // Created by Josh Abernathy on 3/3/12.
- // Copyright (c) 2012 GitHub, Inc. All rights reserved.
- //
- #import "RACCommand.h"
- #import "RACEXTScope.h"
- #import "NSArray+RACSequenceAdditions.h"
- #import "NSObject+RACDeallocating.h"
- #import "NSObject+RACDescription.h"
- #import "NSObject+RACPropertySubscribing.h"
- #import "RACMulticastConnection.h"
- #import "RACReplaySubject.h"
- #import "RACScheduler.h"
- #import "RACSequence.h"
- #import "RACSerialDisposable.h"
- #import "RACSignal+Operations.h"
- #import <libkern/OSAtomic.h>
- NSString * const RACCommandErrorDomain = @"RACCommandErrorDomain";
- NSString * const RACUnderlyingCommandErrorKey = @"RACUnderlyingCommandErrorKey";
- const NSInteger RACCommandErrorNotEnabled = 1;
- @interface RACCommand () {
- // The mutable array backing `activeExecutionSignals`.
- //
- // This should only be used while synchronized on `self`.
- NSMutableArray *_activeExecutionSignals;
- // Atomic backing variable for `allowsConcurrentExecution`.
- volatile uint32_t _allowsConcurrentExecution;
- }
- // An array of signals representing in-flight executions, in the order they
- // began.
- //
- // This property is KVO-compliant.
- @property (atomic, copy, readonly) NSArray *activeExecutionSignals;
- // `enabled`, but without a hop to the main thread.
- //
- // Values from this signal may arrive on any thread.
- @property (nonatomic, strong, readonly) RACSignal *immediateEnabled;
- // The signal block that the receiver was initialized with.
- @property (nonatomic, copy, readonly) RACSignal * (^signalBlock)(id input);
- // Improves the performance of KVO on the receiver.
- //
- // See the documentation for <NSKeyValueObserving> for more information.
- @property (atomic) void *observationInfo;
- // Adds a signal to `activeExecutionSignals` and generates a KVO notification.
- - (void)addActiveExecutionSignal:(RACSignal *)signal;
- // Removes a signal from `activeExecutionSignals` and generates a KVO
- // notification.
- - (void)removeActiveExecutionSignal:(RACSignal *)signal;
- @end
- @implementation RACCommand
- #pragma mark Properties
- - (BOOL)allowsConcurrentExecution {
- return _allowsConcurrentExecution != 0;
- }
- - (void)setAllowsConcurrentExecution:(BOOL)allowed {
- [self willChangeValueForKey:@keypath(self.allowsConcurrentExecution)];
- if (allowed) {
- OSAtomicOr32Barrier(1, &_allowsConcurrentExecution);
- } else {
- OSAtomicAnd32Barrier(0, &_allowsConcurrentExecution);
- }
- [self didChangeValueForKey:@keypath(self.allowsConcurrentExecution)];
- }
- - (NSArray *)activeExecutionSignals {
- @synchronized (self) {
- return [_activeExecutionSignals copy];
- }
- }
- - (void)addActiveExecutionSignal:(RACSignal *)signal {
- NSCParameterAssert([signal isKindOfClass:RACSignal.class]);
- @synchronized (self) {
- // The KVO notification has to be generated while synchronized, because
- // it depends on the index remaining consistent.
- NSIndexSet *indexes = [NSIndexSet indexSetWithIndex:_activeExecutionSignals.count];
- [self willChange:NSKeyValueChangeInsertion valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
- [_activeExecutionSignals addObject:signal];
- [self didChange:NSKeyValueChangeInsertion valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
- }
- }
- - (void)removeActiveExecutionSignal:(RACSignal *)signal {
- NSCParameterAssert([signal isKindOfClass:RACSignal.class]);
- @synchronized (self) {
- // The indexes have to be calculated and the notification generated
- // while synchronized, because they depend on the indexes remaining
- // consistent.
- NSIndexSet *indexes = [_activeExecutionSignals indexesOfObjectsPassingTest:^ BOOL (RACSignal *obj, NSUInteger index, BOOL *stop) {
- return obj == signal;
- }];
- if (indexes.count == 0) return;
- [self willChange:NSKeyValueChangeRemoval valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
- [_activeExecutionSignals removeObjectsAtIndexes:indexes];
- [self didChange:NSKeyValueChangeRemoval valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
- }
- }
- #pragma mark Lifecycle
- - (id)init {
- NSCAssert(NO, @"Use -initWithSignalBlock: instead");
- return nil;
- }
- - (id)initWithSignalBlock:(RACSignal * (^)(id input))signalBlock {
- return [self initWithEnabled:nil signalBlock:signalBlock];
- }
- - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock {
- NSCParameterAssert(signalBlock != nil);
- self = [super init];
- if (self == nil) return nil;
- _activeExecutionSignals = [[NSMutableArray alloc] init];
- _signalBlock = [signalBlock copy];
- // A signal of additions to `activeExecutionSignals`.
- RACSignal *newActiveExecutionSignals = [[[[[self
- rac_valuesAndChangesForKeyPath:@keypath(self.activeExecutionSignals) options:NSKeyValueObservingOptionNew observer:nil]
- reduceEach:^(id _, NSDictionary *change) {
- NSArray *signals = change[NSKeyValueChangeNewKey];
- if (signals == nil) return [RACSignal empty];
- return [signals.rac_sequence signalWithScheduler:RACScheduler.immediateScheduler];
- }]
- concat]
- publish]
- autoconnect];
- _executionSignals = [[[newActiveExecutionSignals
- map:^(RACSignal *signal) {
- return [signal catchTo:[RACSignal empty]];
- }]
- deliverOn:RACScheduler.mainThreadScheduler]
- setNameWithFormat:@"%@ -executionSignals", self];
-
- // `errors` needs to be multicasted so that it picks up all
- // `activeExecutionSignals` that are added.
- //
- // In other words, if someone subscribes to `errors` _after_ an execution
- // has started, it should still receive any error from that execution.
- RACMulticastConnection *errorsConnection = [[[newActiveExecutionSignals
- flattenMap:^(RACSignal *signal) {
- return [[signal
- ignoreValues]
- catch:^(NSError *error) {
- return [RACSignal return:error];
- }];
- }]
- deliverOn:RACScheduler.mainThreadScheduler]
- publish];
-
- _errors = [errorsConnection.signal setNameWithFormat:@"%@ -errors", self];
- [errorsConnection connect];
- RACSignal *immediateExecuting = [RACObserve(self, activeExecutionSignals) map:^(NSArray *activeSignals) {
- return @(activeSignals.count > 0);
- }];
- _executing = [[[[[immediateExecuting
- deliverOn:RACScheduler.mainThreadScheduler]
- // This is useful before the first value arrives on the main thread.
- startWith:@NO]
- distinctUntilChanged]
- replayLast]
- setNameWithFormat:@"%@ -executing", self];
- RACSignal *moreExecutionsAllowed = [RACSignal
- if:RACObserve(self, allowsConcurrentExecution)
- then:[RACSignal return:@YES]
- else:[immediateExecuting not]];
-
- if (enabledSignal == nil) {
- enabledSignal = [RACSignal return:@YES];
- } else {
- enabledSignal = [[[enabledSignal
- startWith:@YES]
- takeUntil:self.rac_willDeallocSignal]
- replayLast];
- }
-
- _immediateEnabled = [[RACSignal
- combineLatest:@[ enabledSignal, moreExecutionsAllowed ]]
- and];
-
- _enabled = [[[[[self.immediateEnabled
- take:1]
- concat:[[self.immediateEnabled skip:1] deliverOn:RACScheduler.mainThreadScheduler]]
- distinctUntilChanged]
- replayLast]
- setNameWithFormat:@"%@ -enabled", self];
- return self;
- }
- #pragma mark Execution
- - (RACSignal *)execute:(id)input {
- // `immediateEnabled` is guaranteed to send a value upon subscription, so
- // -first is acceptable here.
- BOOL enabled = [[self.immediateEnabled first] boolValue];
- if (!enabled) {
- NSError *error = [NSError errorWithDomain:RACCommandErrorDomain code:RACCommandErrorNotEnabled userInfo:@{
- NSLocalizedDescriptionKey: NSLocalizedString(@"The command is disabled and cannot be executed", nil),
- RACUnderlyingCommandErrorKey: self
- }];
- return [RACSignal error:error];
- }
- RACSignal *signal = self.signalBlock(input);
- NSCAssert(signal != nil, @"nil signal returned from signal block for value: %@", input);
- // We subscribe to the signal on the main thread so that it occurs _after_
- // -addActiveExecutionSignal: completes below.
- //
- // This means that `executing` and `enabled` will send updated values before
- // the signal actually starts performing work.
- RACMulticastConnection *connection = [[signal
- subscribeOn:RACScheduler.mainThreadScheduler]
- multicast:[RACReplaySubject subject]];
-
- @weakify(self);
- [self addActiveExecutionSignal:connection.signal];
- [connection.signal subscribeError:^(NSError *error) {
- @strongify(self);
- [self removeActiveExecutionSignal:connection.signal];
- } completed:^{
- @strongify(self);
- [self removeActiveExecutionSignal:connection.signal];
- }];
- [connection connect];
- return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, [input rac_description]];
- }
- #pragma mark NSKeyValueObserving
- + (BOOL)automaticallyNotifiesObserversForKey:(NSString *)key {
- // Generate all KVO notifications manually to avoid the performance impact
- // of unnecessary swizzling.
- return NO;
- }
- @end
|