Группа Akka-stream только Правые элементы Либо

У меня есть источник, который излучает Either[String, MyClass].

Я хочу вызвать внешнюю службу с пакетами MyClass и продолжить работу с Either[String, ExternalServiceResponse], поэтому мне нужно сгруппировать элементы потока.

Если бы поток испускал только MyClass элементов, это было бы просто — достаточно вызвать grouped:

val source: Source[MyClass, NotUsed] = <custom implementation>
source
  .grouped(10)                 // Seq[MyClass]
  .map(callExternalService(_)) // ExternalServiceResponse

Но как в моем сценарии сгруппировать только элементы справа от Либо?

val source: Source[Either[String, MyClass], NotUsed] = <custom implementation>
source
  .???                                                      // Either[String, Seq[MyClass]]
  .map {
    case Right(myClasses) => callExternalService(myClasses)
    case Left(string) => Left(string)
  }                                                         // Either[String, ExternalServiceResponse]

Следующее работает, но есть ли более идиоматический способ?

val source: Source[Either[String, MyClass], NotUsed] = <custom implementation>
source
  .groupBy(2, either => either.isRight)
  .grouped(10)
  .map(input => input.headOption match {
    case Some(Right(_)) =>
      callExternalService(input.map(item => item.right.get))
    case _ =>
      input
  })
  .mapConcat(_.to[scala.collection.immutable.Iterable])
  .mergeSubstreams

person mirelon    schedule 04.03.2020    source источник


Ответы (2)


Это должно преобразовать источник Either[L, R] в источник Either[L, Seq[R]] с настраиваемой группировкой Right.

def groupRights[L, R](groupSize: Int)(in: Source[Either[L, R], NotUsed]): Source[Either[L, Seq[R]], NotUsed] =
  in.map(Option _)  // Yep, an Option[Either[L, R]]
    .concat(Source.single(None)) // to emit when `in` completes
    .statefulMapConcat { () =>
      val buffer = new scala.collection.mutable.ArrayBuffer[R](groupSize)

      def dumpBuffer(): List[Either[L, Seq[R]] = {
        val out = List(Right(buffer.toList))
        buffer.clear()
        out
      }

      incoming: Option[Either[L,R]] => {
        incoming.map { _.fold(
            l => List(Left(l)),  // unfortunate that we have to re-wrap
            r => {
              buffer += r
              if (buffer.size == groupSize) {
                dumpBuffer()
              } else {
                Nil
              }
            }
          )
        }.getOrElse(dumpBuffer()) // End of stream
      }
    }

Кроме того, я отмечу, что нижестоящий код для вызова внешней службы может быть переписан как

.map(_.right.map(callExternalService))

Если вы можете надежно вызывать внешнюю службу с помощью параллелизма n, возможно, стоит сделать это и с помощью:

.mapAsync(n) { e.fold(
    l => Future.successful(Left(l)),
    r => Future { Right(callExternalService(r)) }
  )
}

Вы могли бы даже, если хотите максимизировать пропускную способность за счет сохранения порядка, заменить mapAsync на mapAsyncUnordered.

person Levi Ramsey    schedule 04.03.2020

Вы можете разделить свой источник на две ветви, чтобы обрабатывать права по-своему, а затем объединить два подпотока:

// case class MyClass(x: Int)
// case class ExternalServiceResponse(xs: Seq[MyClass])
// def callExternalService(xs: Seq[MyClass]): ExternalServiceResponse =
//    ExternalServiceResponse(xs)
// val source: Source[Either[String, MyClass], _] =
//   Source(List(Right(MyClass(1)), Left("2"), Right(MyClass(3)), Left("4"), Right(MyClass(5))))

val lefts: Source[Either[String, Nothing], _] =
  source
    .collect { case Left(l) => Left(l) }

val rights: Source[Either[Nothing, ExternalServiceResponse], _] =
  source
    .collect { case Right(x: MyClass) => x }
    .grouped(2)
    .map(callExternalService)
    .map(Right(_))

val out: Source[Either[String, ExternalServiceResponse], _] = rights.merge(lefts)

// out.runForeach(println)
// Left(2)
// Right(ExternalServiceResponse(Vector(MyClass(1), MyClass(3))))
// Left(4)
// Right(ExternalServiceResponse(Vector(MyClass(5))))
person Xavier Guihot    schedule 31.05.2020