Программное создание схемы BigQuery в конвейере Beam

У меня есть коллекция однородных диктов, как мне их записать в BigQuery, не зная схемы?

BigQuerySink требует, чтобы я указывал схему при ее создании. Но я не знаю схемы: она определяется ключами диктов, которые я пытаюсь написать.

Есть ли способ, чтобы мой конвейер вывел схему, а затем предоставил ее обратно (как дополнительный ввод?) в приемник?

Например:

# Create a PCollection of dicts, something like
# {'field1': 'myval', 'field2': 10}
data = (p | 'generate_data' >> beam.ParDo(CreateData())

# Infer the schema from the data
# Generates a string for each element (ok to assume all dict keys equal)
# "field1:STRING, field2:INTEGER"
schema = (data
  | 'infer_schema' >> beam.ParDo(InferSchema())
  | 'sample_one'   >> beam.combiners.Sample.FixedSizeGlobally(1))

Но тогда как передать схему в качестве параметра BigQuerySink и использовать ее в beam.io.Write?

Я знаю, что это неправильно, но я хочу сделать следующее:

sink = BigQuerySink(tablename, dataset, project, schema=Materialize(schema))
p | 'write_bigquery' >> beam.io.Write(sink)

tl; dr Есть ли способ создать и написать таблицу больших запросов из луча apache программным путем, выводя схему из данных?


person Greg    schedule 30.06.2017    source источник
comment
API BigQuery имеет функцию автоматического определения схемы. Если это не поддерживается в Beam, стоит подать жалобу на Beam SDK.   -  person Tim Swast    schedule 30.06.2017
comment
Документы для автоматического определения схемы находятся по адресу cloud.google.com/bigquery/docs/schema-detect< /а>   -  person Tim Swast    schedule 30.06.2017
comment
Спасибо, Тим. Unf, в настоящее время API отклоняет приемник без схемы, если таблица еще не существует. Я посмотрю, смогу ли я выяснить, где подать запрос на добавление функций в Beam.   -  person Greg    schedule 30.06.2017


Ответы (2)


Предполагая, что ваша схема может часто меняться, для вас может быть лучше хранить данные в более общей форме.

Например, ваша строка может состоять из одной повторяющейся записи (ваших словарных статей).

Схема записи выглядит так: key (STRING) | необязательный string_val (STRING) | необязательный int_val (INTEGER) необязательный double_val (DOUBLE) | необязательный boolean_val (BOOLEAN) | ...

Затем вы можете написать запросы, которые сканируют ваши записи по типу. Это несколько менее эффективно (поскольку вам придется сканировать строки, которые в противном случае вы могли бы пропустить, если бы они находились в разных столбцах), но полностью позволяет избежать указания вашей схемы заранее.

person Adam Lydick    schedule 06.07.2017

На данный момент лучшим решением, которое я придумал, является явное жесткое кодирование сопоставления ключей dict со схемой BigQuery. Два преимущества: он работает с проблемой обязательного указания схемы и позволяет мне отфильтровывать элементы из dict, которые мне не нужны в BigQuery.

SCHEMA = {
  'field1': 'INTEGER',
  'field2': 'STRING',
  ...
}
schema_str = ','.join(['%s:%s' % (k, v) for k,v in SCHEMA.iteritems()])

sink = BigQuerySink(tablename,
        dataset=dataset,
        project=project,
        schema=schema_str,
        write_disposition=BigQueryDisposition.WRITE_TRUNCATE)

(pipeline
  # filters just the keys of each dict to the keys of SCHEMA.
  | 'filter_fields' >> beam.ParDo(FilterFieldKeysDoFn(SCHEMA))
  | 'to_bigquery' >> beam.io.Write(sink))
person Greg    schedule 07.07.2017