Печать результатов Futures в Scala Worksheet

Я прохожу курс реактивного программирования на Coursera и, выполняя одно из заданий, наткнулся на нечто странное. В любом случае я добавил несколько методов в объект Future Companion через это расширение.

implicit class FutureCompanionOps[T](val f: Future.type) extends AnyVal {

    /** Returns a future that is always completed with `value`.
     */
    def always[T](value: T): Future[T] = Future(value)

    /** Returns a future that is never completed.
     *
     *  This future may be useful when testing if timeout logic works correctly.
     */
    def never[T]: Future[T] = Promise().future


    /** Given a list of futures `fs`, returns the future holding the list of values of all the futures from `fs`.
     *  The returned future is completed only once all of the futures in `fs` have been completed.
     *  The values in the list are in the same order as corresponding futures `fs`.
     *  If any of the futures `fs` fails, the resulting future also fails.
     */
    def all[T](fs: List[Future[T]]): Future[List[T]] = {
      val resPr = Promise[List[T]]()
      def function( in: List[Future[T]], fxs:Future[List[T]] ): Future[List[T]] =
      {
        if(in.isEmpty) fxs 
        else
        function( in.tail, for { i <- in.head ; xs <- fxs } yield { i :: xs } ) 
      }
      function( fs, resPr.success(Nil).future )
    }
}

Затем я написал это на листе Scala WorkSheet в Eclipse.

object TestSheet {

val tempPr = Promise[Boolean]()      
val anotherFuLs = List( Future.always(true), Future.always(false), tempPr.future )
                                                  //> anotherFuLs  : List[scala.concurrent.Future[Boolean]] = List(scala.concurren
                                                  //| t.impl.Promise$DefaultPromise@a19b1de, scala.concurrent.impl.Promise$Default
                                                  //| Promise@1cec6b00, scala.concurrent.impl.Promise$DefaultPromise@625dcec6)
  val crapFut = Future.all(anotherFuLs)           //> crapFut  : scala.concurrent.Future[List[Boolean]] = scala.concurrent.impl.Pr
                                                  //| omise$DefaultPromise@6564dbd5
  crapFut.isCompleted                             //> res3: Boolean = false
  tempPr.success(false)                           //> res4: nodescala.TestSheet.tempPr.type = scala.concurrent.impl.Promise$Defaul
                                                  //| tPromise@625dcec6
  crapFut.isCompleted                             //> res5: Boolean = true
  crapFut onComplete {
    case Success(ls) => println( ls )
    case Failure(e) => println( "Failed with Exception " + e )
  }
} 

несмотря ни на что, я не могу заставить рабочий лист Scala распечатать значения результирующего списка. Однако, когда я пишу модульный тест и запускаю тест scala, у меня нет проблем со сравнением итогового списка. Это ошибка в рабочем листе scala при работе с асинхронными вещами?

Это модульный тест

test("A composed future with all should complete when all futures complete") {
    val tempPr = Promise[Boolean]()
    val lsFu = List( Future.always(true), Future.always(false), tempPr.future );
    val fuL = Future.all( lsFu )
    fuL onComplete { case Success(ls) => println( "This got done" ); assert( ls === List( true, false, true ), "I should get back the expected List" ) 
                     case Failure(ex) => assert( false, "Failed with Exception " + ex ) } 
    assert( fuL.isCompleted === false, "The resulting Future should not be complete when the depending futures are still pending" )    
    tempPr.success(true)

  }

person Kartik Aiyer    schedule 29.12.2013    source источник
comment
FWIW, Future.successful делает то, что делает ваш метод always. Точно так же Future.sequence делает то же самое, что и ваш all.   -  person Dylan    schedule 29.12.2013
comment
Нам кажется, что эти методы уже существуют, но Задание заставило нас создать их как расширение для объекта-компаньона в качестве упражнения по расширению существующих объектов и классов.   -  person Kartik Aiyer    schedule 29.12.2013


Ответы (3)


Похоже, проблема в том, что основной поток, который запускает код вашего рабочего листа, заканчивается до того, как запускается обработчик onComplete.

Scala по умолчанию ExecutionContext представляет собой пул потоков, полный потоков демона. «Демон» в этом контексте означает, что даже если этот поток чем-то занят, это не предотвратит завершение работы JVM после завершения всех потоков, не являющихся демонами. В вашем случае основной поток, вероятно, является единственным потоком, не являющимся демоном, в программе.

