Я пытаюсь использовать пакет scalaz iteratee для обработки большого zip-файла в постоянном пространстве. У меня есть длительный процесс, который мне нужно выполнить для каждого файла в zip-файле. Эти процессы могут (и должны) выполняться параллельно.
Я создал EnumeratorT
, который превращает каждый ZipEntry
в объект File
. Подпись выглядит так:
def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]
Я хочу прикрепить IterateeT
, который будет выполнять длительный процесс для каждого файла. Я в основном получаю что-то вроде:
type IOE[A] = IoExceptionOr[A]
def action(f:File):IO[List[Promise[IOE[File]]]] = (
consume[Promise[IOE[File]], IO, List] %=
map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
Promise { Thread.sleep(5000); iof }
Когда я пытаюсь запустить его:
action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get
Я получаю сообщение java.lang.OutOfMemoryError: Java heap space
. Для меня это имеет смысл, так как он пытается создать в памяти массивный список всех этих IO
и Promise
объектов.
Несколько вопросов:
- У кого-нибудь есть идеи, как этого избежать? Такое ощущение, что я неправильно подхожу к проблеме, потому что я действительно забочусь о
longRunningProcess
только из-за его побочных эффектов. - Является ли подход
Enumerator
здесь неправильным подходом?
У меня почти нет идей, так что все поможет.
Спасибо!
Обновление №1
Вот трассировка стека:
[error] java.lang.OutOfMemoryError: Java heap space
[error] at scalaz.Free.flatMap(Free.scala:46)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
В настоящее время я следую совету nadavwr, чтобы убедиться, что все работает так, как я думаю. Я буду сообщать обо всех обновлениях.
Обновление №2
Используя идеи из обоих ответов ниже, я нашел достойное решение. Как предположил huynhjl (и я проверил, используя предложение nadavwr об анализе дампа кучи), consume
приводил к тому, что каждый раздутый ZipEntry
удерживался в памяти, поэтому процессу не хватало памяти. Я изменил consume
на foldM
и обновил длительный процесс, чтобы он просто возвращал Promise[IOE[Unit]]
вместо ссылки на файл. Таким образом, в конце у меня есть коллекция всех исключений IoException. Вот рабочее решение:
def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
Promise { Thread.sleep(5000); iof.map(println) }
Это решение увеличивает каждую запись при асинхронной загрузке. В итоге у меня есть огромный список выполненных Promise
объектов, в которых есть ошибки. Я до сих пор не полностью уверен, что это правильное использование Iteratee, но теперь у меня есть несколько повторно используемых составных частей, которые я могу использовать в других частях нашей системы (это очень распространенный шаблон для нас).
Спасибо за вашу помощь!