Отвечая на ваш вопрос напрямую: можно создать поток, который «применяет некоторую обработку к каждому элементу, а затем объединяет их обратно в одно значение».
Разработка вашего примера с помощью некоторого образца кода:
case class Item(itemId : String)
case class PurchaseOrder(orderId : String, items : Seq[Item])
val purchaseOrder : PurschaseOrder = ???
Если бы мы хотели обрабатывать элементы с помощью потока, мы могли бы, хотя точная природа сокращения была неоднозначной в вопросе, поэтому я не буду определять, как достигается сворачивание:
type ProcessOutput = ???
def processItem(item : Item) : ProcessOutput = ???
val combinedResult : Future[CombinedResult] =
Source.fromIterator( purchaseOrder.items.toIterator )
.via(Flow[Item] map processItem)
.to(Sink.fold[ProcessOutput](???)(???) )
.run()
Отвечая на ваш вопрос косвенно,
Прежде всего подумайте о фьючерсах
Потоки Akka очень полезны, когда необходимо противодавление. Обратное давление является обычным явлением, когда вы подключаетесь к внешнему источнику данных, потому что bp позволяет вашему приложению определять, насколько быстро данные передаются вам, поскольку вы несете ответственность за непрерывную сигнализацию потребности в дополнительных данных.
В случае, если вы указали в вопросе, нет необходимости транслировать запрос, и нести накладные расходы, которых требует такое общение. У вас уже есть коллекция предметов, поэтому не к кому отправить запрос ...
Вместо этого я думаю, что Futures - лучший вариант для случая, который вы описали:
def futProcess(item : Item)(implicit ec : ExecutionContext) =
Future { processItem(item) }
// same output type as the stream run
val combinedResults : Future[CombinedResult] =
Future.sequence{ purchaseOrder.items map futProcess }
.map{ _ fold[ProcessOutput](???)(???) }
Вы получите лучшую производительность, меньшую сложность, имея полную систему ActorSystem, и в любом случае такой же результат, как и поток ...
person
Ramón J Romero y Vigil
schedule
13.11.2016