Вызов onComplete для Future сделает так, что неявно предоставленный ExecutionContext выполнит ваш обработчик, когда Future завершится. Это означает, что обработчик работает в потоке демона. Поскольку onComplete — это последнее, что вы делаете в своем основном методе, JVM просто завершает работу до того, как ExecutionContext успеет запустить обработчик.

Обычно это не имеет большого значения. В таком сценарии, как веб-сервер, ваша JVM будет работать долгое время. Для вашего варианта использования я бы рекомендовал заблокировать для завершения Future, используя один из методов в scala.concurrent.Await. Таким образом, вы можете запустить свою логику завершения как часть основного потока в основном методе.

person Dylan    schedule 29.12.2013
comment
Я пробовал использовать Await, а также выполнять некоторый код после вызова future, но Future полностью теряется. Есть ли новый способ сделать это? - person crockpotveggies; 20.07.2016
comment
@crockpotveggies Просто сохраните ссылку на будущее как val. Await.ready и Await.result не приводят к потере будущего. - person Dylan; 20.07.2016
comment
Пытался сохранить фьючерсы как val, но я обнаружил, что это удача или промах. С тех пор, как я опубликовал свой последний комментарий, я использовал REPL для загрузки рабочего листа через :load, который, кажется, работает намного лучше. - person crockpotveggies; 20.07.2016

Intellij IDEA имеет аналогичную проблему, и она применима как к рабочему листу, так и даже к запуску приложения из IDEA.

Ключом являются библиотеки, которые находятся в пути к классам при запуске вашего кода. Команда scala превращается во что-то вроде:

execCommand /opt/jdk1.7.0_45/bin/java -Xmx256M -Xms32M -Xbootclasspath/a:
/opt/scala-2.10.3/lib/akka-actors.jar:
/opt/scala-2.10.3/lib/diffutils.jar:
/opt/scala-2.10.3/lib/jline.jar:
/opt/scala-2.10.3/lib/scala-actors.jar:
/opt/scala-2.10.3/lib/scala-actors-migration.jar:
/opt/scala-2.10.3/lib/scala-compiler.jar:
/opt/scala-2.10.3/lib/scala-library.jar:
/opt/scala-2.10.3/lib/scala-partest.jar:
/opt/scala-2.10.3/lib/scalap.jar:
/opt/scala-2.10.3/lib/scala-reflect.jar:
/opt/scala-2.10.3/lib/scala-swing.jar:
/opt/scala-2.10.3/lib/typesafe-config.jar 
-classpath "" -Dscala.home=/opt/scala-2.10.3 -Dscala.usejavacp=true scala.tools.nsc.MainGenericRunner 
-cp out/production/Sheets testing.SequenceMain 400

Проблема в том, что IDE не выполняет эквивалент команды scala, и конечным результатом является Await.result() не ожидание завершения потоков демона.

Вот конкретный пример (также вдохновивший курс Reactive): тестирование пакетов

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Promise, Future}
import scala.concurrent.duration.Duration

object SequenceMain {

  def main(args: Array[String]) {
    val delay = if (args.length > 0) args(0).toInt else 100
    if (args.length > 1) {
      println("Delays are: " + (List(1.0, 1.2, 0.8) map {df => (delay * df).toInt} mkString ", "))
    }

    val start = System.currentTimeMillis()
    var last = start

    def stamp(s: String) {
      val tn = Thread.currentThread().getName
      val now = System.currentTimeMillis()
      println(s"$tn: ${now-start} / ${now - last}: $s")
      last = now
    }

    def sequence[T](fs: List[Future[T]]): Future[List[T]] = {
      val pr = Promise[List[T]]()
      pr.success(Nil)
      val r: Future[List[T]] = fs.foldRight(pr.future) {
        (ft: Future[T], z: Future[List[T]]) =>
          val result: Future[List[T]] = for (t <- ft; ts <- z) yield {
            t :: ts
          }
          result
      }
      r
    }

    stamp("Making sequence of futures.")
    val fts: List[Future[String]] = List(
      Future[String] {
        Thread.sleep((delay * 1.0).toInt)
        "Future 0"
      },
      Future[String] {
        Thread.sleep((delay * 1.2).toInt)
        if (false) throw new Exception("Blew up")
        else "Future 1"
      },
      Future[String] {
        Thread.sleep((delay * 0.8).toInt)
        "Future 2"
      }
    )


    stamp("Making Future sequence.")
    val a1: Future[List[String]] = sequence(fts)

    stamp("Extracting sequence from future.")
    a1 foreach {
      (z: List[String]) => println("And the result is : " + z)
    }

    stamp("Await result.")
    Await.result(a1, Duration(10, "seconds"))
    stamp("Awaited result.")

  }
}

