У меня есть источник, который излучает Either[String, MyClass]
.
Я хочу вызвать внешнюю службу с пакетами MyClass
и продолжить работу с Either[String, ExternalServiceResponse]
, поэтому мне нужно сгруппировать элементы потока.
Если бы поток испускал только MyClass
элементов, это было бы просто — достаточно вызвать grouped
:
val source: Source[MyClass, NotUsed] = <custom implementation>
source
.grouped(10) // Seq[MyClass]
.map(callExternalService(_)) // ExternalServiceResponse
Но как в моем сценарии сгруппировать только элементы справа от Либо?
val source: Source[Either[String, MyClass], NotUsed] = <custom implementation>
source
.??? // Either[String, Seq[MyClass]]
.map {
case Right(myClasses) => callExternalService(myClasses)
case Left(string) => Left(string)
} // Either[String, ExternalServiceResponse]
Следующее работает, но есть ли более идиоматический способ?
val source: Source[Either[String, MyClass], NotUsed] = <custom implementation>
source
.groupBy(2, either => either.isRight)
.grouped(10)
.map(input => input.headOption match {
case Some(Right(_)) =>
callExternalService(input.map(item => item.right.get))
case _ =>
input
})
.mapConcat(_.to[scala.collection.immutable.Iterable])
.mergeSubstreams