Фильтр и группировка Akka Streams по набору ключей

У меня есть поток

case class Msg(keys: Seq[Char], value: String)

Теперь я хочу отфильтровать подмножество ключей, например. val filterKeys = Set[Char]('k','f','c') и Filter(k.exists(filterKeys.contains))) И затем разделите их так, чтобы определенные ключи обрабатывались разными потоками, а затем снова объединялись вместе в конце;

                                 /-key=k-> f1 --\
Source[Msg] ~> Filter ~> router |--key=f-> f2 ----> Merge --> f4
                                 \-key=c-> f3 --/

Как мне это сделать?

FlexiRoute старым способом казался хорошим вариантом, но в новом API я предполагаю, что хочу либо создать собственный GraphStage, либо создать свой собственный график из DSL, поскольку я не вижу возможности сделать это с помощью встроенного этапы ..?


person NightWolf    schedule 06.10.2016    source источник


Ответы (1)


Решение для набора малых клавиш

Если ваш набор ключей небольшой и неизменяемый, то, вероятно, проще всего будет понять комбинацию широковещательной передачи и фильтра. Сначала вам нужно определить описанный вами фильтр:

def goodKeys(keySet : Set[Char]) = Flow[Msg] filter (_.keys exists keySet.contains)

Затем это может быть передано вещательной компании, как описано в документации.. Все Msg значения с хорошими ключами будут транслироваться в каждый из трех фильтров, и каждый фильтр будет допускать только определенный ключ:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._

  val source : Source[Msg] = ???

  val goodKeyFilter = goodKeys(Set('k','f','c'))

  val bcast = builder.add(BroadCast[Msg](3))
  val merge = builder.add(Merge[Msg](3))

  val kKey = goodKeys(Set('k'))
  val fKey = goodKeys(Set('f'))
  val cKey = goodKeys(Set('c'))

  //as described in the question
  val f1 : Flow[Msg, Msg, _] = ???
  val f2 : Flow[Msg, Msg, _] = ???
  val f3 : Flow[Msg, Msg, _] = ???

  val f4 : Sink[Msg,_] = ???

  source ~> goodKeyFilter ~> bcast ~> kKey ~> f1 ~> merge ~> f4
                             bcast ~> fKey ~> f2 ~> merge
                             bcast ~> cKey ~> f3 ~> merge

Решение для набора больших клавиш

Если у вас большой набор ключей, то лучше groupBy. Предположим, у вас есть Map ключей к функциям:

//e.g. 'k' -> f1
val keyFuncs : Map[Set[Char], (Msg) => Msg]

Эту карту можно использовать с функцией groupBy:

source
  .via(goodKeys(Set('k','f','c'))
  .groupBy(keyFuncs.size, _.keys)
  .map(keyFuncs(_.keys)) //apply one of f1,f2,f3 to the Msg
  .mergeSubstreams
person Ramón J Romero y Vigil    schedule 06.10.2016