Порядок сообщений Akka Streams ActorRefSource

Я хочу создать последовательность элементов, используя ActorRefSource akka Streams. В указанный источник непрерывно подаются данные. После завершения вычислений поток завершается ядовитой пилюлей.

Следующий упрощенный пример показывает мое намерение:

val source = Source.actorRef[Int](1000, OverflowStrategy.fail)
    .mapMaterializedValue{ ref =>
      for(i <- 1 to 1000) {
        ref ! i
      }

      ref ! PoisonPill
    }

    source.runWith(Sink.seq).foreach(s => println("count: "+s.size))

Я ожидал, что Stream обработает все 1000 элементов, а затем завершится из-за получения Poison Pill. К сожалению, поток обычно завершается намного раньше. Примеры выходных данных:

count: 24

Подождите некоторое время перед отправкой ядовитой пилюли, например. 1000 мс приводит к обработке всех чисел.

Любая идея о том, как убедиться, что все предметы были обработаны до получения ядовитой пилюли, будет высоко оценена.


person Calardan    schedule 31.08.2016    source источник


Ответы (1)


См. документацию для Source.actorRef: PoisonPill не очищает буфер перед завершением потока.

person Roland Kuhn    schedule 31.08.2016
comment
Итак, чтобы убедиться, что все элементы обрабатываются до завершения работы, я должен вместо этого передать экземпляр akka.actor.Status.Succes? Благодарю за разъяснение! - person Calardan; 31.08.2016