Сбой задания AWS Glue для больших входных CSV-данных на s3

Для небольших входных файлов s3 (~ 10 ГБ) задание склеивания ETL работает нормально, но для большего набора данных (~ 200 ГБ) задание не выполняется.

Добавление части кода ETL.

# Converting Dynamic frame to dataframe
df = dropnullfields3.toDF()

# create new partition column
partitioned_dataframe = df.withColumn('part_date', df['timestamp_utc'].cast('date'))

# store the data in parquet format on s3 
partitioned_dataframe.write.partitionBy(['part_date']).format("parquet").save(output_lg_partitioned_dir, mode="append")

Работа выполнена за 4 часа и выдала ошибку.

Конец LogType: stdout

Буду признателен, если вы предоставите какие-либо рекомендации по решению этой проблемы.

Вы можете установить настраиваемые параметры, такие как

# Converting Dynamic frame to dataframe
df = dropnullfields3.toDF()

# create new partition column
partitioned_dataframe = df.withColumn('part_date', df['timestamp_utc'].cast('date'))

# store the data in parquet format on s3 
partitioned_dataframe.write.partitionBy(['part_date']).format("parquet").save(output_lg_partitioned_dir, mode="append")
, только во время создания экземпляра контекста, а клей предоставляет вам контекст (из памяти вы не можете создать экземпляр нового контекста). Я не думаю, что вы сможете изменить значение этого свойства.


person Sumit Saurabh    schedule 24.11.2017    source источник
comment
Я тоже пробовал это решение, но возникла та же проблема. stackoverflow.com/a/31058669/3957916   -  person Sumit Saurabh    schedule 24.11.2017
comment
Этот issues.apache.org/jira/browse/SPARK-12837 может быть связанные с   -  person Sumit Saurabh    schedule 24.11.2017
comment
Файл "script_2017-11-23-15-07-32.py", строка 49, в partitioned_dataframe.write.partitionBy (['part_date']). Format ("parquet"). Save (output_lg_partitioned_dir, mode = "append" ) Файл "/mnt/yarn/usercache/root/appcache/application_1511449472652_0001/container_1511449472652_0001_02_000001/pyspark.zip/pyspark/sql/readwriter.py application", строка 550, в файле сохранения "/ mnt11 / user449 / app_cache / root_65 / mnt / /container_1511449472652_0001_02_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py ", строка 1133, в файле call" / mnt / yarn / usercache / root / appcache / application_1511449472652_0001 / container_1511park_151100_0001 / container_1511p2 .zip / pyspark / sql / utils.py ", строка 63, в файле deco" /mnt/yarn/usercache/root/appcache/application_1511449472652_0001/container_1511449472652_0001_02_000001/py4j-0.10.4-srjc.zip/py4j-0.10.4-srjc.zip/py , строка 319, в get_return_value py4j.protocol.Py4JJavaError: произошла ошибка при вызове o172.save. : org.apache.spark.SparkException: задание прервано. в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply $ mcV $ sp (FileFormatWriter.scala: 147) в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply (FileFormatWriter.scala: 121) в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply (FileFormatWriter.scala: 121) в org.apache.spark.sql. execute.SQLExecution $ .withNewExecutionId (SQLExecution.scala: 57) в org.apache.spark.sql.execution.datasources.FileFormatWriter $ .write (FileFormatWriter.scala: 121) в org.apache.spark.sql.exe. InsertIntoHadoopFsRelationCommand.run (InsertIntoHadoopFsRelationCommand.scala: 101) в org.apache.spark.sql.execution.command.ExehibitedCommandExec.sideEffectResult $ lzycompute (commands.Executed.scala: 58) в org.machel. sideEffectResult (commands.scala: 56) в org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute (commands.scala: 74) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ выполнить $ 1.apply (SparkPlan.scala: 114) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ выполнить $ 1.apply (SparkPlan. scala: 114) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ executeQuery $ 1.apply (SparkPlan.scala: 135) в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151 ) в org.apache.spark.sql.execution.SparkPlan.executeQuery (SparkPlan.scala: 132) в org.apache.spark.sql.execution. SparkPlan.execute (SparkPlan.scala: 113) в org.apache.spark.sql.execution.QueryExecution.toRdd $ lzycompute (QueryExecution.scala: 87) в org.apache.spark.sql.execution.QueryExecution.toExecution (. scala: 87) в org.apache.spark.sql.execution.datasources.DataSource.write (DataSource.scala: 492) в org.apache.spark.sql.DataFrameWriter.save (DataFrameWriter.scala: 215) в org.apache .spark.sql.DataFrameWriter.save (DataFrameWriter.scala: 198) в sun.reflect.NativeMethodAccessorImpl.invoke0 (собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.invoke) в Sun. (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:498) в py4j.reflection.MethodInvoker.invoke (MethodInvoker.java:244) в py4j.reflection.Reflection. java: 357) на py4j.Gateway.invoke (Gateway.java:280) на py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.j ava: 132) в py4j.commands.CallCommand.execute (CallCommand.java:79) в py4j.GatewayConnection.run (GatewayConnection.java:214) в java.lang.Thread.run (Thread.java:748) Вызвано: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: общий размер сериализованных результатов 3385 задач (1024,1 МБ) больше, чем spark.driver.maxResultSize (1024,0 МБ) в org.apache.spark.scheduler.DAGScheduler. org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages (DAGScheduler.scala: 1435) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1423) в org.apark. .scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1422) в scala.collection.mutable.ResizableArray $ class.foreach (ResizableArray.scala: 59) в scala.collection.mutable.ArrayBuffer.foreach (ArrayBuffer.foreach (ArrayBuffer. .scala: 48) в org.apache.spark.scheduler.DAGScheduler.abortStage (DAGScheduler.scala: 1422) в org.apache.spark.scheduler.DAGScheduler $$ ano nfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala: 802) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala: 802) в scala.Option.foreach (Option.scala: 257 ) в org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed (DAGScheduler.scala: 802) в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive (DAGScheduler.scala: 1650) в org.Scheduler. onReceive (DAGScheduler.scala: 1605) в org.apache.spark.scheduler. DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 1594) в org.apache.spark.util.EventLoop $$ anon $ 1.run (EventLoop.scala: 48) в org.apache.spark.scheduler.DAGScheduler.runJob (DAGScheduler. 628) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 1918) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 1931) в org.apache.spark.SparkContext.runJob (SparkContext.scala : 1951) at org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply $ mcV $ sp (FileFormatWriter.scala: 127) ... еще 30   -  person Anuruddha    schedule 10.01.2018


