Несколько приемников в одном потоке

У меня есть такой поток и два стока, но одновременно используется только один:

Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink1)

or

Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink2)

Можно настроить, какой приемник мы используем, но что, если я использую оба приемника параллельно. Как я могу это сделать?

Я думал о Sink.combine, но он требует еще и стратегии слияния, а я не хочу никоим образом объединять результаты этих стоков. Меня это не волнует, поэтому я хочу отправлять одни и те же данные через HTTP на какую-то конечную точку и одновременно отправлять их в базу данных. Комбинация стоков очень похожа на трансляцию, но реализация трансляции с нуля снижает читабельность моего кода, где теперь у меня есть только простой источник, поток и сток, без низкоуровневых стадий графа.

Вы знаете, как правильно это сделать (с противодавлением и другими вещами, которые у меня есть, используя только одну раковину)?


person Piotr Kozlowski    schedule 19.12.2017    source источник


Ответы (2)


Вы можете использовать alsoTo (см. Документация по API):

Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)
person Sebastian    schedule 19.12.2017
comment
Как я могу запустить эти приемники параллельно, добавив простой .async перед вторым приемником? Я хотел бы запускать их параллельно, но все же иметь противодавление, другими словами, я хотел бы, чтобы мой поток работал так же быстро, как время, проведенное в самом медленном приемнике, а не как сумма времени, проведенного во всех приемниках (потому что они выполняются синхронно). - person Piotr Kozlowski; 15.01.2018
comment
Стоит отметить, что приемник из alsoTo() будет выполнен в конце, после того, как приемник объявлен в to(). - person Piotr Kozlowski; 30.10.2018

Вещание с использованием GraphDSL в его простейшей форме не должно снижать удобочитаемость — на самом деле можно даже утверждать, что предложения ~> каким-то образом помогают визуализировать структуру потока:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val bcast = builder.add(Broadcast[Int](2))

  Source.fromElements(1, 2, 3) ~> flow ~> bcast.in
  bcast.out(0) ~> sink1
  bcast.out(1) ~> sink2

  ClosedShape
})
graph.run()
person Leo C    schedule 20.12.2017
comment
Эти раковины работают параллельно? - person Piotr Kozlowski; 15.01.2018
comment
По умолчанию Akka Streams выполняет этапы обработки графа последовательно, но при желании вы можете выполнять их параллельно, используя метод async. Подробнее см. в документе Akka Stream по теме. - person Leo C; 16.01.2018