Тема RxJava как труба

Я хочу использовать Subject, чтобы создать по существу конвейер между одним Observable и другим без использования карта или flatMap, так как их использование слишком многословно.

Я не могу понять, как это сделать с помощью Subject, хотя кажется, что это был бы правильный подход, учитывая, что Subject (согласно документам):

действует как подписчик и как наблюдаемый

Подпись Subject<T,R>, где Observable<R> и Observer<T>. Это означает, что я должен быть в состоянии подключить T к R.

Вот как это в основном выглядит в коде:

class MySubject extends Subject<T, R> {

  protected MySubject(OnSubscribe<R> onSubscribe) {
    super(onSubscribe);
  }

  @Override public void onNext(T in) {
    // Data comes in
  }

  @Override public void onCompleted() {
    // do something
  }

  @Override public void onError(Throwable throwable) {
    // do something with the error
  }

}

person Christopher Perry    schedule 22.03.2014    source источник
comment
По сравнению с реализацией нового Subject проще реализовать функцию, используемую в map или flatMap, не так ли?   -  person zsxwing    schedule 24.03.2014
comment
@zsxwing Похоже на то. Я узнал это на собственном горьком опыте.   -  person Christopher Perry    schedule 24.03.2014


Ответы (1)


Я имею дело с этим в Monifu, передавая наблюдателям метод flatMap (игнорируя тот факт, что PublishSubject — конечный класс). Это должно быть похоже на RxJava или RxScala:

class PipeSubject extends PublishSubject[RxEvent] {
  override def onNext(elem: RxEvent): Future[Ack] = {
    if (!isCompleted) {
      val observers = subscriptions
      if (observers.nonEmpty)
        pipeThroughMany(observers, elem)
      else
        Continue
    }
    else
      Cancel
  }

 private[this] def pipeThroughMany(array: Array[Observer[T]], elem: T): Future[Continue] = {
    val length = array.length
    def >>>(idx: Int = 0): Future[Continue] = {
      val obs = array(idx)
      obs.onNext(elem).flatMap {
         case Continue =>
           if (idx+1 < length)
              >>>(idx+1)
           else
             Continue
         case _ =>
           removeSubscription(obs)
           Continue
      }
    }
    >>>()
  }
}
person lisak    schedule 04.12.2014