У меня есть список файлов. Я хочу:
- Читать их все как единый Источник.
- Файлы следует читать последовательно, по порядку. (без циклического перебора)
- Ни в коем случае не требуется, чтобы какой-либо файл полностью находился в памяти.
- При чтении ошибки из файла поток должен быть свернут.
Мне казалось, что это должно работать: (Scala, akka-streams v2.4.7)
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
Но это приводит к ошибке компиляции, поскольку с FileIO
связано материализованное значение, а Source.combine
этого не поддерживает.
Отображение материализованного значения заставляет меня задаться вопросом, как обрабатываются ошибки чтения файла, но компилируется:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
Но выбрасывает исключение IllegalArgumentException во время выполнения:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]