Почему Dataflow не работает при доступе к хранилищу данных?

Мой простой конвейер потока данных в большинстве случаев успешно копирует несколько видов из хранилища данных одного проекта в другое. Но в некоторых видах (около 5% из них) мы всегда получаем эти ошибки.

Поток данных повторяется 4-8 раз с задержкой около 75 секунд, после чего конвейер дает сбой.

Как я могу диагностировать и решить это?

РЕДАКТИРОВАТЬ: ответ включает в себя: (1) в библиотеке хранилища данных, используемой Dataflow, была ошибка; после того, как они исправили эту ошибку, вы можете увидеть основную причину и (2) размер пакета по умолчанию для размещения сущностей в этой библиотеке — 500, что также является максимальным, и это превышает ограничение в 10 МБ API хранилища данных.

(Очень простой) Pipeline выглядит так:

Query.Builder qb = Query.newBuilder();
qb.addKindBuilder().setName(kindName);
Query query = qb.build();
Read dsRead = DatastoreIO.v1().read().withProjectId(inputProject).withQuery(query);
Write dsWrite = DatastoreIO.v1().write().withProjectId(outputProject);
PCollection<Entity> sourceEntities = pipeline.apply("read", dsRead);
Bound<Entity, Entity> entityFromSrcToTarget = ParDo.of(new EntityDoFn());/*Simple DoFn that copies Entities for insertion to target*/
PCollection<Entity> clonedEntities = sourceEntities.apply("clone-entity", entityFromSrcToTarget);
clonedEntities.apply("write-to-ds", dsWrite);

Первая трассировка стека

com.google.datastore.v1.client.DatastoreException: I/O error, code=UNAVAILABLE at
com.google.datastore.v1.client.RemoteRpc.makeException(RemoteRpc.java:126) at
com.google.datastore.v1.client.RemoteRpc.call(RemoteRpc.java:95) at
com.google.datastore.v1.client.Datastore.commit(Datastore.java:84) at
com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.flushBatch(DatastoreV1.java:925) at
com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.finishBundle(DatastoreV1.java:899) Caused by: java.io.IOException: insufficient data written at
sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.close(HttpURLConnection.java:3500) at
com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:81) at
com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981) at
com.google.datastore.v1.client.RemoteRpc.call(RemoteRpc.java:87) at
com.google.datastore.v1.client.Datastore.commit(Datastore.java:84) at
com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.flushBatch(DatastoreV1.java:925) at
com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.finishBundle(DatastoreV1.java:899) at
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.finishBundle(DoFnRunnerBase.java:158) at
com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:196) at
com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.finishBundle(ForwardingParDoFn.java:47) at
com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.finish(ParDoOperation.java:65) at
com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:80) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:287) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:223) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173) at
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at
java.lang.Thread.run(Thread.java:745)

Также

(9908b474b1492772): java.lang.RuntimeException: 
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: 
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: 
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: 
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: 
com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: 
com.google.cloud.dataflow.sdk.util.UserCodeException: 
com.google.datastore.v1.client.DatastoreException: I/O error, code=UNAVAILABLE at 
com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:162) at 
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) at 
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:283) at 
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext$1.outputWindowedValue(DoFnRunnerBase.java:507) at 
com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaIteratorsDoFn.processElement(GroupAlsoByWindowsViaIteratorsDoFn.java:125) at 
com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) at 
com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) at 
com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190) at 
com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) at 
com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55) at 
com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) at 
com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:202) at 
com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:143) at 
com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:72) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:287) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:223) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160) at
 java.util.concurrent.FutureTask.run(FutureTask.java:266) at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at
 java.lang.Thread.run(Thread.java:745)

