Как в Spark преобразовать несколько фреймов данных в avro?

У меня есть задание Spark, которое обрабатывает некоторые данные в несколько отдельных фреймов данных. Я храню эти фреймы данных в списке, то есть фреймы данных []. В конце концов, я хотел бы объединить эти фреймы данных в иерархический формат и записать результат в avro. Схема avro выглядит примерно так:

{
    "name": "mydata",
    "type": "record",
    "fields": [
        {"name": "data", "type": {
            "type": "array", "items": {
                "name": "actualData", "type": "record", "fields": [
                    {"name": "metadata1", "type": "int"},
                    {"name": "metadata2", "type": "string"},
                    {"name": "dataframe", "type": {
                        "type": "array", "items": {
                            "name": "dataframeRecord", "type": "record", "fields": [
                                {"name": "field1", "type": "int"},
                                {"name": "field2", "type": "int"},
                                {"name": "field3", "type": ["string", "null"]}]
                            }
                        }
                    }]
                }
            }
        }
    ]
}

Как можно предположить, каждый фрейм данных имеет три поля: field1, field2 и field3, которые я хотел бы записать в виде массива в файле avro. Также с каждым фреймом данных связаны некоторые метаданные.

Мой текущий подход заключается в том, чтобы после обработки этих данных записать фреймы данных в S3, а затем использовать отдельную программу для извлечения этих данных из S3, использовать библиотеку avro для записи файла avro, а затем снова загрузить его в S3.

Однако по мере роста объема данных это становится очень медленно. Я заглянул в библиотеку databricks, чтобы напрямую писать файлы avro, но я не знаю, как я могу объединить фреймы данных вместе в памяти или как библиотека databricks может определить схему, которую я использую.

Есть ли идиоматический способ сделать это в Spark?

P.S. Я использую EMR со Spark 2.0.0 на Python.


person James    schedule 01.06.2017    source источник


Ответы (2)


Если схема такая же, и вы просто хотите поместить все записи в один и тот же DataFrame, вы можете использовать метод DataFrame unionAll.

http://spark.apache.org/docs/1.6.3/api/python/pyspark.sql.html#pyspark.sql.DataFrame.unionAll

Эта функция возьмет один фрейм данных и добавит его к другому. Уловка заключается в том, что предполагается, что столбцы расположены в одном и том же порядке между ними, поэтому вам может потребоваться некоторая работа, чтобы выровнять их и создать пустые столбцы для тех, которые отсутствуют. Вот функция Python, которую я использую для безопасного объединения нескольких фреймов данных.

def union_multiple_dataframes(iterable_list_df):
    input_dfs = list(iterable_list_df)

    # First figure out all the field names
    field_types = {}
    for df in input_dfs:
        for field in df.schema.fields:
            # Check for type mismatch
            if field in field_types:
                if field.dataType != field_types[field.name]:
                    raise ValueError("Mismatched data types when unioning dataframes for field: {}".format(field))
            else:
                field_types[field.name] = field.dataType

    # First add in empty fields so all df's have the same schema
    fields = set(field_types.keys())
    for i, df in enumerate(input_dfs):
        missing = fields - set(df.schema.names)
        for field in missing:
            df = df.withColumn(field, F.lit(None))

        input_dfs[i] = df

    # Finally put all the df's columns in the same order, and do the actual union
    sorted_dfs = [df.select(*sorted(fields)) for df in iterable_list_df]
    return reduce(lambda x, y: x.unionAll(y), sorted_dfs)

Пример использования будет примерно таким:

input_dfs = [do_something(..) for x in y]
combined_df = union_multiple_dataframes(input_dfs)
combined_df.write.format("com.databricks.spark.avro").save("s3://my-bucket/path")
person Ryan Widmaier    schedule 01.06.2017
comment
Спасибо, что нашли время ответить. Мои фреймы данных соответствуют одному и тому же формату, но мне нужно обернуть каждый фрейм данных некоторыми метаданными перед их объединением. В моем вопросе я указал схему, которую использую. В этой схеме данные, содержащиеся в моих фреймах данных, находятся только в области действия раздела dataframeRecord. Как я могу добавить поля метаданных в мои фреймы данных перед их объединением? - person James; 03.06.2017
comment
Одинаковы ли метаданные для каждой строки входного DF? Можете ли вы просто прикрепить метаданные к каждому DF перед объединением? - person Ryan Widmaier; 05.06.2017
comment
Они не то же самое, но с небольшим количеством кода я уверен, что смогу обработать данные перед объединением. Однако как я могу прикрепить метаданные к фреймворку данных? - person James; 06.06.2017
comment
Не могли бы вы подробнее рассказать, как вы генерируете свои метаданные? Если он основан на данных вашей строки, вы можете просто использовать withColumn во входных DF. В противном случае, возможно, вам подойдет соединение фрейма данных. Трудно узнать, не понимая, что вы пытаетесь сделать. - person Ryan Widmaier; 06.06.2017
comment
Я придумал решение, хотя и немного хакерское. Выложу ниже. - person James; 12.06.2017

Я нашел решение, специфичное для PySpark:

С каждым фреймом данных я использовал .collect (), чтобы получить список строк. Для каждого объекта Row я вызвал asDict (), чтобы получить словарь. Оттуда я смог составить список словарей с помощью простого цикла. Как только у меня есть этот список словарей, данные покидают Spark и попадают на территорию чистого Python, и их «легче» обрабатывать (но менее эффективно).

В качестве альтернативы, если бы я выбрал Scala вместо Python, я мог бы преобразовать фрейм данных в набор данных, который, кажется, предоставляет несколько методов для выполнения необходимых мне операций, но это совсем другая история.

person James    schedule 12.06.2017