Как удалить искаженные строки при чтении csv с помощью схемы Spark?

Пока я использую Spark DataSet для загрузки CSV-файла. Я предпочитаю четкое обозначение схемы. Но я обнаружил, что есть несколько строк, не соответствующих моей схеме. Столбец должен быть двойным, но некоторые строки не являются числовыми значениями. Можно ли легко отфильтровать все строки, не соответствующие моей схеме, из DataSet?

val schema = StructType(StructField("col", DataTypes.DoubleType) :: Nil)
val ds = spark.read.format("csv").option("delimiter", "\t").schema(schema).load("f.csv")

ф.csv:

a
1.0

Я предпочитаю, чтобы «а» можно было легко отфильтровать из моего набора данных. Спасибо!


person HouZhe    schedule 09.04.2018    source источник


Ответы (2)


Если вы читаете файл CSV и хотите удалить строки, не соответствующие схеме. Вы можете сделать это, добавив опцию mode как DROPMALFORMED

Входные данные

a,1.0
b,2.2
c,xyz
d,4.5
e,asfsdfsdf
f,3.1

Схема

val schema = StructType(Seq(
  StructField("key", StringType, false),
  StructField("value", DoubleType, false)
))

Чтение файла csv с schema и option как

  val df = spark.read.schema(schema)
    .option("mode", "DROPMALFORMED")
    .csv("/path to csv file ")

Выход:

+-----+-----+
|key  |value|
+-----+-----+
|hello|1.0  |
|hi   |2.2  |
|how  |3.1  |
|you  |4.5  |
+-----+-----+

Подробнее о spark-csv можно узнать здесь.

Надеюсь это поможет!

person koiralo    schedule 09.04.2018
comment
Спасибо! Оба ваших ответа верны. Поэтому я установил первый ответ как принятый ответ. Но спасибо вам обоим за ваши ценные ответы! - person HouZhe; 10.04.2018
comment
Как восстановить поврежденные записи, если они нужны для аудита. Например, я хочу создать новый df с поврежденными записями. - person Abhi; 28.01.2019
comment
Точно, о искаженных записях следует сообщать исходной системе/поставщику, это супер основная функция, есть ли у нас такая функция в Spark? - person kensai; 29.04.2019
comment
Одно предостережение заключается в том, что вы молча удаляете данные из своих входных данных. - person Ricardo Mutti; 15.05.2020

.option("mode", "DROPMALFORMED") должен выполнить эту работу.

mode (по умолчанию PERMISSIVE): разрешает режим работы с поврежденными записями во время синтаксического анализа.

  • PERMISSIVE : устанавливает другие поля в null, когда встречается поврежденная запись, и помещает искаженную строку в новое поле, настроенное columnNameOfCorruptRecord. Когда схема устанавливается пользователем, она устанавливает null для дополнительных полей.

  • DROPMALFORMED : игнорирует все поврежденные записи.

  • FAILFAST : выдает исключение при обнаружении поврежденных записей.

person Amit Kulkarni    schedule 09.04.2018
comment
Разве это не тот же ответ, что и другой? - person koiralo; 09.04.2018
comment
Будет лучше, когда результат загрузчика CSV на самом деле будет двумя отдельными объектами фрейма данных, один из которых проанализирован, а второй поврежден. - person kensai; 29.04.2019