Мой вопрос где-то связан с: Доступ к базовому ActorRef источника потока akka, созданного Source.actorRef, с некоторыми отличиями:
- Я использую экспериментальную версию akka-stream 1.0.
- Я использую модель ActorPublisher.
- Я использую FlowGraph dsl для определения потока с параллельной обработкой.
Я не нахожу способа заставить ActorRef отправить сообщение экземпляру Actor Publisher, хранящемуся в источнике.
def run(implicit system: ActorSystem) = { import system.dispatcher implicit val materializer = ActorMaterializer() val source = Source.actorPublisher[TestRequest](TestActor.props).map { request => request.event } //Implementation in subpackage val sinkLevel1 = Sinks.sinkLevel1 val sinkLevel2 = Sinks.sinkLevel2 //Implementation in subpackage val stageTriage = FlowStages.stageTriage val stageEvalProcess1 = FlowStages.stageEvalProcess1 val stageEvalProcess2 = FlowStages.stageEvalProcess2 val pipeline = FlowGraph.closed(){ implicit builder => import FlowGraph.Implicits._ val stageDispatchByRuleLevels = builder.add(Broadcast[TriagedSystemEvent](2)) source ~> stageTriage ~> stageDispatchByRuleLevels stageDispatchByRuleLevels ~> stageEvalProcess1 ~> sinkLevel1 stageDispatchByRuleLevels ~> stageEvalProcess2 ~> sinkLevel2 } pipeline.run() }
Спасибо за помощь !
Оливер