Ответы (1)


Обычно вы получаете эту ошибку, если собираете для драйвера результаты, превышающие указанный размер. В данном случае вы этого не делаете, поэтому ошибка сбивает с толку.

Похоже, вы создаете 3385 задач, которые предположительно связаны с датами во входном файле (3385 дат, ~ 9 лет?). Вы можете попробовать записывать этот файл партиями, например

Я не проверял этот код; вам, по крайней мере, нужно импортировать pyspark.sql.functions.year, чтобы он заработал.

partitioned_dataframe = df.withColumn('part_date', df['timestamp_utc'].cast('date'))
for year in range(2000,2018):
    partitioned_dataframe = partitioned_dateframe.where(year(part_date) = year)
    partitioned_dataframe.write.partitionBy(['part_date'])
        .format("parquet")
        .save(output_lg_partitioned_dir, mode="append")

Когда я выполнил обработку данных с помощью Glue, я просто обнаружил, что пакетная работа более эффективна, чем попытки добиться успешного завершения больших наборов данных. Система хороша, но трудна в отладке; стабильность на больших данных не дается легко.

Задание прервано из-за сбоя этапа: общий размер сериализованных результатов 3385 задач (1024,1 МБ) больше, чем spark.driver.maxResultSize (1024,0 МБ). Не знаю, как решить эту проблему. пожалуйста помоги!

person Kirk Broadhurst    schedule 10.01.2018