comment
Это похоже либо на регулирование, либо на перегрузку на стороне хранилища данных. Не могли бы вы предоставить идентификатор работы?   -  person Sam McVeety    schedule 20.10.2016
comment
Судя по трассировке стека, основная причина — com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$DatastoreWriterFn.finishBundle(DatastoreV1.java:899) Caused by: java.io.IOException: insufficient data written. Я нашел похожую ошибку на github.com/GoogleCloudPlatform/google-cloud-java/. pull/1187, которая была помечена как исправленная путем добавления параметра повторной попытки. Вы используете последнюю версию API?   -  person Sai Pullabhotla    schedule 20.10.2016
comment
@SaiPullabhotla Я использую com.google.cloud.dataflow.google-cloud-dataflow-java-sdk-all версии 1.7.0 (от maven).   -  person Joshua Fox    schedule 20.10.2016
comment
@SaiPullabhotla, который был помечен как исправленный путем добавления опции повторной попытки. Как я могу это сделать? Но также обратите внимание, что Dataflow повторяет тип 4-8 раз с промежутком ~ 75 секунд между каждой повторной попыткой.   -  person Joshua Fox    schedule 20.10.2016
comment
@SamMcVeety Идентификатор вакансии 2016-10-20_06_37_36-1944046122903944603   -  person Joshua Fox    schedule 20.10.2016
comment
@SamMcVeety Dataflow — это простая резервная копия хранилища данных: чтение, затем запись в другое хранилище данных. Обратите внимание, что одни и те же 5 из 130 видов всегда терпят неудачу при повторном запуске этого, независимо от того, запускаю ли я каждый вид в отдельном конвейере или несколько видов вместе в одном конвейере. Это говорит о том, что проблема не в простой перегрузке или троттлинге. Подсказка: запуск против несуществующего вида дает аналогичную ошибку. Но эти типы заполнены. Также это не самые крупные виды. Может есть что-то проблемное в данных тех Видов? Я думаю, что нет, но я хотел бы увидеть лучшее сообщение об ошибке, если это так.   -  person Joshua Fox    schedule 20.10.2016
comment
Я связался с командой хранилища данных, чтобы отладить их.   -  person Sam McVeety    schedule 20.10.2016
comment
Похоже, это проблема с квотами, которая искажается сообщением об ошибке. Можете ли вы увеличить квоту для своего проекта, следуя инструкциям на странице cloud.google.com/datastore/ документы/цены?   -  person Sam McVeety    schedule 21.10.2016
comment
@SamMcVeety Кажется, это, вероятно, не проблема квоты. (О каком ресурсе вы думали?). Страницы квот в консоли показывают, что мы намного ниже квоты на все ресурсы как в исходном, так и в целевом проектах. Кроме того, эти пять видов постоянно показывают это исключение (я не знаю, в режиме чтения или записи), в то время как 120 других видов постоянно преуспевают, даже при повторных запусках. (Кроме того, мы увеличили максимальную сумму к оплате, но опять же, в любом случае мы намного ниже ее.)   -  person Joshua Fox    schedule 25.10.2016
comment
@SamMcVeety Идентификатор задания 2016-10-25_04_43_59-5193766273039635775 сегодня не удалось; это тот же вид, что и 2016-10-21_00_31_14-11121776639025713355 на прошлой неделе. В отличие от этого, 2016-10-25_04_44_26-17162760287352949364 удается получить НАМНОГО больше данных при повторных запусках.   -  person Joshua Fox    schedule 25.10.2016
comment
Извиняюсь за молчание, работаю над получением дополнительной информации по этому вопросу.   -  person Sam McVeety    schedule 28.10.2016
comment
@SamMcVeety Спасибо, с нетерпением жду.   -  person Joshua Fox    schedule 29.10.2016
comment
Подсказка: Рабочие дают эти строки журнала. Из code.google.com/p/google-cloud -sdk/issues/detail?id=738 Я вижу, что это не обязательно признак ошибки, но может быть подсказкой. Отказано в разделении GroupingShuffleReader ‹незапущен в диапазоне перемешивания [ShufflePosition(base64:AAAAAUc2ap9ADopGAP8AAQ), ShufflePositio n(base64:AAAAAYTO8b-ABxAA_wAB))› в ShufflePosition(base64:AAAAAUc2ap9ADopGAP8AAQ) Предлагается динамическое разделение рабочей единицы tradeos-test1;2016-10_44_0_0 -17884292562202311365;544242318678188737 в {position:{shufflePosition:AAAAAUc2ap9ADopGAP8AAQ}}   -  person Joshua Fox    schedule 30.10.2016
comment
Последнее из идентификатора вакансии 2016-10-30_05_54_43-17884292562202311365.   -  person Joshua Fox    schedule 30.10.2016
comment
Наряду с другими сообщениями об ошибках я получаю DEADLINE_EXCEEDED. Тем не менее, похоже, что основная причина заключается в записи хранилища данных. ... RuntimeException: java.io.IOException: DEADLINE_EXCEEDED: (zmq) Истекло время ожидания RPC, когда dataflow-bk-rfq-10300422-5cf4-harness-g6yd разговаривает с tcp://bk-rfq-10300422-5cf4-harness-56aa :12345. Сервер не отвечает (ошибка проверки связи: статус: DEADLINE_EXCEEDED). См. также cloud.google.com/dataflow/pipelines/ на com.google.cloud.dataflow.sdk.util.common.worker.BatchingShuffleEntry...   -  person Joshua Fox    schedule 30.10.2016
comment
Повторное использование Dataflow для ряда видов показывает, что это хуже для самых больших видов, в отличие от того, что я обнаружил ранее. Тем не менее, Dataflow предназначен именно для такой масштабируемости. My Pipeline просто выбирает с помощью Read, затем копирует объект, изменяя его Key в соответствии с целевым проектом Writes; и связывает их вместе   -  person Joshua Fox    schedule 31.10.2016
comment
Код основного конвейера см. в теле.   -  person Joshua Fox    schedule 31.10.2016
comment
@JoshuaFox недостаточно записанных данных фактически маскирует основное исключение, которое мы не можем видеть, и, таким образом, затрудняет отладку этой проблемы. (Проблема маскирования исправлена ​​в https://github.com/google/google-http-java-client/pull/333 — еще не объединены). Работа с командой хранилища данных, чтобы выяснить, можно ли это изменение объединить и включить в новую версию клиента хранилища данных. Завтра предоставлю обновление.   -  person Vikas Kedigehalli    schedule 02.11.2016
comment
Да, я предполагаю, что у хранилища данных есть какая-то проблема с емкостью. Будет хорошо знать, что.   -  person Joshua Fox    schedule 02.11.2016
comment
FWIW, простой запрос и вставка с использованием API gcloud-java-datastore 0.2.8 - без потока данных - был успешным с одним из типов, которые не удалось выполнить в конвейере потока данных.   -  person Joshua Fox    schedule 02.11.2016
comment
@JoshuaFox, это проект «cloud-eng-dev», верно? Можете ли вы также указать job_id для того, в котором запись в хранилище данных «cloud-eng-dev» прошла успешно? Это может помочь сравнить с теми, которые терпят неудачу.   -  person Vikas Kedigehalli    schedule 02.11.2016
comment
Нвм я их нашел. Похоже, одна из работ сработала просто отлично. Таким образом, для невыполненных заданий запросы никогда не доходили до службы хранилища данных. Похоже на проблему на стороне клиента. Я пытаюсь выпустить это, github.com/google/google-http-java-client/pull/333, и, надеюсь, мы сможем попробовать его, чтобы получить более четкое сообщение об ошибке. А пока не могли бы вы попробовать удалить «gcloud-java-datastore» из своей зависимости и воспроизвести проблему с «простым конвейером», который вы опубликовали? Я хочу убедиться, что это не странная проблема с зависимостями, с которой мы столкнулись.   -  person Vikas Kedigehalli    schedule 03.11.2016
comment
@VikasKedigehalli Я удалил gcloud-java-datastore из pom.xml и убедился, что его нет в действующем POM. Я получил в основном те же результаты, что и раньше. Большинство видов можно копировать (например, идентификатор задания 2016-11-03_07_09_30-2657413290769640959), по одному на конвейер или несколько вместе в конвейере (объединение PCollections с помощью и). Те же самые 5 видов, что и раньше, не удались при копировании каждого в отдельный конвейер (например, 2016-11-03_07_10_46-3370186458561566357). Однако, когда я запустил 5 типов вместе в объединенном конвейере, 4 удались и 1 не удалось (2016-11-03_06_54_04-7512693788272617823).   -  person Joshua Fox    schedule 03.11.2016
comment
@VikasKedigehalli Аналогичным образом, при работе без зависимости gcloud-java-datastore от другого проекта, в котором некоторые типы были успешными, а некоторые неудачными, по-прежнему появляется то же сообщение об ошибке. Идентификатор вакансии 2016-11-03_07_58_28-16985885300957305885   -  person Joshua Fox    schedule 03.11.2016
comment
@JoshuaFox Пытаюсь найти людей, чтобы объединить github.com/google/google-http-java-client/pull/333. Будет обновлено, как только это будет сделано, чтобы мы могли попытаться улучшить сообщения об ошибках при возникновении этой проблемы.   -  person Vikas Kedigehalli    schedule 07.11.2016
comment
@VikasKedigehalli Спасибо, это будет полезно. Я предполагаю, что как только мы увидим сообщение об ошибке, мы быстро найдем решение.   -  person Joshua Fox    schedule 07.11.2016
comment
@JoshuaFox: выпуск 3 разных библиотек, вероятно, займет больше времени. Что я собираюсь сделать, так это сделать пользовательские выпуски для этих библиотек и сделать выпуск Dataflow в моей ветке, который вы могли бы попробовать. Следите за обновлениями.   -  person Vikas Kedigehalli    schedule 10.11.2016
comment
@VikasKedighalli спасибо, будет полезно   -  person Joshua Fox    schedule 10.11.2016
comment
@JoshuaFox: https://github.com/vikkyrk/DataflowJavaSDK/tree/http_fix — моя ветвь потока данных это зависит от обновленных банок. Вот что вам нужно сделать: 1) Клонировать репозиторий/ветвь 2) ./jars/install.sh 3) mvn clean install -Dmaven.test.skip=true 4) Поместите простой код конвейера в каталог примеров 5) mvn compile -pl examples -Dexec.mainClass=‹полное имя вашего простого конвейера› -Dexec.args=‹ваши аргументы›   -  person Vikas Kedigehalli    schedule 11.11.2016
comment
Я следовал вашим инструкциям. Мне пришлось настроить исходный/целевой код на Java 1.8, поскольку мой код использует функции Java8, а также добавить зависимости в pom, но я старался избегать добавления каких-либо зависимостей, которые конфликтуют с вашими. Вы уверены, что ваши инструкции заставляют класс работать? Маван успешно выполнил последнюю команду, которую вы дали, но, по-видимому, ничего не было запущено. См. вывод здесь pastebin.com/Qsh0gzJX.   -  person Joshua Fox    schedule 11.11.2016
comment
@VikasKedigehalli Или, в качестве альтернативы, дайте мне знать, как использовать вашу библиотеку в моем собственном проекте. Возможно, я могу удалить определенную зависимость (какую?) из моего pom и поместить ваш скомпилированный код в свой путь к классам? Или добавить свой исходный код в мой исходный путь?   -  person Joshua Fox    schedule 11.11.2016
comment
@JoshuaFox была опечатка. Это должно быть mvn compile exec:java .... . exec:java говорит ему запуститься.   -  person Vikas Kedigehalli    schedule 11.11.2016
comment
Добавление его в ваш проект может быть более сложным. Причина, по которой я выбрал свою ветку, заключается в том, что у обоих есть доступ к ней для отладки.   -  person Vikas Kedigehalli    schedule 11.11.2016
comment
@VikasKedighalli Спасибо. Сбой такой же, как и ожидалось, но первая из двух трассировок стека в теле OP (запуск НЕДОСТУПЕН) предоставляет больше информации, чем раньше. См. pastebin.com/rDswCdF2. Ошибка касается Protobuf. В противном случае трассировка стека останется прежней после вашего изменения, Job 2016-11-12_09_47_29-9537593825735629484; по сравнению с тем, что было до до вашего изменения со всеми остальными аспектами, работа 2016-11-11_04_52_16-7323696975109393597.   -  person Joshua Fox    schedule 12.11.2016
comment
@JoshuaFox: Это очень полезная ошибка! Ошибка, по-видимому, связана с тем, что один HttpRequest слишком велик, и это объясняет, почему вы видите это для видов с большими сущностями. Один запрос, вероятно, превышает размер Http PostBuffer, настроенный в облаке. Я свяжусь с вами, чтобы узнать, как его настроить или можно ли это что-то обработать в SDK.   -  person Vikas Kedigehalli    schedule 13.11.2016
comment
Тем временем, если вы хотите подтвердить это, вы можете попробовать уменьшить размер пакета здесь https://github.com/vikkyrk/DataflowJavaSDK/blob/http_fix/sdk/src/main/java/com/google./cloud/dataflow/sdk/io/datastore/DatastoreV1.java#L189. Установите его на 50 или 100 и посмотрите, поможет ли это. (Убедитесь, что вы запустили mvn clean install после внесения изменений, чтобы они были приняты)   -  person Vikas Kedigehalli    schedule 13.11.2016
comment
@ВикасКедигехалли Да! Это сделало это! На самом деле я столкнулся со слишком большими ошибками при простой реализации без потока данных в сравнении с Google Cloud API. Когда размещение сущностей партиями по 500 не удалось, мой процесс повторил попытку размещения, экспоненциально уменьшая размер пакета до тех пор, пока размещение не увенчалось успехом. Если мы просто изменим DATASTORE_BATCH_UPDATE_LIMIT на 50, мы замедлим реализацию потока данных, верно? И нет никакой гарантии, что 50 достаточно мало. Каков наилучший способ решить эту проблему? (И мне также интересно: почему я единственный, кто сталкивается с этой слишком большой ошибкой?)   -  person Joshua Fox    schedule 14.11.2016
comment
@VikasKedigehalli Вы хотите добавить ответ, чтобы я мог его принять, или я могу написать этот ответ сам.   -  person Joshua Fox    schedule 16.11.2016
comment
Две части ответа: (1) необходимость исправления, доступного на Github, которое выявляет ошибку в журнале; (2) слишком большая партия путов, которую можно уменьшить, изменив статическое поле. URL-адрес для них: https://github.com/vikkyrk/DataflowJavaSDK/blob/http_fix/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/datastore/DatastoreV1.java#L189   -  person Joshua Fox    schedule 16.11.2016
comment
@JoshuaFox Зарегистрировал jira в рамках проекта Apache Beam (инкубирующий) issues.apache.org/jira/ просмотреть/BEAM-991. Не стесняйтесь отслеживать прогресс там. Я добавлю ответ, как только проблема будет решена.   -  person Vikas Kedigehalli    schedule 16.11.2016