Файловый ввод-вывод с бесплатными монадами

У меня есть файл CSV, который мне нужно проанализировать и выполнить какое-то действие с каждой записью. Как мне использовать Free Monads с ним? В настоящее время я загружаю весь файл в память и хотел бы знать, есть ли лучшее решение. Ниже моя программа:

for {
    reader <- F.getReader("my_file.csv")
    csvRecords <- C.readCSV(reader)
    _ <- I.processCSV(csvRecords)
    _ <- F.close(reader)
} yield()

Этот код работает для небольших файлов, но если у меня есть очень большие файлы (более 1 ГБ), это не очень хорошо сработает. Я использую Commons CSV для чтения CSVRecords.


person arjunswaj    schedule 30.12.2017    source источник
comment
arjunswaj, что вам действительно нужно: метод, использующий Free Monad, или любое решение, которое работает и не загружает в память весь файл?   -  person SergGr    schedule 31.12.2017
comment
@SergGr Мне нужен метод, который использует Free Monad и не загружает весь файл в память.   -  person arjunswaj    schedule 31.12.2017
comment
Хорошо, свободная монада обязательна. Не могли бы вы показать нам свои текущие readCSV и processCSV? CSVParser уже предоставляет Iterable/Iterator API   -  person SergGr    schedule 31.12.2017
comment
@SergGr, вот суть программы, которая у меня есть.   -  person arjunswaj    schedule 31.12.2017
comment
Я хочу сделать это более FP-способом - без использования цикла while с hasNext/next Iterable/Iterator. Возможно, с помощью потоков. В приведенной выше текущей реализации поток создается из списка, и список содержит все записи в файле, загруженном в память.   -  person arjunswaj    schedule 31.12.2017
comment
Как вы думаете, чем Stream лучше, чем Iterable, учитывая тот факт, что scala.collection.immutable.Stream реализует scala.collection.Iterable через своих родителей AbstractSeq и AbstractIterable? Также обратите внимание, что map определяется поверх TraversableLike, который является еще более базовым родителем Iterable.   -  person SergGr    schedule 31.12.2017
comment
Stream обеспечивает ленивую реализацию, а язык/sdk обрабатывает итерацию. Я хотел избежать while(parser.hasNext) process(parser.next) и придерживаться конструкций FP, таких как map/flatmap. Я вижу, что вы использовали parser.asScala, это должно сработать.   -  person arjunswaj    schedule 31.12.2017
comment
Raw Iterable на самом деле такой же ленивый, как Stream. AFAIU, единственное реальное преимущество Stream заключается в том, что, в отличие от Iterable, он может быть построен с самоссылкой, например, в типичном примере последовательности Фибоначчи, и при этом быть ленивым.   -  person SergGr    schedule 31.12.2017


Ответы (2)


Глядя на код в вашей сути, я думаю, что строка с комментарием - это именно та строка, которую вы вообще не хотите:

  object CSVIOInterpreter extends (CSVIO ~> Future) {
    import scala.collection.JavaConverters._
    override def apply[A](fa: CSVIO[A]): Future[A] = fa match {
      case ReadCSV(reader) => Future.fromTry(Try {
        CSVFormat.RFC4180
          .withFirstRecordAsHeader()
          .parse(reader)
          .getRecords // Loads the complete file
          .iterator().asScala.toStream
      })
    }
  }

Просто удалите всю строку getRecords. CSVFormat.parse возвращает экземпляр CSVParser, который уже реализует Iterable<CSVRecord>. И вызов getRecords — единственное, что заставляет его читать весь файл.

На самом деле вы можете увидеть CSVParser.getRecords, и это

 public List<CSVRecord> getRecords() throws IOException {
     CSVRecord rec;
     final List<CSVRecord> records = new ArrayList<>();
     while ((rec = this.nextRecord()) != null) {
         records.add(rec);
     }
     return records;
 }

Таким образом, он просто материализует весь файл, используя вызов this.nextRecord, который, очевидно, является более «основной» частью API.

Поэтому, когда я делаю упрощенную версию вашего кода без вызова getRecords:

import cats._
import cats.free.Free
import java.io._
import org.apache.commons.csv._
import scala.collection.JavaConverters._

trait Action[A] {
  def run(): A
}

object F {

  import Free.liftF

  case class GetReader(fileName: String) extends Action[Reader] {
    override def run(): Reader = new FileReader(fileName)
  }

  case class CloseReader(reader: Reader) extends Action[Unit] {
    override def run(): Unit = reader.close()
  }

  def getReader(fileName: String): Free[Action, Reader] = liftF(GetReader(fileName))

  def close(reader: Reader): Free[Action, Unit] = liftF(CloseReader(reader))
}

object C {

  import Free.liftF

  case class ReadCSV(reader: Reader) extends Action[CSVParser] {
    override def run(): CSVParser = CSVFormat.DEFAULT.parse(reader)
  }

  def readCSV(reader: Reader): Free[Action, CSVParser] = liftF(ReadCSV(reader))
}

object I {

  import Free.liftF

  case class ProcessCSV(parser: CSVParser) extends Action[Unit] {
    override def run(): Unit = {
      for (r <- parser.asScala)
        println(r)
    }
  }

  def processCSV(parser: CSVParser): Free[Action, Unit] = liftF(ProcessCSV(parser))

}

object Runner {

  import cats.arrow.FunctionK
  import cats.{Id, ~>}

  val runner = new (Action ~> Id) {
    def apply[A](fa: Action[A]): Id[A] = fa.run()
  }

  def run[A](free: Free[Action, A]): A = {
    free.foldMap(runner)
  }
}


def test() = {
  val free = for {
    //        reader <- F.getReader("my_file.csv")
    reader <- F.getReader("AssetsImportCompleteSample.csv")
    csvRecords <- C.readCSV(reader)
    _ <- I.processCSV(csvRecords)
    _ <- F.close(reader)
  } yield ()

  Runner.run(free)
}

вроде работает нормально в построчном режиме.

person SergGr    schedule 31.12.2017
comment
Хорошая реализация! - person Yuval Itzchakov; 31.12.2017

Вот как я использую файл CSV для чтения и выполнения некоторых операций с ним - я использую scala.io.Source.fromFile()

Я создаю один case class типа header файла CSV, чтобы сделать данные более доступными и оперативными.

PS: У меня нет знаний монад, так как я новичок в Scala. Я разместил это, так как это может быть полезно.

case class AirportData(id:Int, ident:String, name:String, typeAirport:String, latitude_deg:Double,
longitude_deg:Double, elevation_ft:Double, continent:String, iso_country:String, iso_region:String,
municipality:String)

object AirportData extends App {

def toDoubleOrNeg(s: String): Double = {
  try {
    s.toDouble
   } catch {
    case _: NumberFormatException => -1 
   }
 }

val source = scala.io.Source.fromFile("resources/airportData/airports.csv")
val lines = source.getLines().drop(1)
val data = lines.flatMap { line =>
val p = line.split(",")
  Seq(AirportData(p(0).toInt, p(1).toString, p(2).toString, p(3).toString, toDoubleOrNeg(p(4)), toDoubleOrNeg(p(5)), 
      toDoubleOrNeg(p(6)), p(7).toString, p(8).toString, p(9).toString, p(10).toString))
 }.toArray   
 source.close()
 println(data.length)
 data.take(10) foreach println
}
person royki    schedule 30.12.2017