Цикл через большой DynamicFrame для вывода на S3, чтобы обойти ошибку maxResultSize

У меня есть большой DynamicFrame в задании AWS Glue ETL. При попытке вывести эти данные на S3 происходит сбой, так как задача слишком велика.

Ошибка:

Вызвано: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: общий размер сериализованных результатов 3225 задач (1024,0 МБ) больше, чем spark.driver.maxResultSize (1024,0 МБ)

Я считаю, что хорошим решением для этого будет разделение моего DynamicFrame по дате, циклический просмотр данных с каждой даты и их вывод небольшими порциями. Возможно что-то вроде:

for eventDateParam in mapped_datasource0_general.eventDate:
    partitioned_dataframe_general = mapped_datasource0_general.where(eventDate = eventDateParam)
    dataoutput_general = glueContext.write_dynamic_frame.from_options(frame = partitioned_dataframe_general, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path_general, "partitionKeys": ["eventDate"]}, format = "parquet", transformation_ctx = "dataoutput_general")

Я относительно новичок в AWS Glue и сталкиваюсь с множеством ошибок, пытаясь найти здесь обходной путь. Любые предложения приветствуются.

Ваше здоровье!

=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

Редактировать:

Более длинный стек трассировки:

Traceback (most recent call last):
File "script_2018-06-19-22-36-11.py", line 63, in <module>
glueContext.write_dynamic_frame.from_options(frame = partitioned_mapped_personal_DF, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path_personal, "partitionKeys": ["eventDate"]}, format = "parquet", transformation_ctx = "dataoutput_personal")
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/PyGlue.zip/awsglue/dynamicframe.py", line 572, in from_options
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/PyGlue.zip/awsglue/context.py", line 191, in write_dynamic_frame_from_options
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/PyGlue.zip/awsglue/context.py", line 214, in write_from_options
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/PyGlue.zip/awsglue/data_sink.py", line 32, in write
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/PyGlue.zip/awsglue/data_sink.py", line 28, in writeFrame
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1198.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:123)
at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 3109 tasks (1024.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)
... 45 more

И соответствующие строки кода:

listOfDistinctsPersonal = mapped_personal.select("eventDate").distinct()

#LOOP WRITE PERSONAL
for eventDateParam in listOfDistinctsPersonal:
    partitioned_mapped_personal = mapped_personal.where(col("eventDate") == eventDateParam)
    partitioned_mapped_personal_DF = DynamicFrame.fromDF(partitioned_mapped_personal, glueContext, "partitioned_mapped_personal_DF")
    glueContext.write_dynamic_frame.from_options(frame = partitioned_mapped_personal_DF, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path_personal, "partitionKeys": ["eventDate"]}, format = "parquet", transformation_ctx = "dataoutput_personal")

person Eric Keen    schedule 15.06.2018    source источник


Ответы (1)


Вы говорите, что хотите выводить данные на S3. Если вы не выполняете тяжелую агрегацию (groupBy или join), размер вашего _1 _ / _ 2_ не имеет значения - просто перемещение данных займет больше времени.

Полученная ошибка указывает на то, что вы пытаетесь передать свой DataFrame драйверу:

spark.driver.maxResultSize - это конфигурация, управляющая теперь большим количеством данных, которые могут передаваться обратно в драйвер от исполнителей. Обычно это происходит, когда вы звоните DataFrame.collect или DataFrame.collectAsList. Дополнительные сведения см. В документации Spark.

Вы сказали, что получаете различные ошибки - может быть, ваша работа вызывает ошибку где-то еще? Не могли бы вы поделиться всем своим клеевым кодом? и, возможно, также трассировка стека ошибки?

При проверке журналов Cloud Watch убедитесь, что вы изменили представление с Row на Text:  введите описание изображения здесь

person botchniaque    schedule 18.06.2018
comment
Различные ошибки, которые я получаю, связаны с моей попыткой исправить. Дополнительный образец из исходного журнала ошибок: Traceback (most recent call last): File "script_2018-06-14-15-45-26.py", line 74, in <module> dataoutput_personal = glueContext.write_dynamic_frame.from_options(frame = mapped_datasource0_personal, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path_personal, "partitionKeys": ["eventDate"]}, format = "parquet", transformation_ctx = "dataoutput_personal") - person Eric Keen; 18.06.2018
comment
Поможет ли мне здесь удаление dataoutput_personal = ? Это эквивалент .collect()? Учитывая, что это последнее действие скрипта, я фактически не использую DynamicFrame в дальнейшем. - person Eric Keen; 18.06.2018
comment
Я не думаю, что write_dynamic_frame вызовет collect внутренне. Вы не выполняете какие-либо преобразования в mapped_datasource0_personal фрейме данных? - person botchniaque; 18.06.2018
comment
mapped_datasource0_personal преобразуется из фрейма данных PySpark непосредственно перед строкой вывода в коде: mapped_datasource0_personal = DynamicFrame.fromDF(mapped_personal, glueContext, "mapped_datasource0_personal") Запись динамического фрейма, который я выводю на S3, в новый DynamicFrame может вызвать проблемы? И большое спасибо за помощь @botchniaque - person Eric Keen; 18.06.2018
comment
Стоит отметить, что задание отлично работает на меньшем выборочном наборе данных. Я зациклил свой вывод по параметру даты в данных, чтобы он был разбит на более мелкие фрагменты, и удалил dataoutput_personal = , поскольку он ничего не делает функционально. Теперь происходит сбой с новой ошибкой, на этот раз без отслеживания в журналах, но сообщение об ошибке в пользовательском интерфейсе AWS Glue: Command failed with exit code 1. К сожалению, изо всех сил пытаюсь найти что-нибудь полезное. - person Eric Keen; 19.06.2018
comment
@EricKeen, чтобы увидеть что-то полезное в журналах в CloudWatch, вам нужно изменить представление журнала с Row на Text, потому что одна строка журналов склейки всегда многострочная - я обновляю свои ответ со скриншотом. - person botchniaque; 19.06.2018
comment
Просматривая журналы конкретных ошибок, я не вижу никаких трассировок или чего-либо полезного. В общих журналах есть несколько экземпляров: 18/06/18 19:08:46 INFO S3NativeFileSystem: Encountered an exception while reading 'Events_10422/LIVE/EVENTS_10424_146270827.csv.gz', will retry by attempting to reopen stream. java.net.SocketTimeoutException: Read timed out. Я пробовал выполнять свою работу с подмножеством этих файлов, и она работает нормально. Других очевидных проблем нет, но журналы огромные! У меня включены закладки для вакансий, которые, как я читал, могут быть связаны с этим exit code 1. Все еще копаю ... - person Eric Keen; 19.06.2018
comment
Я отредактировал свой исходный вопрос, чтобы добавить более длинную трассировку. - person Eric Keen; 20.06.2018
comment
Я также добавил код, вызывающий эту ошибку, надеюсь, что это хоть как-то поможет. Это из-за попытки повторного запуска с отключенной закладкой, но возникла та же ошибка, что и раньше. - person Eric Keen; 20.06.2018
comment
Вы, ребята, знаете, как разбить строки на основе количества строк в DynamicFrame? У меня есть сценарий, в котором мне нужно разделить результат на 4 части в зависимости от общего количества строк. Я просмотрел документы и обнаружил, что могу разделить их, только применив определенное условие к полю. - person Sujai Sivasamy; 05.02.2019