Akka сохраняется при восстановлении завершенного состояния обновлений после первого сообщения

У меня стойкий актер. Когда он запускается в первый раз (база данных пуста), я сохраняю некоторые исходные данные. Но состояние не обновляется, как я ожидал. Он обновляется только после обработки первого сообщения. Как я могу заставить актера начать обрабатывать сообщения после обновления состояния?

Код актера

class TestActor extends PersistentActor {
  var numberOfEvents = 0

  def updateState(e: Any): Unit = {
    println("updating")
    numberOfEvents += 1
  }

  override def receiveRecover: Receive = {
    case RecoveryCompleted =>
      if (numberOfEvents == 0) {
        println("persisting")
        persist("foo")(updateState)
      }
  }

  override def receiveCommand: Receive = {
    case _ => {
      println("answering")
      sender ! numberOfEvents
    }
  }
}

Тестовый код

Await.result(actorRef ? "stats", Duration.Inf) shouldBe 0 // I wan't 1 here
Await.result(actorRef ? "stats", Duration.Inf) shouldBe 1

Выход

persisting
answering // why this goes before updating?
updating
answering

Полный код


person Yaroslav    schedule 15.04.2016    source источник


Ответы (1)


Одна вещь, которую вы хотите пересмотреть, заключается в том, что обычно вы не обновляете состояние события RecoveryCompleted, а скорее обрабатываете события, которые вы сохранили, чтобы восстановить состояние. Сообщение RecoveryCompleted предназначено для обработки того, что делать в конце восстановления. Эти события будут событиями, воспроизведенными из журнала, в котором вы сохранились. При необходимости вы также получите события моментальных снимков, если используете моментальные снимки.

Например:

override def receiveRecover: Receive = {
    case Added(num) =>
        updateState(num) 

    case SnapshotOffer(metadata, snapshot) ⇒
       // Restore your full state from the data in the snapshot

    case RecoveryCompleted =>
        println("Recovery completed") // use logger here
  }

ReceiveCommand, с другой стороны, используется для обработки входящих команд и сохранения этих событий, а затем обновления внутреннего состояния после того, как эти события были обновлены.

override def receiveCommand: Receive = {
    case Add(num) => {
      println("received event and persisting")

      persist(Added(num){ evt ⇒ 
        // This gets called after the persist succeeds
        updateState(num)
        sender ! numberOfEvents
      }      
    }
  }

def updateState(e: Int): Unit = {
    println("updating")
    numberOfEvents += e
  }

С точки зрения обмена сообщениями, я считаю полезным называть событие прошедшим временем команды, как вы можете видеть ниже:

// События

Добавлен класс case(v:Int)

// Команды

класс case Add(v:Int)

Надеюсь, это немного прояснит ситуацию.

person Al Iacovella    schedule 16.04.2016