Получить actorRef в akka-stream 1.0 при использовании Source.actorPublisher и FlowGraph

Мой вопрос где-то связан с: Доступ к базовому ActorRef источника потока akka, созданного Source.actorRef, с некоторыми отличиями:

  1. Я использую экспериментальную версию akka-stream 1.0.
  2. Я использую модель ActorPublisher.
  3. Я использую FlowGraph dsl для определения потока с параллельной обработкой.

Я не нахожу способа заставить ActorRef отправить сообщение экземпляру Actor Publisher, хранящемуся в источнике.

 def run(implicit system: ActorSystem) = {
   import system.dispatcher
   implicit val materializer = ActorMaterializer()

   val source = Source.actorPublisher[TestRequest](TestActor.props).map { request => request.event }

   //Implementation in subpackage
   val sinkLevel1 = Sinks.sinkLevel1 
   val sinkLevel2 = Sinks.sinkLevel2

   //Implementation in subpackage
   val stageTriage = FlowStages.stageTriage    
   val stageEvalProcess1 = FlowStages.stageEvalProcess1
   val stageEvalProcess2 = FlowStages.stageEvalProcess2

   val pipeline = FlowGraph.closed(){ implicit builder => 
     import FlowGraph.Implicits._

     val stageDispatchByRuleLevels = builder.add(Broadcast[TriagedSystemEvent](2))

     source ~> stageTriage ~> stageDispatchByRuleLevels
                              stageDispatchByRuleLevels ~> stageEvalProcess1 ~> sinkLevel1
                              stageDispatchByRuleLevels ~> stageEvalProcess2 ~> sinkLevel2

   }

   pipeline.run()

 }

Спасибо за помощь !

Оливер


person Oliver    schedule 04.09.2015    source источник


Ответы (1)


Основываясь на ответе Ноя на связанный вопрос, если вы добавите

val ref = pipeline.run()

затем вы можете отправлять сообщения рефералу, например

ref ! ...
person yardena    schedule 12.09.2015