play2 создает Enumeratee для преобразования одного обещания в другое обещание

Я пытаюсь освоить использование Iteratees в Play 2 для потоковой передачи результатов кометы. У меня есть дескриптор создания перечислителя из обратного вызова и перечислителя из карты. Моя проблема связана с Enumeratee.map, она принимает функцию, которая принимает чистый ввод и возвращает чистый вывод (например, преобразование String в Int в doc). Что я хотел бы сделать, так это взять чистый ввод и вернуть обещание результата. В конце концов, перечислитель передает перечислителю обещания, перечислитель преобразует один перечислитель в другой, поэтому должен быть способ сделать перечислитель, который сопоставляется с обещаниями.

Теперь позвольте мне привести пример, чтобы сделать это немного яснее. Допустим, у меня есть HTTP-запрос со списком идентификаторов для запроса в моей базе данных. Предположим, что эти идентификаторы представляют строки в таблице базы данных, и запрос выполняет набор (длинных) вычислений для этих строк, а затем возвращает набор объектов json, представляющих вычисление. Поскольку у меня есть долгая блокировка, было бы здорово передавать этот идентификатор по одному за раз, поэтому я хотел бы иметь перечисляемый конвейер, который делает:

  1. запрашивать строку в базе данных (возвращает обещание строки)
  2. сделать длинное вычисление в строке (берет строку и возвращает обещание вычисления)
  3. конвертировать длинные вычисления в JSON
  4. &> это в перечисление Comet, предоставленное Play 2

1 довольно просто, я могу создать перечислитель с fromCallback, который вернет обещание результата запроса. 3 также довольно прост, так как это простой Enumeratee.map

Но я не могу понять, как реализовать applyOn перечислителя на шаге 2. Я могу понять, что у меня есть бо создать новый итератор, который получает обещание от «внутреннего» итератора, flatMap длинный расчет и возврат новое обещание. Чего я не понимаю, так это того, как это сделать, учитывая странную подпись applyOn: def applyOn[A](it: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]]

Может ли кто-нибудь помочь мне с этим?

Спасибо


person Mortimer    schedule 21.05.2012    source источник
comment
просто примечание, в этом простом примере я мог бы явно объединить 1 и 2 в простом перечислителе, но реальное приложение на самом деле немного сложнее;)   -  person Mortimer    schedule 21.05.2012


Ответы (2)


Подпись applyOn имеет больше смысла, если вы думаете, что перечисляемый элемент объединяется с итерируемым справа, как в enumerator |>> (enumeratee &> iteratee). итерация имеет тип Iteratee[E, A], а перечислитель ожидает Iteratee[Promise[E], Iteratee[E, A], чтобы можно было извлечь внутреннюю итерацию. Таким образом, applyOn должен будет получить Iteratee[E, A] и вернуть Iteratee[Promise[E], Iteratee[E, A].

Вот схема реализации. Он определяет ступенчатую функцию, которая принимает входные данные и возвращает ожидаемый результат. Затем рекурсивно проходит через элементы обещания.

import play.api.libs.concurrent._
import play.api.libs.iteratee._

def unpromise[E]: Enumeratee[Promise[E], E] = new Enumeratee[Promise[E], E] {
  def applyOn[A](inner: Iteratee[E, A]): Iteratee[Promise[E], Iteratee[E, A]] = {
    def step(input: Input[Promise[E]], i: Iteratee[E, A]): Iteratee[Promise[E], Iteratee[E, A]] = {
      input match {
        case Input.EOF => Done(i, Input.EOF)
        case Input.Empty => Cont(step(_, i))
        case Input.El(pe) =>
          val pe2 = pe.map(e => i.feed(Input.El(e))).flatMap(identity)
          val i2 = Iteratee.flatten(pe2)
          i2.pureFlatFold(
            (a, e2) => Done(i2, Input.Empty),
            k => Cont(step(_, i2)),
            (msg, e2) => Done(i2, Input.Empty))
      }
    }
    // should check that inner is not done or error - skipped for clarity
    Cont(step(_, inner))
  }
}

Я отбрасываю e2, так что, вероятно, есть еще код, чтобы гарантировать, что некоторые входные данные не будут потеряны.

person huynhjl    schedule 22.05.2012
comment
Хорошо, это похоже на решение моей проблемы: иметь перечисляемую карту, которая создает обещания, затем конвертировать с помощью unpromise и продолжать в том же духе. Мне придется изучить это, так как я все еще не совсем понимаю, что такое e2 и i2. Для проверки ошибки/сделано, я думаю, что смогу использовать Enumeratee.CheckDone. Спасибо. - person Mortimer; 24.05.2012
comment
@ Мортимер, pe это Promise[E]. pe.map(e => i.feed(Input.El(e))) должно быть Promise[Promise[Iteratee[E, A]]]. pe2 это Promise[Iteratee[E, A]]. i2 является Iteratee[E, A] и является следующим шагом вычислений в процессе. e2 - это оставшийся ввод, если вычисление в процессе выполнено или является ошибкой. В большинстве случаев это будет Input.Empty, но это может быть значение типа E, если ввод не был полностью использован (например, peek). - person huynhjl; 24.05.2012

В мастере Enumeratee.mapM[E] есть метод, который принимает f: E => Promise[NE] и возвращает Enumeratee[E, NE]

https://github.com/playframework/Play20/blob/master/framework/src/play/src/main/scala/play/api/libs/iteratee/Enumeratee.scala#L150

person Sadache    schedule 01.06.2012
comment
Я уже портировал generateM для работы в 2.0.1, попробую и mapM, мерси ;) - person Mortimer; 02.06.2012