Потоки Akka и границы транзакций

Я все еще понимаю концепции потоков Akka и пытаюсь понять, как сопоставить их со сценариями, когда у нас есть набор элементов, которые необходимо обрабатывать атомарно. Допустим, у нас есть заказ на покупку, состоящий из нескольких элементов, и нам нужно применить некоторую обработку к каждому элементу, а затем объединить их обратно в одно значение. Должен ли такой рабочий процесс стать отдельным отдельным потоком (или подпотоком), который закрывается после полной обработки заказа на поставку? Т.е. каждый заказ на покупку запускает новый поток? Или у меня бесконечный поток заказов на покупку? Но если да, не возникнет ли у меня проблема смешивания заказов на покупку из разных заказов?

Другими словами, то, что я пытаюсь достичь, - это изоляция обработки различных рабочих процессов и вопрос, подходят ли потоки Akka для этого.


person Vagif Abilov    schedule 12.11.2016    source источник


Ответы (1)


Отвечая на ваш вопрос напрямую: можно создать поток, который «применяет некоторую обработку к каждому элементу, а затем объединяет их обратно в одно значение».

Разработка вашего примера с помощью некоторого образца кода:

case class Item(itemId : String)

case class PurchaseOrder(orderId : String, items : Seq[Item])

val purchaseOrder : PurschaseOrder = ???

Если бы мы хотели обрабатывать элементы с помощью потока, мы могли бы, хотя точная природа сокращения была неоднозначной в вопросе, поэтому я не буду определять, как достигается сворачивание:

type ProcessOutput = ???

def processItem(item : Item) : ProcessOutput = ???

val combinedResult : Future[CombinedResult] = 
  Source.fromIterator( purchaseOrder.items.toIterator )
        .via(Flow[Item] map processItem)
        .to(Sink.fold[ProcessOutput](???)(???) )
        .run()

Отвечая на ваш вопрос косвенно,

Прежде всего подумайте о фьючерсах

Потоки Akka очень полезны, когда необходимо противодавление. Обратное давление является обычным явлением, когда вы подключаетесь к внешнему источнику данных, потому что bp позволяет вашему приложению определять, насколько быстро данные передаются вам, поскольку вы несете ответственность за непрерывную сигнализацию потребности в дополнительных данных.

В случае, если вы указали в вопросе, нет необходимости транслировать запрос, и нести накладные расходы, которых требует такое общение. У вас уже есть коллекция предметов, поэтому не к кому отправить запрос ...

Вместо этого я думаю, что Futures - лучший вариант для случая, который вы описали:

def futProcess(item : Item)(implicit ec : ExecutionContext) = 
  Future { processItem(item) } 

// same output type as the stream run 
val combinedResults : Future[CombinedResult] = 
  Future.sequence{ purchaseOrder.items map futProcess }
        .map{ _ fold[ProcessOutput](???)(???) }

Вы получите лучшую производительность, меньшую сложность, имея полную систему ActorSystem, и в любом случае такой же результат, как и поток ...

person Ramón J Romero y Vigil    schedule 13.11.2016
comment
Спасибо за подробный ответ. Я также понимаю, что в некоторых сценариях потоки могут быть неоптимальными, как вы указали. Но в целом я вижу большие возможности у стримов и хочу их проверить. - person Vagif Abilov; 13.11.2016