В настоящее время у меня есть служба, основанная на событиях, поддерживаемая Akka и Cassandra. Это система торгов под названием AuctionService, и в какой-то момент мне нужно получить последнее событие ставки с именем BiddenOnLot
. Для этого я использую akka-persistence-query
.
Вот мой текущий код:
// obtain read journal by plugin id
val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](
"cassandra-query-journal")
// issue query to journal
val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(self.path.name.toString, 0, Long.MaxValue)
// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.runForeach(envelope ⇒ {
if (envelope.event.isInstanceOf[BiddenOnLot]) {
val biddenOnLot = envelope.event.asInstanceOf[BiddenOnLot]
if (biddenOnLot.paddleId == paddleId) {
// TODO: get last bid event by paddle id
}
}
})
Пока я перебираю все события и могу определить их тип. Но я действительно изо всех сил пытаюсь изолировать последнее событие ставки и иметь возможность использовать его асинхронно. Любые идеи?