Объединение двух наблюдаемых последовательностей, у каждой из которых есть subscribeOn. Как обеспечить выполнение наблюдаемого в потоке?

Когда дело доходит до обеспечения выполнения определенного фрагмента кода Observable.create в определенном потоке (т.е. фоновом потоке), я беспокоюсь, что использование оператора subscribeOn может не сработать, потому что бывают случаи, когда я могу связать эту наблюдаемую последовательность с другой наблюдаемой последовательностью, которая выполняется в основном потоке (с использованием observeOn).

Пример

Ситуация такова, что у меня есть наблюдаемая последовательность, работающая в основном потоке (то есть окно предупреждения, запрашивающее у пользователя ввод, о том, выполнять ли сетевой вызов или нет).

Было бы лучше убедиться, что этот Observable.create код выполняется в фоновом потоке, используя что-то вроде:

Observable<String>.empty()
   .observeOn(ConcurrentMainScheduler(queue: background.queue))
   .concat(myObservableNetworkCall)

Почему бы просто не использовать subscribeOn?

Проблема в том, что если бы я использовал subscribeOn (второй), а предыдущий наблюдаемый (контроллер предупреждений) был настроен на запуск в фоновом потоке с использованием subscribeOn (первый), то второй оператор subscribeOn не работал бы, поскольку первый вызов ближе к наблюдаемый источник:

Если вы укажете несколько операторов subscribeOn (), будет использоваться тот, который приближается к источнику (крайний левый).

Это может быть поведение RxJava, но я не уверен в Swift. Reactivex.io просто говорит, что мы не должны вызывать subscribeOn несколько раз.

Я обычно оборачиваю операции в Observable<Void>, и их нужно запускать в разных потоках ... Вот почему я спрашиваю, как обеспечить выполнение наблюдаемого кода в потоке, для которого я его указал. subscribeOn не сработает, потому что я могу объединить наблюдаемое.

Я хочу, чтобы поток, в котором он должен выполняться, был инкапсулирован в мое определение Observable, а не выше в цепочке.

Лучше всего сделать следующее:

  1. Начните с Observable.empty, используя тип данных, который я хочу использовать.
  2. Используйте observeOn, чтобы принудительно запустить поток, в котором я хочу, чтобы он выполнялся.
  3. Свяжите его с фактическим Observable, который я хочу использовать.

Редактировать

  1. Я прочитал subscribeOn и observeOn документацию на reactivex.io.

  2. Я знаком с тем, как переключаться между потоками с помощью subscribeOn и observeOn.

  3. Что меня особенно беспокоит, так это сложность использования subscribeOn при конкатенации или объединении наблюдаемых последовательностей.

  4. Проблема в том, что наблюдаемые объекты должны запускаться специально в одном потоке, И они не знают, где и с кем они будут связаны. Поскольку я точно знаю, в каком потоке они должны работать, я бы предпочел инкапсулировать определение планировщика в определении наблюдаемого, а не при связывании последовательности.


person dsapalo    schedule 07.02.2017    source источник
comment
Вы смотрели драйверы? они выполняются в одном потоке. github.com/ReactiveX/RxSwift/blob/master/Documentation/Units. мкр   -  person Ocunidee    schedule 07.02.2017
comment
Вы могли бы использовать этот способ (представьте, что у вас есть ошибка переменной в вашей модели просмотра); viewModel .error .filter {$ 0! = nil} .asDriver () .drive (onNext: {[unowned self] ошибка в self.handleError (error!)}) .addDisposableTo (disposeBag)   -  person Ocunidee    schedule 07.02.2017
comment
Да, я уже читал о драйверах, но это решает другую проблему. Если это проясняет, представьте для моего первоначального примера два отдельных фоновых потока (не основной поток). Меня беспокоит природа оператора subscribeOn и возможные рекомендуемые передовые методы, обеспечивающие выполнение конкатенированного Observable<Void>.create() в определенном потоке.   -  person dsapalo    schedule 07.02.2017


Ответы (1)


В объявлении функции лучше не указывать, в каком потоке функция должна быть вызвана. Например:

func myObservableNetworkCall() -> Observable<String> {
    return Observable<String>.create { observer in
        // your network code here

        return Disposables.create {
            // Your dispose
        }
    }
}

func otherObservableNetworkCall(s: String) -> Observable<String> {
    return Observable<String>.create { observer in
        // your network code here

        return Disposables.create {
            // Your dispose
        }
    }
}

А затем переключитесь между Планировщиками:

myObservableNetworkCall()
    .observeOn(ConcurrentMainScheduler(queue: background.queue)) // .background thread, network request, mapping, etc...
    .flatMap { string in
        otherObservableNetworkCall(s: string)
    }
    .observeOn(MainScheduler.instance) // switch to MainScheduler, UI updates
    .subscribe(onNext:{ string in
        // do something 
    })
person xandrefreire    schedule 07.02.2017
comment
Спасибо, но это не моя проблема. Я знаком с использованием subscribeOn и observeOn. Не могли бы вы перечитать вопрос, чтобы понять сложность subscribeOn при объединении последовательностей. - person dsapalo; 07.02.2017
comment
Я имел в виду, как показано в моем вопросе, вот как я это делаю прямо сейчас. Пустой наблюдаемый объект, следующий за типом данных, который я использую, и объединяющий наблюдаемый объект, который я использую. Это ваша рекомендация как лучшая практика для обеспечения того, чтобы что-то работало в фоновом потоке? Я не уверен, что лучше всего. - person dsapalo; 07.02.2017
comment
из этот вопрос subscribeOn() сообщает всей цепочке, какой поток запускать обработка дальше. Вы можете вызвать его только один раз в цепочке. Если вы вызовете его еще раз ниже по течению, это не подействует. observeOn() заставляет все операции, которые происходят ниже, выполняться в указанном планировщике. Вы можете вызывать его несколько раз для каждого потока, чтобы перемещаться между разными потоками. - person xandrefreire; 07.02.2017
comment
И хорошее чтение, объясняющее многопоточность RxSwift - person xandrefreire; 07.02.2017
comment
Мне ясно, как использовать subscribeOn и observeOn. Мне нужно пояснение относительно конкатенации двух наблюдаемых, которые вызвали subscribeOn, чтобы убедиться, что они оба работают в указанном мною потоке. Лучше всего начать с пустого наблюдаемого, указать планировщик, а затем объединить его? - person dsapalo; 07.02.2017
comment
Итак, у вас есть две наблюдаемые, которые вызвали subscribeOn. Затем, возможно, вы объедините эти наблюдаемые, и subscribeOn не работает. Это правильно? - person xandrefreire; 07.02.2017
comment
Позвольте нам продолжить это обсуждение в чате. - person xandrefreire; 07.02.2017