Странное поведение try/catch со Scala + Akka

Я пытаюсь использовать Akka для реализации TCP-сервера для пользовательского протокола приложения. Я пытаюсь следовать приведенному здесь примеру: http://doc.akka.io/docs/akka/2.0/scala/io.html для выполнения неблокирующего ввода-вывода внутри цикла for...yield.

Я обнаружил, что когда я выбрасываю исключение из блока yield, я не могу поймать его снаружи блока. Я думаю, что у меня есть фундаментальное непонимание того, как здесь работают Akka или Scala, и я был бы признателен за любые советы.

Я сократил код до этого:

import akka.actor._
import java.net.InetSocketAddress

class EchoServer(port: Int) extends Actor {

  val state = IO.IterateeRef.Map.async[IO.Handle]()(context.dispatcher)

  override def preStart {
    IOManager(context.system) listen new InetSocketAddress(port)
  }

  def receive = {
    case IO.NewClient(server) =>
      val socket = server.accept()
      state(socket) flatMap (_ => EchoServer.processRequest(socket))
    case IO.Read(socket, bytes) =>
      state(socket)(IO.Chunk(bytes))
    case IO.Closed(socket, cause) =>
      state(socket)(IO.EOF(None))
      state -= socket
  }
}

object EchoServer extends App
{
  def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] =
  {
    println( "In process request")
    try {
      for {
        bs <- IO take 1
      } yield {
        println("I'll get here")
        throw new Exception("Hey-o!")
        println("But not here ... as expected")
      }
    } catch {
      case e: Exception => println("And not here ... wtf?"); IO.Done()  // NEVER GETS HERE
    }
  }

  ActorSystem().actorOf(Props(new EchoServer(8080)))
}

Может быть, удобнее следовать сути здесь: https://gist.github.com/2296554

Кто-нибудь может объяснить, почему в этой ситуации управление не достигает моего блока catch?

Я заметил, что если я включаю ведение журнала отладки в Akka, я вижу это сообщение в выводе:

[DEBUG] [04/03/2012 22:42:25.106] [EchoServerActorSystem-akka.actor.default-dispatcher-1] [Future] Hey-o!

Итак, я предполагаю, что диспетчер Akka обрабатывает исключение? Кто-нибудь может объяснить, как это возможно?


person Russ Weeks    schedule 04.04.2012    source источник


Ответы (1)


Суть неблокирующего ввода-вывода, конечно же, в том, что нет никакой гарантии, когда и где он будет выполнен. Помните, что для понимания можно написать как

(IO take 1).map(bs => {
  println("I'll get here"); throw // ...
}

Что делает этот код? IO take 1 возвращает некоторую неблокирующую Future вещь, к которой затем добавляется функция преобразования через метод map. т.е. когда (и где бы) IO take 1 не было готово, он применит map к результату.

Все это происходит в каком-то другом потоке (или с использованием какого-то другого способа реализации неблокирующей семантики), поэтому trycatch никак не может отреагировать на выбрасываемые Exception. Метод bs => println(…) … также не будет знать о вашей обработке исключений. Все, что он знает, это то, что он должен преобразовать некоторый ввод bs и получить результат, когда он будет завершен.

Урок, который необходимо усвоить: при использовании неблокирующего кода избегайте побочных эффектов. Особенно, если побочные эффекты используются для изменения потока выполнения.

Я думаю, чтобы на самом деле поймать исключение, вам придется структурировать его следующим образом (не проверено; см. API):

def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] =
  {
    println( "In process request")
    (
      for {
        bs <- IO take 1
      } yield {
        println("I'll get here")
        throw new Exception("Hey-o!")
        println("But not here ... as expected")
      }
    ) recover {
      case e: Exception => println("And here ...?"); IO.Done()
    }
  }
person Debilski    schedule 04.04.2012
comment
Спасибо за очень полезный ответ, он действительно проясняет ситуацию. - person Russ Weeks; 10.04.2012