У меня есть большой DynamicFrame в задании AWS Glue ETL. При попытке вывести эти данные на S3 происходит сбой, так как задача слишком велика.
Ошибка:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 3225 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
Я считаю, что хорошим решением для этого будет разделение моего DynamicFrame по дате, циклический просмотр данных с каждой даты и их вывод небольшими порциями. Возможно что-то вроде:
# Collect the distinct dates to the driver first: a DataFrame column is not
# iterable, and the distinct-date list is small enough to collect safely.
# NOTE(review): assumes `mapped_datasource0_general` is a Spark DataFrame
# (DynamicFrame has no .where) — convert with .toDF() first if it is not.
distinct_event_dates = [
    row["eventDate"]
    for row in mapped_datasource0_general.select("eventDate").distinct().collect()
]
for eventDateParam in distinct_event_dates:
    # .where() takes a Column expression, not keyword arguments:
    # `where(eventDate = x)` is a TypeError.
    partitioned_dataframe_general = mapped_datasource0_general.where(
        col("eventDate") == eventDateParam
    )
    # write_dynamic_frame requires a DynamicFrame, so wrap the filtered
    # DataFrame before handing it to the sink.
    partitioned_dynf_general = DynamicFrame.fromDF(
        partitioned_dataframe_general, glueContext, "partitioned_dynf_general"
    )
    dataoutput_general = glueContext.write_dynamic_frame.from_options(
        frame=partitioned_dynf_general,
        connection_type="s3",
        connection_options={
            "path": glue_relationalize_output_s3_path_general,
            "partitionKeys": ["eventDate"],
        },
        format="parquet",
        transformation_ctx="dataoutput_general",
    )
Я относительно новичок в AWS Glue и сталкиваюсь с множеством ошибок, пытаясь найти здесь обходной путь. Любые предложения приветствуются.
Заранее спасибо!
=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
Редактировать:
Более полная трассировка стека:
Traceback (most recent call last):
File "script_2018-06-19-22-36-11.py", line 63, in <module>
glueContext.write_dynamic_frame.from_options(frame = partitioned_mapped_personal_DF, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path_personal, "partitionKeys": ["eventDate"]}, format = "parquet", transformation_ctx = "dataoutput_personal")
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/PyGlue.zip/awsglue/dynamicframe.py", line 572, in from_options
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/PyGlue.zip/awsglue/context.py", line 191, in write_dynamic_frame_from_options
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/PyGlue.zip/awsglue/context.py", line 214, in write_from_options
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/PyGlue.zip/awsglue/data_sink.py", line 32, in write
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/PyGlue.zip/awsglue/data_sink.py", line 28, in writeFrame
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/mnt/yarn/usercache/root/appcache/application_1529446917701_0002/container_1529446917701_0002_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1198.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:123)
at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 3109 tasks (1024.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)
... 45 more
И соответствующие строки кода:
# A DataFrame is not iterable: `for x in listOfDistinctsPersonal:` raises
# TypeError. Collect the (small) distinct-date result to the driver and
# unpack each Row into its scalar value — otherwise the comparison below
# would be against a Row object, not the date string.
listOfDistinctsPersonal = [
    row["eventDate"]
    for row in mapped_personal.select("eventDate").distinct().collect()
]
# NOTE(review): the maxResultSize failure happened inside the write itself;
# if this loop still trips it, repartition the frame (or raise
# spark.driver.maxResultSize) instead of splitting by date — TODO confirm.
#LOOP WRITE PERSONAL
for eventDateParam in listOfDistinctsPersonal:
    partitioned_mapped_personal = mapped_personal.where(
        col("eventDate") == eventDateParam
    )
    # The Glue sink needs a DynamicFrame, so wrap the filtered DataFrame.
    partitioned_mapped_personal_DF = DynamicFrame.fromDF(
        partitioned_mapped_personal, glueContext, "partitioned_mapped_personal_DF"
    )
    glueContext.write_dynamic_frame.from_options(
        frame=partitioned_mapped_personal_DF,
        connection_type="s3",
        connection_options={
            "path": glue_relationalize_output_s3_path_personal,
            "partitionKeys": ["eventDate"],
        },
        format="parquet",
        transformation_ctx="dataoutput_personal",
    )