Akka stream — список для mapAsync отдельных элементов

В моем потоке есть поток, выходные данные которого являются объектами List[Any]. Я хочу, чтобы за mapAsync следовали некоторые другие этапы, каждый из которых обрабатывал отдельный элемент вместо списка. Как я могу это сделать?

Фактически я хочу подключить вывод

Flow[Any].map { msg =>
  someListDerivedFrom(msg)
}

потребляться -

Flow[Any].mapAsyncUnordered(4) { listElement =>
  actorRef ? listElement
}.someOtherStuff

Как мне это сделать?


person anindyaju99    schedule 18.07.2016    source источник


Ответы (1)


Я думаю, что комбинатор, который вы ищете, это mapConcat. Этот комбинатор примет входной аргумент и вернет что-то, что является Iterable. Простой пример будет следующим:

implicit val system = ActorSystem()
implicit val mater = ActorMaterializer()

val source = Source(List(List(1,2,3), List(4,5,6)))
val sink = Sink.foreach[Int](println)

val graph =
  source.
    mapConcat(identity).
    to(sink)
graph.run

Здесь мой Source выдает List элементов, а мой Sink принимает базовый тип того, что содержится в этих List. Я не могу соединить их напрямую вместе, так как типы разные. Но если я применю mapConcat между ними, они могут быть связаны, поскольку этот комбинатор сгладит эти List элементы, отправив их отдельные элементы (Int) вниз по течению. Поскольку элемент ввода для mapConcat уже является Iterable, вам нужно только использовать функцию identify в теле mapConcat, чтобы все заработало.

person cmbaxter    schedule 18.07.2016
comment
Спасибо. Я пробовал mapConcat немного по-другому, и это не сработало. В чем именно заключается значение идентичности. - person anindyaju99; 19.07.2016
comment
identity определена в scala.Predef и выглядит именно так: функция, которая преобразует что-то в себя. Очень полезно для передачи в качестве функции, необходимой для map, flatMap и т. д. - person Phasmid; 19.07.2016