Пункты назначения в Apache Beam

У меня есть PCollection [String], которая говорит «X», которую мне нужно сбросить в таблицу BigQuery. Назначение таблицы и схема для нее в коллекции PCollection [TableRow] говорят «Y». Как сделать это самым простым способом?

Я попытался извлечь таблицу и схему из «Y» и сохранить их в статических глобальных переменных (tableName и schema соответственно). Но как-то странно BigQueryIO.writeTableRows () всегда получает значение переменной tableName как null. Но он получает схему. Я пробовал регистрировать значения этих переменных и вижу, что значения есть для обоих.

Вот мой код конвейера:

static String tableName;
static TableSchema schema;

PCollection<String> read = p.apply("Read from input file",
  TextIO.read().from(options.getInputFile()));

PCollection<TableRow> tableRows = p.apply(
  BigQueryIO.read().fromQuery(NestedValueProvider.of(
    options.getfilename(),
    new SerializableFunction<String, String>() {
         @Override
         public String apply(String filename) {
           return "SELECT table,schema FROM `BigqueryTest.configuration` WHERE file='" + filename +"'";
         }
    })).usingStandardSql().withoutValidation());

final PCollectionView<List<String>> dataView = read.apply(View.asList());

tableRows.apply("Convert data read from file to TableRow",
  ParDo.of(new DoFn<TableRow,TableRow>(){
    @ProcessElement
    public void processElement(ProcessContext c) {
      tableName = c.element().get("table").toString();
      String[] schemas = c.element().get("schema").toString().split(",");
      List<TableFieldSchema> fields = new ArrayList<>();
      for(int i=0;i<schemas.length;i++) {
        fields.add(new TableFieldSchema()
          .setName(schemas[i].split(":")[0]).setType(schemas[i].split(":")[1]));
      }
      schema = new TableSchema().setFields(fields);

      //My code to convert data to TableRow format.
    }}).withSideInputs(dataView)); 


tableRows.apply("write to BigQuery", 
  BigQueryIO.writeTableRows()
    .withSchema(schema)
    .to("ProjectID:DatasetID."+tableName)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Все нормально работает. Только операция BigQueryIO.write не выполняется, и я получаю сообщение об ошибке TableId is null.

Я также пробовал использовать SerializableFunction и возвращать значение оттуда, но все равно получаю null.

Вот код, который я пробовал для этого:

tableRows.apply("write to BigQuery",
BigQueryIO.writeTableRows()
  .withSchema(schema)
  .to(new GetTable(tableName))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

public static class GetTable implements SerializableFunction<String,String> {
  String table;

  public GetTable() {
    this.table = tableName;
  }

  @Override
  public String apply(String arg0) {
    return "ProjectId:DatasetId."+table;
  }
}

Я также пробовал использовать DynamicDestinations, но получаю сообщение об отсутствии схемы. Честно говоря, я новичок в концепции DynamicDestinations и не уверен, что делаю это правильно.

Вот код, который я пробовал для этого:

tableRows2.apply(BigQueryIO.writeTableRows()
  .to(new DynamicDestinations<TableRow, TableRow>() {
    private static final long serialVersionUID = 1L;
    @Override
    public TableDestination getTable(TableRow dest) {
      List<TableRow> list = sideInput(bqDataView); //bqDataView contains table and schema
      String table = list.get(0).get("table").toString();
      String tableSpec = "ProjectId:DatasetId."+table;
      String tableDescription = "";
      return new TableDestination(tableSpec, tableDescription);
    }

    public String getSideInputs(PCollectionView<List<TableRow>> bqDataView) {
      return null;
    }

    @Override
    public TableSchema getSchema(TableRow destination) {
      return schema;   //schema is getting added from the global variable
    }
    @Override
    public TableRow getDestination(ValueInSingleWindow<TableRow> element) {
      return null;
    }
}.getSideInputs(bqDataView)));

Пожалуйста, дайте мне знать, что я делаю не так и какой путь мне следует выбрать.

Спасибо.


person rish0097    schedule 21.07.2017    source источник


Ответы (1)


Частично у вас проблемы из-за двух этапов выполнения конвейера. Сначала на вашей машине создается конвейер. Это когда происходят все применения PTransforms. В вашем первом примере это когда выполняются следующие строки:

BigQueryIO.writeTableRows()
  .withSchema(schema)
  .to("ProjectID:DatasetID."+tableName)

Однако код внутри ParDo запускается, когда выполняется ваш конвейер, и это происходит на многих машинах. Таким образом, следующий код выполняется намного позже, чем создание конвейера:

@ProcessElement
public void processElement(ProcessContext c) {
  tableName = c.element().get("table").toString();
  ...
  schema = new TableSchema().setFields(fields);
  ...
}

Это означает, что ни для полей tableName, ни для полей схемы не будет установлено значение при создании приемника BigQueryIO.

Ваша идея использовать DynamicDestinations верна, но вам нужно переместить код, чтобы фактически сгенерировать схему назначения в этот класс, а не полагаться на глобальные переменные, которые доступны не на всех машинах.

person Ben Chambers    schedule 21.07.2017
comment
хорошо ... но это то, что я сказал, что, как ни странно, программа может получить схему во время операции bigqueryIO.write, но не может получить имя таблицы ... - person rish0097; 22.07.2017
comment
Вы уверены, что схема была настроена правильно? Глядя на код, кажется, что схема будет иметь значение null, и на самом деле ничего не произойдет, пока конвейер не будет запущен? Как уже упоминалось, следует ожидать, что этот путь завершится ошибкой, поэтому, даже если он не удастся выполнить иначе, чем ожидалось, использование подхода DynamicDestinations - это путь вперед. - person Ben Chambers; 24.07.2017