Я использую RxJava2 и, допустим, у меня есть этот Observable:
Observable
.create(emitter ->
SomeDependency.registerCallback(data -> emitter.onNext(data))
)
.subscribeOn(Schedulers.io());
Он наблюдает за некоторой асинхронной логикой, а затем испускает все, что от нее получает. Важно знать, что в зарегистрированном обратном вызове данные доставляются в потоке, обрабатываемом SomeDependency
. Как следствие, это приводит к тому, что все выбросы emitter
в нисходящий поток доставляются в этом потоке, игнорируя определенный Scheduler
.
В https://stackoverflow.com/a/43283760/1618316 есть подсказка от @akarnokd, которая указывает способ перенаправить данные в правильный поток, используя Scheduler.Worker
. Модифицированный пример для такого подхода будет выглядеть следующим образом:
Observable
.create(emitter -> {
final Worker worker = Schedulers.trampoline().createWorker();
emitter.setDisposable(worker);
SomeDependency.registerCallback(data ->
worker.schedule(() -> emitter.onNext(data))
)
})
.subscribeOn(Schedulers.io());
ПРИМЕЧАНИЕ.
trampoline()
создаетScheduler
для текущего потока. В нашем случае потокio()
, поскольку мы определили его для создания нашего Observable.
Дело в том, что при создании такого рода Observable обычно требуется не только registerCallback()
, но и unregisterCallback()
. В обычных сценариях вы помещаете unregisterCallback()
в Disposable
emitter
. Однако, как видите, у нашего emitter
уже есть Disposable
, и мы не можем установить другой. Если будет установлен второй Disposable
, то предыдущий сбрасывается и удаляется.
Есть ли у вас какие-либо идеи о том, как подойти к этой проблеме, пожалуйста?