Решение для набора малых клавиш
Если ваш набор ключей небольшой и неизменяемый, то, вероятно, проще всего будет понять комбинацию широковещательной передачи и фильтра. Сначала вам нужно определить описанный вами фильтр:
def goodKeys(keySet : Set[Char]) = Flow[Msg] filter (_.keys exists keySet.contains)
Затем это может быть передано вещательной компании, как описано в документации.. Все Msg
значения с хорошими ключами будут транслироваться в каждый из трех фильтров, и каждый фильтр будет допускать только определенный ключ:
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val source : Source[Msg] = ???
val goodKeyFilter = goodKeys(Set('k','f','c'))
val bcast = builder.add(BroadCast[Msg](3))
val merge = builder.add(Merge[Msg](3))
val kKey = goodKeys(Set('k'))
val fKey = goodKeys(Set('f'))
val cKey = goodKeys(Set('c'))
//as described in the question
val f1 : Flow[Msg, Msg, _] = ???
val f2 : Flow[Msg, Msg, _] = ???
val f3 : Flow[Msg, Msg, _] = ???
val f4 : Sink[Msg,_] = ???
source ~> goodKeyFilter ~> bcast ~> kKey ~> f1 ~> merge ~> f4
bcast ~> fKey ~> f2 ~> merge
bcast ~> cKey ~> f3 ~> merge
Решение для набора больших клавиш
Если у вас большой набор ключей, то лучше groupBy. Предположим, у вас есть Map
ключей к функциям:
//e.g. 'k' -> f1
val keyFuncs : Map[Set[Char], (Msg) => Msg]
Эту карту можно использовать с функцией groupBy:
source
.via(goodKeys(Set('k','f','c'))
.groupBy(keyFuncs.size, _.keys)
.map(keyFuncs(_.keys)) //apply one of f1,f2,f3 to the Msg
.mergeSubstreams
person
Ramón J Romero y Vigil
schedule
06.10.2016