Потоки Akka: чтение нескольких файлов

У меня есть список файлов. Я хочу:

  1. Читать их все как единый Источник.
  2. Файлы следует читать последовательно, по порядку. (без циклического перебора)
  3. Ни в коем случае не требуется, чтобы какой-либо файл полностью находился в памяти.
  4. При чтении ошибки из файла поток должен быть свернут.

Мне казалось, что это должно работать: (Scala, akka-streams v2.4.7)

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
    .map(bs => bs.utf8String)
  )
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines

Но это приводит к ошибке компиляции, поскольку с FileIO связано материализованное значение, а Source.combine этого не поддерживает.

Отображение материализованного значения заставляет меня задаться вопросом, как обрабатываются ошибки чтения файла, но компилируется:

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
    .map(bs => bs.utf8String)
    .mapMaterializedValue(f => NotUsed.getInstance())
  )
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _))  // counting lines

Но выбрасывает исключение IllegalArgumentException во время выполнения:

java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]

person randomstatistic    schedule 13.06.2016    source источник


Ответы (3)


Приведенный ниже код не так краток, как мог бы быть, чтобы четко разделить различные проблемы.

// Given a stream of bytestrings delimited by the system line separator we can get lines represented as Strings
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String)

// given as stream of Paths we read those files and count the number of lines
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0l)((count, line) => count + 1).toMat(Sink.head)(Keep.right)

// Here's our test data source (replace paths with real paths)
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath))

// Runs the line counter over the test files, returns a Future, which contains the number of lines, which we then print out to the console when it completes
testFiles runWith lineCounter foreach println
person Viktor Klang    schedule 14.06.2016
comment
Я искал модульный, поэтому ценю это. Я использовал счетчик строк как пример того, что я мог бы сделать с файлами, и lineCounter, как написано, объединяет это с чтением файла. (Это раковина) Но если я переместу складку и все, что после нее в другое место, у меня останется Flow [Path, String, NotUsed], который является именно той частью, которую я искал. - person randomstatistic; 14.06.2016
comment
Не могли бы вы предоставить импорт со своими примерами, они являются важной частью кода. - person ; 08.12.2017
comment
@OsskarWerrewka Все должно быть в akka.stream.scaladsl и java IO / NIO. У вас были проблемы с этим? - person Viktor Klang; 15.12.2017
comment
Наконец-то я это сделал. Спасибо. - person ; 16.12.2017

Обновить. Я не увидел принятого ответа, потому что не обновил страницу> _ ‹. Я все равно оставлю это здесь, так как я также добавил несколько примечаний об обработке ошибок.

Я считаю, что следующая программа делает то, что вы хотите:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import java.nio.file.Paths
import scala.concurrent.duration._

object TestMain extends App {
  implicit val actorSystem = ActorSystem("test")
  implicit val materializer = ActorMaterializer()
  implicit def ec = actorSystem.dispatcher

  val sources = Vector("build.sbt", ".gitignore")
    .map(Paths.get(_))
    .map(p =>
      FileIO.fromPath(p)
        .viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left)
        .mapMaterializedValue { f =>
          f.onComplete {
            case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p")
            case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}")
            case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e")
          }
          NotUsed
        }
    )
  val finalSource = Source(sources).flatMapConcat(identity)

  val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _))
  result.onComplete {
    case Success(n) => println(s"Read $n lines total")
    case Failure(e) => println(s"Reading failed: $e")
  }
  Await.ready(result, 10.seconds)

  actorSystem.terminate()
}

Ключевым моментом здесь является _ 2_: он преобразует каждый элемент потока в источник и возвращает поток элементов, полученных из этих источников, если они выполняются последовательно.

Что касается обработки ошибок, вы можете либо добавить обработчик future в аргументе mapMaterializedValue, либо обработать последнюю ошибку текущего потока, поместив обработчик на Sink.foreach материализованное будущее значение. Я сделал и то, и другое в приведенном выше примере, и если вы протестируете это, скажем, на несуществующем файле, вы увидите, что одно и то же сообщение об ошибке будет напечатано дважды. К сожалению, flatMapConcat() не собирает материализованные значения, и, честно говоря, я не вижу, как он мог бы это сделать разумно, поэтому при необходимости вы должны обрабатывать их отдельно.

person Vladimir Matveev    schedule 15.06.2016

У меня есть один неожиданный ответ - не используйте akka.FileIO. Кажется, это нормально работает, например:

val sources = Seq("sample.txt", "sample2.txt").map(io.Source.fromFile(_).getLines()).reduce(_ ++ _)
val source = Source.fromIterator[String](() => sources)
val lineCount = source.map(_ => 1).runWith(Sink.reduce[Int](_ + _))

Я все еще хотел бы знать, есть ли лучшее решение.

person randomstatistic    schedule 13.06.2016
comment
Используя io.Source, вы теряете много энергии. Для небольших файлов это может сработать, но для больших файлов это невозможно. - person jarandaf; 06.07.2016
comment
@jarandaf Можете уточнить? У меня создалось впечатление, что io.Source просто использовал BufferedReader под капотом, а итератор getLines не загружает сразу весь файл или что-то в этом роде. - person randomstatistic; 06.07.2016
comment
подумайте лучше, возможно, вы правы (хотя FileIO обрабатывает ByteString вместо String, что должно быть более производительным). С другой стороны, с io.Source всегда нужно помнить о закрытии источника (что не делается по умолчанию). - person jarandaf; 15.07.2016
comment
sources имя выше неверно, не так ли? До .reduce(_ ++ _) это источники, но после этого (фактическое значение) - один Iterator[String], представляющий все уже объединенные файлы. Просто проблема с наименованием - код понравился! - person akauppi; 29.09.2016
comment
Да, я это видел. Я имел в виду источники в семантическом смысле, содержащие множество источников, но вы правы в том, что это одна переменная Iterator, а не список, что может сбивать с толку. - person randomstatistic; 05.10.2016