Scalaz 7 Iteratee для обработки большого zip-файла (OutOfMemoryError)

Я пытаюсь использовать пакет scalaz iteratee для обработки большого zip-файла в постоянном пространстве. У меня есть длительный процесс, который мне нужно выполнить для каждого файла в zip-файле. Эти процессы могут (и должны) выполняться параллельно.

Я создал EnumeratorT, который превращает каждый ZipEntry в объект File. Подпись выглядит так:

def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]

Я хочу прикрепить IterateeT, который будет выполнять длительный процесс для каждого файла. Я в основном получаю что-то вроде:

type IOE[A] = IoExceptionOr[A]

def action(f:File):IO[List[Promise[IOE[File]]]] = (
  consume[Promise[IOE[File]], IO, List] %=
  map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
  Promise { Thread.sleep(5000); iof }

Когда я пытаюсь запустить его:

action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get

Я получаю сообщение java.lang.OutOfMemoryError: Java heap space. Для меня это имеет смысл, так как он пытается создать в памяти массивный список всех этих IO и Promise объектов.

Несколько вопросов:

  • У кого-нибудь есть идеи, как этого избежать? Такое ощущение, что я неправильно подхожу к проблеме, потому что я действительно забочусь о longRunningProcess только из-за его побочных эффектов.
  • Является ли подход Enumerator здесь неправильным подходом?

У меня почти нет идей, так что все поможет.

Спасибо!

Обновление №1

Вот трассировка стека:

[error] java.lang.OutOfMemoryError: Java heap space
[error]         at scalaz.Free.flatMap(Free.scala:46)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)

В настоящее время я следую совету nadavwr, чтобы убедиться, что все работает так, как я думаю. Я буду сообщать обо всех обновлениях.

Обновление №2

Используя идеи из обоих ответов ниже, я нашел достойное решение. Как предположил huynhjl (и я проверил, используя предложение nadavwr об анализе дампа кучи), consume приводил к тому, что каждый раздутый ZipEntry удерживался в памяти, поэтому процессу не хватало памяти. Я изменил consume на foldM и обновил длительный процесс, чтобы он просто возвращал Promise[IOE[Unit]] вместо ссылки на файл. Таким образом, в конце у меня есть коллекция всех исключений IoException. Вот рабочее решение:

def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
  foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
  map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
  Promise { Thread.sleep(5000); iof.map(println) }

Это решение увеличивает каждую запись при асинхронной загрузке. В итоге у меня есть огромный список выполненных Promise объектов, в которых есть ошибки. Я до сих пор не полностью уверен, что это правильное использование Iteratee, но теперь у меня есть несколько повторно используемых составных частей, которые я могу использовать в других частях нашей системы (это очень распространенный шаблон для нас).

Спасибо за вашу помощь!


person RJ Regenold    schedule 26.04.2013    source источник
comment
Что делает долгий процесс? Вычисляет ли он что-то из содержимого zip?   -  person huynhjl    schedule 26.04.2013
comment
Каждый файл в zip-файле является изображением. Долгий процесс загружает этот файл в Rackspace CloudFiles. Как только я это выясню, мне нужно будет добавить дополнительные процессы, которые изменяют размер изображений, а затем загружают их.   -  person RJ Regenold    schedule 26.04.2013
comment
Iteratees кажется неправильной абстракцией для этой работы, поскольку вы хотите распараллелить рабочую нагрузку. Я думаю, что актеры работали бы лучше.   -  person huynhjl    schedule 26.04.2013
comment
Забавно, что вы упомянули об этом, поскольку актеры — это то, с чего я начал, а затем где-то прочитал, что они — плохой выбор для полупоследовательной пакетной обработки. Итерации были рекомендованы! Я согласен, что чем больше я копаюсь в этом, тем больше это похоже на неправильную абстракцию. Я собираюсь попытаться отладить то, что у меня есть, поскольку у меня есть идея создать Iteratee, который запускает N количество промисов, блокируется, пока не получит ответы, а затем запрашивает дополнительные данные. Это звучит разумно? Спасибо!   -  person RJ Regenold    schedule 26.04.2013


