Можно ли вообще использовать процессор Reactive Streams в качестве шины событий?

Я начал изучать реактивные потоки, потому что мне было интересно узнать об этой новой тенденции использования RxJava в качестве замены более традиционных шин событий. Этот пост в блоге представляет собой типичное описание того, как готово. Если я правильно понимаю, RxJava 1.x не была строго реализацией Reactive Streams, но была очень похожа. Версия 2.0 включает в себя некоторые классы, которые совместимы или, по крайней мере, проходят TCK, поэтому обновленная версия этого кода может выглядеть немного иначе.

public class UserLocationModel {

  private PublishSubject<LatLng> subject = PublishSubject.create();

  public void setLocation(LatLng latLng) {
    subject.onNext(latLng);
  }

  public Observable<LatLng> getUserLocation() {
    return subject;
  }
}

В терминологии Reactive Streams я думаю, что subject — это Processor, что является и Publisher, и Subscriber.

Проблема в том, что вызов onNext на Subscriber, который ни на что не подписан, может нарушить спецификацию Reactive Streams, в частности правило 1.9.

Это просто деталь реализации, что это вообще работает? Правильно ли я думаю, что вы не можете в целом полагаться на эту работу с совместимой реализацией Reactive Streams?


person Kevin Krumwiede    schedule 03.05.2017    source источник


Ответы (1)


Subjects и Processors стандартного RxJava 2 смягчены, поэтому вам не нужно вызывать onSubscribe для них перед вызовом других методов. Это отчасти связано с традициями, поскольку у субъектов 1.x не было onSubscribe, а отчасти из-за того, что процессоры RxJava 2 не координируют запросы между сторонами Subscriber и Publisher по своему выбору и, следовательно, не используют Subscription. .

Если вы подпишетесь на RxJava Processor на любой RS-совместимый Publisher, они будут запрашивать Long.MAX_VALUE и передавать сигналы как можно чаще. Если вы подпишете Subscriber, совместимый с RS, на RxJava Processor, они будут учитывать обратное давление этих Subscriber и никогда не переполнят их, однако отсутствие запросов может привести к тому, что отдельные MissingBackpressureException будут испущены, а Subscriber "выброшены". В библиотеке расширений есть пользовательский Publisher, который координирует запросы.

Правильно ли я думаю, что вы не можете вообще полагаться на эту работу с совместимой реализацией Reactive Streams.

В спецификации нет ничего, и, следовательно, в TCK не проверено, что должно произойти с Processor, который не получил вызов onSubscribe, но нуждался в нем, поэтому я думаю, что это стало деталью реализации.

Здесь есть две большие проблемы:

  1. Субъекты были изобретены, чтобы соединить императивный мир с реактивным миром и прекрасно работать в случаях с графическим интерфейсом и в случаях без обратного давления в качестве многоадресных трансляторов событий. В реактивно-реактивных многоадресных рассылках они лучше и более прямые альтернативы, такие как publish(Function).
  2. Мышление в шине событий — это шаг назад, потому что вы создаете единую узкую точку, перелопачивая и сливая события на одной «рельсе». Напротив, дизайн для реактивных систем предпочитает отдельные и часто независимые потоки, где каждый поток может переключаться между потоками по мере необходимости и, возможно, избегать основного потока до самого последнего момента.
person akarnokd    schedule 04.05.2017