ReactiveCocoa takeUntil и takeWhile не отправляет последний следующий

Рассмотрим следующий фрагмент:

- (RACSignal *)startRouting {
...
}

- (RACSignal *)updateRoutingWithSession:(NSString *)session {
...
}

- (RACSignal *)fetchFlights {
    return [[self startRouting] flattenMap:^RACStream *(NSString *session) {
        return [[[[self updateRoutingWithSession:session]
                        delay:2.0f]
                        repeat]
                        takeUntilBlock:^BOOL(RACTuple *operationAndResponse) {
                            AFHTTPRequestOperation *operation = [operationAndResponse first];
                            NSDictionary *response = [operationAndResponse second];
                            return [operation isCancelled] || 100 == [response[kPercentComplete] intValue];
                        }];
    }];
}

Здесь происходит то, что startRouting возвращает RACSignal, который отправляет идентификатор сеанса. updateRoutingWithSession: возвращает RACSignal, который отправляет поиск NSDictionary, включая атрибут PercentComplete. Между опросами двухсекундная задержка.

fetchFlights будет работать до тех пор, пока updateRoutingWithSession: не наберет PercentComplete 100.

Моя проблема в том, что самый последний sendNext:, где takeUntilBlock возвращает true, не достигает RACSubscriber.

Что мне не хватает?


person Hans Sjunnesson    schedule 03.04.2013    source источник


Ответы (2)


Я нашел это в мире RX. Это обычно решается путем слияния двух сигналов. Тот, который принимает повторяющийся источник до тех пор, пока предикат не станет истинным. И другой, который пропускает, пока предикат истинен.

Это выглядит примерно так

 BOOL (^finished)(id _) = ^BOOL(id _) {
    return predicate; // BOOLean here
}

// You want a multicast signal, because multiple signals will subscribe to the source.
// Multicasting it means that you won't get repeated api-requests, in this case.
RACMulticastConnection *source = [[theSignal repeat] publish];

RACSignal *whileNotDone = [source.signal takeUntilBlock:finished];
RACSignal *whenDone = [[source.signal skipUntilBlock:finished] take:1];
RACSignal *merged = [RACSignal merge:@[whileNotDone, whenDone]];

[source connect]; // Needed for a multicast signal to initiate.

Сигнал merged будет sendNext каждые next в source, включая самый последний. Затем sendCompleted.

Некоторые ссылки из мира RX:

person Hans Sjunnesson    schedule 17.04.2013

Чтобы уточнить: ваша проблема в том, что next, который запускает завершение, не отправляется? takeUntilBlock будет распространять следующее до тех пор, пока предикат не станет НЕТ. (документация). Следовательно, последний next будет не быть отправленным. Но вы можете подписаться на completion, что должно произойти в этом случае.

person allprog    schedule 03.04.2013
comment
Абсолютно правильно. Что я мог бы сделать, так это не использовать takeUntilBlock и вместо этого переместить предикат в updateRoutingWithSession: и только sendComplete: когда PercentComplete равно 100. Но это нарушит логический поток updateRoutingWithSession: который не обращает внимания на то, как его используют функции более высокого порядка, т.е. не знаю, что идея состоит в том, чтобы опросить службу до ее завершения. - person Hans Sjunnesson; 03.04.2013
comment
@HansSjunnesson Я думаю, что @allprog пытается указать, что вы можете использовать здесь событие completed, чтобы узнать, когда takeUntilBlock: возвращает NO. Вы можете сделать это через -subscribeCompleted: или через оператор -последовательностьСледующая:. - person Justin Spahr-Summers; 07.04.2013
comment
@JustinSpahr-Summers Верно, но завершено, данные не отправляются. Меня интересует содержимое последнего завершающего значения, которое отправляет сигнал. TakeWhile подавляет распространение этого последнего next. - person Hans Sjunnesson; 09.04.2013
comment
Ах, извините, я не понял эту часть. В этом случае вы можете комбинировать некоторые другие операторы, такие как -mapPreviousWithStart:combine: и -materialize, чтобы сохранить предыдущее значение, а затем рассматривать завершение как значение само по себе. - person Justin Spahr-Summers; 11.04.2013