Ответы (3)


Не используйте consume. См. мой другой недавний ответ: Как использовать ввод-вывод с итерациями Scalaz7 без переполнения стека?

foldM может быть лучшим выбором.

Также попробуйте сопоставить файл с чем-то другим (например, с кодом возврата успеха), чтобы увидеть, позволяет ли это JVM собирать мусор с завышенными записями zip.

person huynhjl    schedule 26.04.2013
comment
Спасибо за ваш ответ. В конце концов, использование foldM оказалось ключом. - person RJ Regenold; 26.04.2013

Насколько дорог (с точки зрения памяти ваш longRunningProcess? Как насчет дефляции файлов? Выполняются ли они столько раз, сколько вы ожидаете? (был бы полезен простой счетчик)

Трассировка стека поможет определить соломинку, сломавшую спину верблюда — иногда она и является виновником.

Если вы хотите быть уверены, что занимает так много памяти, вы можете использовать аргумент -XX:+HeapDumpOnOutOfMemoryError JVM, а затем проанализировать его с помощью VisualVM, Eclipse MAT или других анализаторов кучи.

Следовать за

Мне кажется странным, что вы перечисляете обещания. Нелогично запускать вычисление независимо как от перечислителя, так и от итерируемого. Решение на основе итерации может лучше обслуживаться перечислителем, который возвращает «инертные» элементы вместо обещаний. К сожалению, это сделало бы вашу обработку отдельных файлов последовательной, но это повторяется для вас - неблокирующая потоковая обработка.

ИМХО, решение на основе акторов подошло бы лучше, но как актеры, так и итерации (особенно последние) кажутся излишними для того, чего вы пытаетесь достичь (по крайней мере, части, которыми вы делитесь).

Пожалуйста, рассмотрите простые futures/promises из пакета Scala 2.10 scala.concurrent, а также обязательно взгляните на параллельные коллекции Scala. Я бы не стал вводить в код дополнительные концепции, пока они не окажутся недостаточными. Попробуйте определить ExecutionContext фиксированного размера для ограничения вашего параллелизма.

person nadavwr    schedule 26.04.2013
comment
Отличный совет. Я прохожу шаг за шагом, чтобы убедиться, что все выполняется так, как я предполагаю. Я обновил свой вопрос выше с помощью трассировки стека. Я собираюсь попробовать дамп кучи дальше. Спасибо! - person RJ Regenold; 26.04.2013
comment
Что касается вашего продолжения: я согласен с вашими опасениями по поводу использования Iteratee для этого процесса. Из того, что я написал, это определенно кажется излишним. Однако шаблон загрузки файла (или файлов), потоковой передачи содержимого, обработки каждой записи, а затем каких-либо действий с результатом используется повсеместно в нашем приложении. Я чувствую, что Iteratee дали мне несколько хороших повторно используемых фрагментов кода, которые я могу использовать для создания этих более крупных процессов. Большое спасибо за ваше время и помощь! - person RJ Regenold; 26.04.2013

Я начал ответ после быстрого прочтения, и каким-то образом у меня в голове застряло «переполнение стека» вместо «недостаточно памяти» ... Должен быть URL :-)

Тем не менее, функциональные вычисления, основанные на рекурсии, подвержены переполнению стека, поэтому я оставил ответ на месте для всех, кто споткнется, и обещаю попытаться найти более подходящий ответ.

Если бы вы столкнулись с переполнением стека, вам понадобился бы «трамплин», конструкция, которая ускоряет ваши вычисления из стека между рекурсиями.

См. раздел "Stackless Scala с бесплатными монадами" в Изучение Scalaz, день 18, часть превосходной работы @eed3si9n. серия постов.

См. также эту суть от @mpilquist, демонстрирующую батутную итерацию.

person nadavwr    schedule 26.04.2013
comment
Ха-ха, stackoverflow.com — неудачное название, когда вы говорите о долго работающих функциональных процессах. - person RJ Regenold; 26.04.2013