В настоящее время я реализую небольшое подмножество API RxJava2 для личного проекта. У меня есть API на основе слушателя, и я начал писать код, чтобы обернуть мой слушатель и реализовать Flowable.create()
:
public class EventBus {
private final Flowable<Event> events;
public EventBus(MyApi api) {
this.events = Flowable.create(emitter -> {
Callback listener = new Callback() {
@Override
public void onEvent(Event event) {
emitter.onNext(event);
}
};
api.addListener(listener);
// TODO api.removeListener(listener)
}, BackpressureStrategy.BUFFER);
}
}.
Я провел быстрый тест, и он работал нормально, но я понял, что он однопоточный. Ничего страшного: на самом деле RxJava спроектирован так, чтобы быть однопоточным, если не указан Scheduler
.
Согласно документации RxJava2, я решил связать вызов Flowable.subscribeOn()
, который я буду вызывать с аргументом Scheduler.computation()
.
Итак, я перешел к реализации Flowable.subscribeOn()
и Scheduler.computation()
, и именно здесь я пытаюсь понять: я видел в разных местах, что Java ForkJoinPool.commonPool()
рекомендуется для выполнения вычислительных задач, но RxJava2 не использует его. Мои основные вопросы будут такими:
- Подойдет ли это для моей очень простой реактивной реализации?
- Почему RxJava2 решил реализовать свой собственный пул, а не использовать этот?
- Есть ли какие-либо проблемы с этим подходом, о которых мне следует знать, чтобы облегчить себе жизнь в будущем?
Спасибо!