Использование ForkJoinPool.commonPool() вместо планировщика вычислений RxJava

В настоящее время я реализую небольшое подмножество API RxJava2 для личного проекта. У меня есть API на основе слушателя, и я начал писать код, чтобы обернуть мой слушатель и реализовать Flowable.create():

public class EventBus {
    private final Flowable<Event> events;

    public EventBus(MyApi api) {
        this.events = Flowable.create(emitter -> {
            Callback listener = new Callback() {
                @Override
                public void onEvent(Event event) {
                    emitter.onNext(event);
                }
            };
            api.addListener(listener);
            // TODO api.removeListener(listener)
        }, BackpressureStrategy.BUFFER);
    }
}.

Я провел быстрый тест, и он работал нормально, но я понял, что он однопоточный. Ничего страшного: на самом деле RxJava спроектирован так, чтобы быть однопоточным, если не указан Scheduler.

Согласно документации RxJava2, я решил связать вызов Flowable.subscribeOn(), который я буду вызывать с аргументом Scheduler.computation().

Итак, я перешел к реализации Flowable.subscribeOn() и Scheduler.computation(), и именно здесь я пытаюсь понять: я видел в разных местах, что Java ForkJoinPool.commonPool() рекомендуется для выполнения вычислительных задач, но RxJava2 не использует его. Мои основные вопросы будут такими:

  • Подойдет ли это для моей очень простой реактивной реализации?
  • Почему RxJava2 решил реализовать свой собственный пул, а не использовать этот?
  • Есть ли какие-либо проблемы с этим подходом, о которых мне следует знать, чтобы облегчить себе жизнь в будущем?

Спасибо!


person Martín Coll    schedule 29.03.2018    source источник


Ответы (1)


Подойдет ли это для моей очень простой реактивной реализации?

Написание адаптеров для традиционных API-интерфейсов обратного вызова довольно распространено, если предположить, что они еще не написаны.

Почему RxJava2 решил реализовать свой собственный пул, а не использовать этот?

RxJava всегда ориентировался на Java 6 и выше, а ForkJoinPool — на Java 7. Обратите внимание, что commonPool иногда выполняет работу в потоке вызывающего объекта и не работает асинхронно. У нас было много юнит-тестов, зависших на сервере CI из-за взаимоблокировки одного и того же пула.

реализация небольшого подмножества API RxJava2 Есть ли какие-либо проблемы с этим подходом, о которых я должен знать, чтобы облегчить себе жизнь в будущем?

Что ты имеешь в виду? Вы пытаетесь повторно реализовать RxJava 2, библиотеку, или вы используете и применяете существующие операторы в своем коде?

Если последнее и помимо предостережения с commonPool, это зависит от того, с чем вы пытаетесь взаимодействовать.

person akarnokd    schedule 30.03.2018
comment
Это отличная отправная точка, спасибо! Я повторно реализую операторы, которые мне нужны (пока только несколько), но следую API и семантике RxJava2 на случай, если я захочу переключиться на использование библиотеки в будущем. Это также облегчает мою жизнь, когда я пытаюсь понять, как все работает, и ищу примеры в Интернете :). Ваше замечание о commonPool выполнении работы над текущим потоком — это как раз одна из тех проблем, о которых я не знал, большое спасибо! Я возьму это на заметку. - person Martín Coll; 30.03.2018
comment
Звучит здорово! Так я учился реактивному программированию 7-8 лет назад, с той разницей, что для Rx.NET не было исходников, только несколько видео. Мне пришлось поэкспериментировать, написав и запустив последовательность на C#, а затем попытавшись воспроизвести те же эффекты на Java. - person akarnokd; 30.03.2018
comment
Спасибо за то, что вы здесь для таких людей, как я, переживающих то же, что и вы. И спасибо за написание супер-чистого кода для обучения в RxJava :). - person Martín Coll; 30.03.2018