Запуск этого приложения внутри IDEA производит:

/opt/jdk1.7.0_45/bin/java -Didea.launcher.port=7541 -Didea.launcher.bin.path=/opt/idea/idea-IU-134.1160/bin -Dfile.encoding=UTF-8 -classpath /home/mkh/IdeaProjects/Sheets/out/production/Sheets:/opt/scala-2.10.3/lib/scala-library.jar:/opt/jdk1.7.0_45/jre/lib/rt.jar:/opt/idea/idea-IU-134.1160/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain testing.SequenceMain 400
main: 0 / 0: Making sequence of futures.
main: 87 / 87: Making Future sequence.
main: 90 / 3: Extracting sequence from future.
main: 90 / 0: Await result.
main: 562 / 472: Awaited result.

Process finished with exit code 0

Обратите внимание, что println «И результат есть» не печатается.

Однако, если код запускается напрямую, результат будет следующим:

mkh@rock:~/IdeaProjects/Sheets$ scala -cp out/production/Sheets:/opt/scala-2.10.3/lib/scala-library.jar testing.SequenceMain 400 
main: 1 / 1: Making sequence of futures.
main: 9 / 8: Making Future sequence.
main: 10 / 1: Extracting sequence from future.
main: 10 / 0: Await result.
main: 491 / 481: Awaited result.
And the result is : List(Future 0, Future 1, Future 2)

Обратите внимание, что время, потраченное на ожидание, немного больше, и println фактически завершено.

Еще более странно то, что println, казалось бы, не связанное с этим, заставляет этот пример работать даже без использования команды scala:

/opt/jdk1.7.0_45/bin/java -Didea.launcher.port=7539 -Didea.launcher.bin.path=/opt/idea/idea-IU-134.1160/bin -Dfile.encoding=UTF-8 -classpath /home/mkh/IdeaProjects/Sheets/out/production/Sheets:/opt/scala-2.10.3/lib/scala-library.jar:/opt/jdk1.7.0_45/jre/lib/rt.jar:/opt/idea/idea-IU-134.1160/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain testing.SequenceMain 400 1
Delays are: 400, 480, 320
main: 1 / 1: Making sequence of futures.
main: 59 / 58: Making Future sequence.
main: 62 / 3: Extracting sequence from future.
main: 62 / 0: Await result.
And the result is : List(Future 0, Future 1, Future 2)
main: 543 / 481: Awaited result.

Process finished with exit code 0

Пример теперь печатает задержки в качестве своего первого действия (чтобы подтвердить, что для завершения самого медленного Future требуется 480 мс), но каким-то образом этот начальный println имеет побочный эффект, заключающийся в том, что окончательный Await работает.

Кто-то намного умнее меня должен будет объяснить эту последнюю часть...

person Mike Hanafey    schedule 02.03.2014
comment
Последняя часть (о println) несколько распространена в параллельных системах. Println — это операция ввода-вывода, то есть она блокируется, поскольку ввод-вывод медленный. Количество времени, которое он блокирует, может быть достаточным для завершения будущего. - person Simão Martins; 27.07.2015

Поскольку мы не можем распечатать результат Future на листе, я предлагаю записать результат в файл. Затем вы можете использовать writeFile вместо println на своем листе.

 def writeFile(text : String) =  {
  val fw = new FileWriter("result.txt", true)
  try {
    fw.write(text)
    fw.write("\n")
  }
  finally fw.close()
}
person Pham Cong Bang    schedule 17.11.2017
comment
это в первую очередь сводит на нет предполагаемые преимущества использования рабочего листа, а именно то, что вы можете получить быструю петлю обратной связи для простых вычислений. Во всяком случае, оберните свои вычисления object X extends App {} в файл .scala и выполните это вместо рабочего листа. - person Cpt. Senkfuss; 02.09.2020