Получено сообщение больше чем max в конвейере пакетной обработки

Я получаю это сообщение в конвейере пакетной обработки, который ежедневно работает в облачной службе потока данных Google. Он начал терпеть неудачу со следующим сообщением:

(88b342a0e3852af3): java.io.IOException: INVALID_ARGUMENT: Received message larger than max (21824326 vs. 4194304) 
dataflow-batch-jetty-11171129-7ea5-harness-waia talking to localhost:12346 at
com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.close(Native Method) at 
com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.close(ChunkingShuffleEntryWriter.java:67) at 
com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.close(ShuffleSink.java:286) at 
com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.finish(WriteOperation.java:100) at 
com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:264) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:197) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:149) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at 
java.lang.Thread.run(Thread.java:745)

Я все еще использую старый обходной путь для вывода файла CSV с такими заголовками, как

PCollection<String> output = data.apply(ParDo.of(new DoFn<String, String>() {
    String new_line = System.getProperty("line.separator");
    String csv_header = "id, stuff_1, stuff_2" + new_line;
    StringBuilder csv_body = new StringBuilder().append(csv_header);

    @Override
    public void processElement(ProcessContext c) {
        csv_body.append(c.element()).append(newline);
    }

    @Override
    public void finishBundle(Context c) throws Exception {
        c.output(csv_body.toString());
    }

})).apply(TextIO.Write.named("WriteData").to(options.getOutput()));

Чем это вызвано? Не слишком ли велик выход этого DoFn? Размер обрабатываемого набора данных не увеличился.


person Caio Iglesias    schedule 17.11.2016    source источник


Ответы (1)


Похоже, это может быть ошибка с нашей стороны, и мы изучаем ее, но в целом код, вероятно, не выполняет то, что вы намереваетесь делать.

Как написано, вы получите неопределенное количество выходных файлов, имена которых начинаются с заданного префикса, каждый файл содержит конкатенацию ожидаемого CSV-подобного вывода (включая заголовки) для разных фрагментов данных в неопределенном порядок.

Чтобы правильно реализовать запись в файлы CSV, просто используйте TextIO.Write.withHeader() для указания заголовка и полностью удалите ParDo, создающий CSV. Это также не вызовет ошибку.

person jkff    schedule 17.11.2016
comment
Он работает на Google Cloud Dataflow Java SDK 1.5.0. Мне также известно о новом TextIO.Write.withHeader(), но я еще не обновил SDK или код. Мне просто было интересно, почему он начал давать сбой сейчас. - person Caio Iglesias; 18.11.2016
comment
Он успешно работал с SDK версий 1.7.0 и 1.8.0. Теперь, когда я смотрю на код, который уже содержит 1.7.0 в файле pom.xml. И я не уверен, будет ли он работать на 1.5.0, поскольку я уже обновил код для запуска с использованием нового API хранилища данных v1. - person Caio Iglesias; 18.11.2016
comment
О том, почему он начал давать сбои сейчас: мы постепенно улучшаем производительность в случайном порядке, и ваш конвейер обнаружил ошибку в улучшенной версии. Мы работаем над исправлением. Итак, я запутался: полностью ли решена ваша проблема с помощью SDK 1.7.0 / 1.8.0 и TextIO.Write.withHeader ()? - person jkff; 18.11.2016
comment
Решилось переключением только на 1.7.0 / 1.8.0. Я еще не обновил код для использования withHeader. - person Caio Iglesias; 18.11.2016
comment
Скорее всего, она на самом деле не решена, просто ваш конвейер не был выбран случайным образом для использования ошибочной оптимизации на этот раз. - person jkff; 18.11.2016
comment
хммм ... запланированный запуск этого задания из appengine flex длился час вместо обычных 5 минут, и мне пришлось его убить. идентификатор вакансии 2016-11-18_11_29_14-9043658950083337798. - person Caio Iglesias; 18.11.2016
comment
Я изучаю предыдущие запуски этой работы, и похоже, что версия SDK постоянно менялась. 1 ноября ir последний раз запускал SDK 1.7, затем он перешел на 1.6 со 2 по 15 ноября. Затем на следующий день он упал до 1,5 и не смог передать полученное сообщение. - person Caio Iglesias; 18.11.2016
comment
Я посмотрел на застрявшую работу. Похоже, у вас есть несколько версий SDK одновременно в вашем пути к классам (1.5.0, 1.5.1, 1.6.0, 1.7.0, 1.8.0), и это вызывает всевозможные проблемы, потому что порядок пути к классам недетерминированный. Убедитесь, что у вас есть только 1 версия за раз. - person jkff; 22.11.2016
comment
Хмм .. Так вот что заставило его работать на другой версии. Я это проверю. - person Caio Iglesias; 22.11.2016
comment
Вы были правы, и моя проблема была решена. Часть этого была связана с maven (pom.xml). - person Caio Iglesias; 22.11.2016