Акка: мониторинг тестирования\дозор смерти

В моем сценарии у меня есть 2 актера:

  1. watchee (я использую TestProbe)
  2. watcher (Watcher завернуто в TestActorRef, чтобы показать некоторые внутренние state, которые я отслеживаю в своем тесте)

Наблюдатель должен предпринять какие-то действия, когда watchee умрет.

Вот полный тестовый пример, который я написал до сих пор:

class TempTest(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll {

  def this() = this(ActorSystem("TempTest"))

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  class WatcherActor(watchee: ActorRef) extends Actor {

    var state = "initial"
    context.watch(watchee)

    override def receive: Receive = {
      case "start" =>
        state = "start"
      case _: Terminated =>
        state = "terminated"
    }

  }

  test("example") {
    val watchee = TestProbe()
    val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))

    assert(watcher.underlyingActor.state === "initial")

    watcher ! "start" // "start" will be sent and handled by watcher synchronously
    assert(watcher.underlyingActor.state === "start")

    system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
    Thread.sleep(100) // what is the best way to avoid blocking here?
    assert(watcher.underlyingActor.state === "terminated")
  }

}

Теперь, поскольку все задействованные акторы используют CallingThreadDispatcher (все тестовые помощники Akka создаются с использованием реквизита с .withDispatcher(CallingThreadDispatcher.Id)), я могу с уверенностью предположить, что когда этот оператор возвращает:

watcher ! "start"

... сообщение «старт» уже обработано WatchingActor, и поэтому я могу делать утверждения на основе watcher.underlyingActor.state

Однако, по моим наблюдениям, когда я останавливаю watchee с помощью system.stop или отправляя ему Kill, сообщение Terminated, созданное как побочный эффект смерти watchee, выполняется асинхронно, в другом потоке.

Не-решение состоит в том, чтобы остановить watchee, заблокировать поток на некоторое время и проверить состояние Watcher после этого, но я хотел бы знать, как мне сделать это правильно (т.е. как быть уверенным, что после убийства актера его наблюдатель < strong>получено и обработано Terminated сообщение о смерти)?


person Eugene Loy    schedule 17.10.2014    source источник


Ответы (2)


РЕДАКТИРОВАТЬ: после обсуждения и тестирования с OP мы обнаружили, что отправка PoisonPill в качестве средства завершения наблюдаемого актера обеспечивает желаемое поведение, поскольку завершение от PPill обрабатывается синхронно, а от остановки или уничтожения обрабатываются асинхронно.

Хотя мы не уверены в причине, лучше всего предположить, что это связано с тем, что убийство актера вызывает исключение, а PPilling — нет.

Судя по всему, это не имеет ничего общего с использованием шаблона gracefulStop в соответствии с моим первоначальным предложением, которое я оставлю ниже.

Таким образом, решение проблемы OP заключалось в том, чтобы просто остановить наблюдаемого актера, отправляющего PPill вместо сообщения Kill или выполнив system.stop.


Старый ответ начинается здесь:

Я могу предложить что-то немного несвязанное, но я чувствую, что это может быть применимо.

Если я правильно понял, то, что вы хотите сделать, это в основном синхронно прекратить действие актера, то есть сделать что-то, что возвращается только после того, как актер официально умер и его смерть была зарегистрирована (в вашем случае наблюдателем).

В общем, уведомление о смерти, как и многое другое в акке, асинхронно. Тем не менее, можно получить подтверждение синхронной смерти с помощью шаблона изящной остановки (akka.pattern.gracefulStop).

Для этого код должен быть примерно таким:

val timeout = 5.seconds
val killResultFuture = gracefulStop(victimRef, timeout, PoisonPill)
Await.result(killResultFuture, timeout)

Что это делает, так это отправляет PoisonPill жертве (примечание: вы можете использовать собственное сообщение), которая ответит будущим, которое будет завершено после смерти жертвы. Используя Await.result, вы гарантированно будете синхронными.

К сожалению, это можно использовать только в том случае, если а) вы активно убиваете жертву и вместо этого не хотите реагировать на внешнюю причину смерти б) вы можете использовать тайм-ауты и блокировки в своем коде. Но, возможно, вы сможете адаптировать этот шаблон к своей ситуации.

