Глядя на код в вашей сути, я думаю, что строка с комментарием - это именно та строка, которую вы вообще не хотите:
object CSVIOInterpreter extends (CSVIO ~> Future) {
import scala.collection.JavaConverters._
override def apply[A](fa: CSVIO[A]): Future[A] = fa match {
case ReadCSV(reader) => Future.fromTry(Try {
CSVFormat.RFC4180
.withFirstRecordAsHeader()
.parse(reader)
.getRecords // Loads the complete file
.iterator().asScala.toStream
})
}
}
Просто удалите всю строку getRecords
. CSVFormat.parse
возвращает экземпляр CSVParser, который уже реализует Iterable<CSVRecord>
. И вызов getRecords
— единственное, что заставляет его читать весь файл.
На самом деле вы можете увидеть CSVParser.getRecords, и это
public List<CSVRecord> getRecords() throws IOException {
CSVRecord rec;
final List<CSVRecord> records = new ArrayList<>();
while ((rec = this.nextRecord()) != null) {
records.add(rec);
}
return records;
}
Таким образом, он просто материализует весь файл, используя вызов this.nextRecord
, который, очевидно, является более «основной» частью API.
Поэтому, когда я делаю упрощенную версию вашего кода без вызова getRecords
:
import cats._
import cats.free.Free
import java.io._
import org.apache.commons.csv._
import scala.collection.JavaConverters._
trait Action[A] {
def run(): A
}
object F {
import Free.liftF
case class GetReader(fileName: String) extends Action[Reader] {
override def run(): Reader = new FileReader(fileName)
}
case class CloseReader(reader: Reader) extends Action[Unit] {
override def run(): Unit = reader.close()
}
def getReader(fileName: String): Free[Action, Reader] = liftF(GetReader(fileName))
def close(reader: Reader): Free[Action, Unit] = liftF(CloseReader(reader))
}
object C {
import Free.liftF
case class ReadCSV(reader: Reader) extends Action[CSVParser] {
override def run(): CSVParser = CSVFormat.DEFAULT.parse(reader)
}
def readCSV(reader: Reader): Free[Action, CSVParser] = liftF(ReadCSV(reader))
}
object I {
import Free.liftF
case class ProcessCSV(parser: CSVParser) extends Action[Unit] {
override def run(): Unit = {
for (r <- parser.asScala)
println(r)
}
}
def processCSV(parser: CSVParser): Free[Action, Unit] = liftF(ProcessCSV(parser))
}
object Runner {
import cats.arrow.FunctionK
import cats.{Id, ~>}
val runner = new (Action ~> Id) {
def apply[A](fa: Action[A]): Id[A] = fa.run()
}
def run[A](free: Free[Action, A]): A = {
free.foldMap(runner)
}
}
def test() = {
val free = for {
// reader <- F.getReader("my_file.csv")
reader <- F.getReader("AssetsImportCompleteSample.csv")
csvRecords <- C.readCSV(reader)
_ <- I.processCSV(csvRecords)
_ <- F.close(reader)
} yield ()
Runner.run(free)
}
вроде работает нормально в построчном режиме.
person
SergGr
schedule
31.12.2017
readCSV
иprocessCSV
?CSVParser
уже предоставляетIterable
/Iterator
API - person SergGr   schedule 31.12.2017hasNext
/next
Iterable/Iterator. Возможно, с помощью потоков. В приведенной выше текущей реализации поток создается из списка, и список содержит все записи в файле, загруженном в память. - person arjunswaj   schedule 31.12.2017Stream
лучше, чемIterable
, учитывая тот факт, чтоscala.collection.immutable.Stream
реализуетscala.collection.Iterable
через своих родителейAbstractSeq
иAbstractIterable
? Также обратите внимание, чтоmap
определяется поверхTraversableLike
, который является еще более базовым родителемIterable
. - person SergGr   schedule 31.12.2017Stream
обеспечивает ленивую реализацию, а язык/sdk обрабатывает итерацию. Я хотел избежатьwhile(parser.hasNext) process(parser.next)
и придерживаться конструкций FP, таких какmap
/flatmap
. Я вижу, что вы использовалиparser.asScala
, это должно сработать. - person arjunswaj   schedule 31.12.2017Iterable
на самом деле такой же ленивый, какStream
. AFAIU, единственное реальное преимуществоStream
заключается в том, что, в отличие отIterable
, он может быть построен с самоссылкой, например, в типичном примере последовательности Фибоначчи, и при этом быть ленивым. - person SergGr   schedule 31.12.2017