Ошибка в DynamicDestinations: Apache Beam

Я получаю сообщение об ошибке при выполнении кода ниже:

tableRows2.apply(BigQueryIO.writeTableRows()
          .to(new DynamicDestinations<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;
            @Override
                public TableDestination getTable(TableRow dest) {
                      List<TableRow> list = sideInput(bqDataView);
                      String table = list.get(0).get("table").toString();
                      String tableSpec = "ProjectId:DatasetId."+table;
                      String tableDescription = "";
                      return new TableDestination(tableSpec, tableDescription);
                }

                public String getSideInputs(PCollectionView<List<TableRow>> bqDataView) {
                    String str = bqDataView.toString();
                    return str;
                }

                @Override
                public TableSchema getSchema(TableRow destination) {
                    List<TableRow> list = sideInput(bqDataView);
                    String[] schemas = list.get(0).get("schema").toString().split(",");
                    List<TableFieldSchema> fields = new ArrayList<>();
                    for(int i=0;i<schemas.length;i++)
                    {
                        fields.add(new TableFieldSchema().setName(schemas[i].split(":")[0]).setType(schemas[i].split(":")[1]));
                    }
                    TableSchema schema = new TableSchema().setFields(fields);
                    return schema;
                }

                @Override
                public TableRow getDestination(ValueInSingleWindow<TableRow> element) {
                    return null;
                }
              }.getSideInputs(bqDataView)).withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

Я получаю следующее сообщение об ошибке:

  (7dc1af5b557d4d6b): java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format: SimplePCollectionView{tag=Tag<org.apache.beam.sdk.values.PCollectionViews$SimplePCollectionView.<init>:403#5f2ef1f005ae0b4>}
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.parseTableSpec(BigQueryHelpers.java:102)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$TableSpecToTableRef.apply(BigQueryHelpers.java:286)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$TableSpecToTableRef.apply(BigQueryHelpers.java:282)
    at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
    at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
    at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
    at org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantTableDestinations.getDestination(DynamicDestinationsHelpers.java:64)
    at org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantTableDestinations.getDestination(DynamicDestinationsHelpers.java:41)
    at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:58)

Это правильный способ реализовать DynamicDestinations в Apache Beam? Кроме того, всегда ли необходимо возвращать String в методе getSideInputs ()? Что нужно написать в методе getDestination ()?

Спасибо.


person rish0097    schedule 24.07.2017    source источник


Ответы (1)


Я думаю, что вы неправильно передаете побочные данные DynamicDestinations. В частности, поскольку вы вызываете getSideInputs в вызове to(...), вы используете результат getSideInputs - по сути, следующий:

tableRows2.apply(BigQueryIO.writeTableRows()
      .to(bqDataView.toString())

Глядя на интерфейс для DynamicDestinations и тест на BigQueryIO, кажется, что вам следует сделать что-то вроде следующего:

final PCollectionView<List<TableRow>> bqDataView = /* ... */;
tableRows2.apply(BigQueryIO.writeTableRows()
    .to(new DynamicDestinations<TableRow, TableRow>() {

        // ...

        // Note this method needs to be overridden and use
        // the same signature as in DynamicDestinations. Also,
        // it should not be invoked as part of the apply.
        @Override
        public List<PCollectionView<?>> getSideInputs() {
          return ImmutableList.of(bqDataView);
        }

        // ...
      }).withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

Кроме того, вместо использования DynamicDestinations<TableRow, TableRow>, второй тип должен быть чем-то, что вы можете сгруппировать, чтобы идентифицировать таблицу, в которую выполняется запись. В вашем случае кажется, что все идет к одному и тому же фактическому месту назначения, поэтому вы можете просто использовать String с фиксированным значением:

new DynamicDestinations<TableRow, String>() {
  @Override
  public TableDestination getTable(String unusedDest) {
    // ...
  }

  @Override
  public TableSchema getSchema(String unusedDest) {
    ...
  }

  @Override
  public String getDestination(ValueInSingleWindow<TableRow> element) {
    return "destination";
  }
}
person Ben Chambers    schedule 24.07.2017