Я хочу создать последовательность элементов, используя ActorRefSource akka Streams. В указанный источник непрерывно подаются данные. После завершения вычислений поток завершается ядовитой пилюлей.
Следующий упрощенный пример показывает мое намерение:
val source = Source.actorRef[Int](1000, OverflowStrategy.fail)
.mapMaterializedValue{ ref =>
for(i <- 1 to 1000) {
ref ! i
}
ref ! PoisonPill
}
source.runWith(Sink.seq).foreach(s => println("count: "+s.size))
Я ожидал, что Stream обработает все 1000 элементов, а затем завершится из-за получения Poison Pill. К сожалению, поток обычно завершается намного раньше. Примеры выходных данных:
count: 24
Подождите некоторое время перед отправкой ядовитой пилюли, например. 1000 мс приводит к обработке всех чисел.
Любая идея о том, как убедиться, что все предметы были обработаны до получения ядовитой пилюли, будет высоко оценена.