Akka-stream отфильтровывает конкретное событие от Cassandra

В настоящее время у меня есть служба, основанная на событиях, поддерживаемая Akka и Cassandra. Это система торгов под названием AuctionService, и в какой-то момент мне нужно получить последнее событие ставки с именем BiddenOnLot. Для этого я использую akka-persistence-query.

Вот мой текущий код:

        // obtain read journal by plugin id
        val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](
          "cassandra-query-journal")

        // issue query to journal
        val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(self.path.name.toString, 0, Long.MaxValue)

        // materialize stream, consuming events
        implicit val mat = ActorMaterializer()

        source.runForeach(envelope ⇒ {
          if (envelope.event.isInstanceOf[BiddenOnLot]) {
            val biddenOnLot = envelope.event.asInstanceOf[BiddenOnLot]
            if (biddenOnLot.paddleId == paddleId) {
               // TODO: get last bid event by paddle id
            }
          }
        })

Пока я перебираю все события и могу определить их тип. Но я действительно изо всех сил пытаюсь изолировать последнее событие ставки и иметь возможность использовать его асинхронно. Любые идеи?


person Tíbó    schedule 04.05.2017    source источник


Ответы (1)


Вы можете сделать это, используя сопоставление с образцом, чтобы деконструировать испущенное EventEnvelope и сопоставить его с нужным событием:

source.map {
    case EventEnvelope(_, _, _, bidenOnLot: BidenOnLot) if bidenOnLot.paddleId == paddleId => bidenOnLot
}.runForeach(println)
person Branislav Lazic    schedule 04.05.2017