Петля в потоке делает поток бесконечным

У меня есть правильно работающий график, который имеет поток с циклом. Предметы проходят как положено и все работает. Но, к сожалению, с ограниченным исходным графом никогда не заканчивается. Всегда. Как я могу это исправить?

Вот схема моего потока.

введите здесь описание изображения

Вот упрощенная версия потока с той же топологией.

val badFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val mergeEntrance = builder.add(MergePreferred[Int](1))
  val mergePreExit = builder.add(Merge[Int](2))

  val part1 = builder.add(Partition[Int](2, (i: Int) => if (i == 1) 1 else 0 ))
  val part2 = builder.add(Partition[Int](2, (i: Int) => if (i == 1) 0 else 1 ))

  mergeEntrance.out ~> part1.in
  part1.out(0) ~> mergePreExit.in(0)
  part2.in <~ Flow[Int].collect{
    case v if v == 1 =>
      -1
  } <~ part1.out(1)

  mergeEntrance.preferred <~ part2.out(0)
  part2.out(1) ~> mergePreExit.in(1)

  FlowShape.of(mergeEntrance.in(0), mergePreExit.out)
})

val completionFut = Source(List(0, 1, 2, 3))
  .buffer(4, OverflowStrategy.fail)
  .via(badFlow)
  .mapAsync(1) {
    case v => Future.successful(v * 10)
  }
  .toMat(Sink.foreach(v => println(s">>> $v")))(Keep.right).run()

for (result <- completionFut)
  println("Stream is over!")

person expert    schedule 25.06.2016    source источник
comment
Вопрос, который вы должны задать себе: когда завершается слияние?   -  person Viktor Klang    schedule 26.06.2016
comment
@ViktorKlang Это зависит от того, что вы подразумеваете под полным. Я вижу, что 1 проходит цикл, становится -1, а затем приемником печатается как -10.   -  person expert    schedule 26.06.2016
comment
вы использовали слово «конец», не могли бы вы начать с определения его значения на графике?   -  person Viktor Klang    schedule 26.06.2016
comment
@ViktorKlang Я ожидаю, что Future, возвращенный методом run(), завершится. В противном случае у меня нет возможности узнать, когда будет выполнена работа по обработке ограниченного потока.   -  person expert    schedule 27.06.2016
comment
Итак, как вы ожидаете, что это распространение завершения произойдет на вашем графике? (подумайте о том, как завершается слияние)   -  person Viktor Klang    schedule 27.06.2016