Как правильно контролировать выбросы для определенного планировщика в созданном Observable

Я использую RxJava2 и, допустим, у меня есть этот Observable:

Observable
   .create(emitter -> 
      SomeDependency.registerCallback(data -> emitter.onNext(data))
   )
   .subscribeOn(Schedulers.io());

Он наблюдает за некоторой асинхронной логикой, а затем испускает все, что от нее получает. Важно знать, что в зарегистрированном обратном вызове данные доставляются в потоке, обрабатываемом SomeDependency. Как следствие, это приводит к тому, что все выбросы emitter в нисходящий поток доставляются в этом потоке, игнорируя определенный Scheduler.

В https://stackoverflow.com/a/43283760/1618316 есть подсказка от @akarnokd, которая указывает способ перенаправить данные в правильный поток, используя Scheduler.Worker. Модифицированный пример для такого подхода будет выглядеть следующим образом:

Observable
   .create(emitter -> {
      final Worker worker = Schedulers.trampoline().createWorker();
      emitter.setDisposable(worker);
      SomeDependency.registerCallback(data -> 
         worker.schedule(() -> emitter.onNext(data))
      )
   })
   .subscribeOn(Schedulers.io());

ПРИМЕЧАНИЕ. trampoline() создает Scheduler для текущего потока. В нашем случае поток io(), поскольку мы определили его для создания нашего Observable.

Дело в том, что при создании такого рода Observable обычно требуется не только registerCallback(), но и unregisterCallback(). В обычных сценариях вы помещаете unregisterCallback() в Disposable emitter. Однако, как видите, у нашего emitter уже есть Disposable, и мы не можем установить другой. Если будет установлен второй Disposable, то предыдущий сбрасывается и удаляется.

Есть ли у вас какие-либо идеи о том, как подойти к этой проблеме, пожалуйста?


person bakua    schedule 07.05.2017    source источник


Ответы (1)


Я думаю, что вам нужно CompositeDisposable.

Одноразовый контейнер, в который можно поместить несколько других одноразовых предметов.

person Dean Xu    schedule 08.05.2017
comment
Отличная идея. Спасибо. - person bakua; 08.05.2017