person Diego Martinoia    schedule 22.10.2014
comment
Попробовал ваш фрагмент, и это сработало. Забавно то, что он страдает почти той же проблемой, что и первоначальный ответ @cmbaxter (ожидание результата gracefulStop гарантирует возврат после 'posStop' целевого актера, а не после того, как все Terminated, произведенные смертью целевого актера, потребляются). Я согнул голову, пытаясь понять, почему у вас зачищенная работа, и оказалось, что это не gracefulStop заставляет ее работать, а то, что вы отправляете PoisonPill на терминацию актора. - person Eugene Loy; 24.10.2014
comment
По какой-то причине Terminated, произведенные смертью актера от PoisonPill, обрабатываются синхронно, а те, что произведены смертью от Kill или system.stop - асинхронно. Я еще не понял, почему, но в конце концов это означает, что можно просто отправить PoisonPill в watchee для достижения моей цели (зная, когда актер умер и все watcher были подтверждены), и что gracefulStop не имеет ничего общего с решением эта проблема (на самом деле, использование gracefulStop и Kill в качестве сообщения приводит к состоянию гонки, о котором я говорил в комментарии выше). - person Eugene Loy; 24.10.2014
comment
Я тоже не уверен в этом. Может быть, это потому, что Kill выдает исключение, а PPill — нет? - person Diego Martinoia; 24.10.2014
comment
Скорее всего причина в этом, хотя я еще не проверял. - person Eugene Loy; 27.10.2014
comment
Я собираюсь присудить награду за этот пост, поскольку он (хотя и не касается напрямую моей проблемы и оставляет некоторые вопросы) указал мне на решение, которое я собираюсь использовать. Кроме того, голосование за остальные текущие ответы по той же причине. Диего, пожалуйста, отредактируйте этот ответ, чтобы отразить тот факт, что отправка PoisonPill работает, и я также приму ее. - person Eugene Loy; 27.10.2014
comment
Сделанный. Я также оставил там предыдущий ответ, чтобы люди могли понять комментарии, которые мы разместили. Если вы считаете, что лучше полностью удалить предыдущую версию, просто дайте мне знать (я новичок здесь на SO) - person Diego Martinoia; 27.10.2014

Один из способов решить эту проблему — добавить в тест еще один наблюдатель, который также наблюдает за файлом watchee. Этот другой наблюдатель — это TestProbe, который позволит нам выполнить для него утверждение, которое избавит вас от проблем с синхронизацией, которые вы видите. Во-первых, измененный тестовый код:

 val watchee = TestProbe()
 val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))
 val probeWatcher = TestProbe()
 probeWatcher watch watchee.ref

 assert(watcher.underlyingActor.state === "initial")

 watcher ! "start" // "start" will be sent and handled by watcher synchronously
 assert(watcher.underlyingActor.state === "start")

 system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
 probeWatcher.expectTerminated(watchee.ref)
 assert(watcher.underlyingActor.state === "terminated")

Итак, вы можете видеть, что я представил дополнительный наблюдатель строками:

val probeWatcher = TestProbe()
probeWatcher watch watchee.ref

Затем, позже в коде, перед окончательным утверждением, которое не работает для вас, я использую другое утверждение, которое позволяет мне узнать, что сообщение Terminated для остановленного актера было правильно распределено:

probeWatcher.expectTerminated(watchee.ref)

Когда код переходит за эту строку, я могу быть уверен, что тестируемый watcher также получил сообщение о завершении, и последующее утверждение пройдет.

ИЗМЕНИТЬ

Как отмечает ОП, в этом коде присутствует определенный уровень недетерминизма. Другое возможное решение — изменить строку в тестовом коде, которая останавливает актор на:

watcher.underlyingActor.context.stop(watchee.ref)

Используя context из TestActorRef, я считаю, что Terminated будет доставляться через CallingThreadDispatcher и, таким образом, будет полностью синхронным. Я проверил это в цикле, и он работал у меня более 1000 итераций.

Теперь я подумал, что, может быть, из-за того, что я выполнял stop, используя того же актера, который ожидал Terminated, возможно, была оптимизация для доставки Terminated самому себе для этого сканирования, поэтому я также проверил это с совершенно другим Actor следующим образом:

class FooActor extends Actor{
  def receive = {
    case _ =>
  }

Затем в тестовом коде:

val foo = TestActorRef(new FooActor)

И на остановке:

foo.underlyingActor.context.stop(watchee.ref)

Это также сработало, как и ожидалось.

person cmbaxter    schedule 20.10.2014
comment
Боюсь, это не решение. Перестроение теста способом, который вы предлагаете, вводит состояние гонки (что именно гарантирует, что Terminated, отправленное probeWatcher, будет обработано после того, как Terminated будет отправлено watcher?). Посмотрите это, запустив предоставленный вами код в цикле. - person Eugene Loy; 20.10.2014