У меня есть PCollection [String], которая говорит «X», которую мне нужно сбросить в таблицу BigQuery. Назначение таблицы и схема для нее в коллекции PCollection [TableRow] говорят «Y». Как сделать это самым простым способом?
Я попытался извлечь таблицу и схему из «Y» и сохранить их в статических глобальных переменных (tableName и schema соответственно). Но как-то странно BigQueryIO.writeTableRows () всегда получает значение переменной tableName как null. Но он получает схему. Я пробовал регистрировать значения этих переменных и вижу, что значения есть для обоих.
Вот мой код конвейера:
static String tableName;
static TableSchema schema;
PCollection<String> read = p.apply("Read from input file",
TextIO.read().from(options.getInputFile()));
PCollection<TableRow> tableRows = p.apply(
BigQueryIO.read().fromQuery(NestedValueProvider.of(
options.getfilename(),
new SerializableFunction<String, String>() {
@Override
public String apply(String filename) {
return "SELECT table,schema FROM `BigqueryTest.configuration` WHERE file='" + filename +"'";
}
})).usingStandardSql().withoutValidation());
final PCollectionView<List<String>> dataView = read.apply(View.asList());
tableRows.apply("Convert data read from file to TableRow",
ParDo.of(new DoFn<TableRow,TableRow>(){
@ProcessElement
public void processElement(ProcessContext c) {
tableName = c.element().get("table").toString();
String[] schemas = c.element().get("schema").toString().split(",");
List<TableFieldSchema> fields = new ArrayList<>();
for(int i=0;i<schemas.length;i++) {
fields.add(new TableFieldSchema()
.setName(schemas[i].split(":")[0]).setType(schemas[i].split(":")[1]));
}
schema = new TableSchema().setFields(fields);
//My code to convert data to TableRow format.
}}).withSideInputs(dataView));
tableRows.apply("write to BigQuery",
BigQueryIO.writeTableRows()
.withSchema(schema)
.to("ProjectID:DatasetID."+tableName)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Все нормально работает. Только операция BigQueryIO.write не выполняется, и я получаю сообщение об ошибке TableId is null.
Я также пробовал использовать SerializableFunction и возвращать значение оттуда, но все равно получаю null.
Вот код, который я пробовал для этого:
tableRows.apply("write to BigQuery",
BigQueryIO.writeTableRows()
.withSchema(schema)
.to(new GetTable(tableName))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
public static class GetTable implements SerializableFunction<String,String> {
String table;
public GetTable() {
this.table = tableName;
}
@Override
public String apply(String arg0) {
return "ProjectId:DatasetId."+table;
}
}
Я также пробовал использовать DynamicDestinations, но получаю сообщение об отсутствии схемы. Честно говоря, я новичок в концепции DynamicDestinations и не уверен, что делаю это правильно.
Вот код, который я пробовал для этого:
tableRows2.apply(BigQueryIO.writeTableRows()
.to(new DynamicDestinations<TableRow, TableRow>() {
private static final long serialVersionUID = 1L;
@Override
public TableDestination getTable(TableRow dest) {
List<TableRow> list = sideInput(bqDataView); //bqDataView contains table and schema
String table = list.get(0).get("table").toString();
String tableSpec = "ProjectId:DatasetId."+table;
String tableDescription = "";
return new TableDestination(tableSpec, tableDescription);
}
public String getSideInputs(PCollectionView<List<TableRow>> bqDataView) {
return null;
}
@Override
public TableSchema getSchema(TableRow destination) {
return schema; //schema is getting added from the global variable
}
@Override
public TableRow getDestination(ValueInSingleWindow<TableRow> element) {
return null;
}
}.getSideInputs(bqDataView)));
Пожалуйста, дайте мне знать, что я делаю не так и какой путь мне следует выбрать.
Спасибо.