Неописательная ошибка Spark в DELTA MERGE

Я использую Spark 3.1 в Databricks (Databricks Runtime 8) с очень большим кластером (25 рабочих с 112 ГБ памяти и 16 ядер каждый) для репликации нескольких таблиц SAP в хранилище озера данных Azure (ADLS gen2). Для этого инструмент записывает дельты всех этих таблиц в промежуточную систему (SQL Server), а затем, если у меня есть новые данные для определенной таблицы, я выполняю задание Databricks, чтобы объединить новые данные с существующими доступными данными. в ADLS.

Этот процесс работает нормально для большинства таблиц, но для объединения некоторых из них (самых больших) требуется много времени (я объединяю данные, используя PK каждой таблицы), а самая большая из них начала давать сбой с недели. назад (когда была сгенерирована большая дельта таблицы). След ошибки, которую я вижу в работе:

Поскольку ошибка не описательна, я просмотрел журнал каждого исполнителя и увидел следующее сообщение:

21/04/07 09:11:24 ОШИБКА OneForOneBlockFetcher: сбой при загрузке блока запуска java.io.IOException: соединение с /XXX.XX.XX.XX:4048 закрыто

И в исполнителе, который, похоже, не может подключиться, я вижу следующее сообщение об ошибке:

21/04/06 09:30:46 ОШИБКА SparkThreadLocalCapturingRunnable: исключение в потоке. Задача reaper-7 org.apache.spark.SparkException: уничтожение JVM-исполнителя, поскольку убитая задача 5912 не может быть остановлена ​​в течение 60000 мс. в org.apache.spark.executor.Executor $ TaskReaper.run (Executor.scala: 1119) в org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable. $ anonfun $ run $ 1 (SparkThreadLocalForwardingThreadPoolExecutor attime. .java8.JFunction0 $ mcV $ sp.apply (JFunction0 $ mcV $ sp.java: 23) на org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured (SparkThreadLocalForwardingThreadPoolExe 68. or.scala: .threads.SparkThreadLocalCapturingHelper.runWithCaptured $ (SparkThreadLocalForwardingThreadPoolExecutor.scala: 54) при org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured (SparkThreadLocalForwardingThreadPoolExecutor.scala: 101) в org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run ( SparkThreadLocalForwardingThreadPoolExecutor.scala: 104) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor.lt $ Workor.javaexecutor.lt) в java.java: ang.Thread.run (

Я попытался увеличить параллелизм перемешивания по умолчанию (с 200 до 1200, как это предлагается здесь Приложение Spark убивает исполнителя) и кажется, что задание больше времени выполняется, но оно снова терпит неудачу.

Я попытался отслеживать SparkUI, пока задание выполняется:  введите описание изображения здесь

Но, как вы можете видеть, проблема та же: некоторые этапы не работают, потому что исполнитель недоступен, потому что задача терпит неудачу более X раз.

Большая дельта, о которой я упоминал выше, имеет более или менее 4-5 миллиардов строк, а большой дамп, который я хочу объединить, содержит более или менее 100 миллионов строк. Таблица (пока) не разбита на разделы, поэтому процесс очень трудоемкий. Что не удается, так это часть слияния, а не процесс копирования данных из SQL Server в ADLS, поэтому слияние выполняется, когда данные для слияния уже находятся в формате Parquet.

Есть идеи о том, что происходит или что я могу сделать, чтобы завершить это слияние?

Заранее спасибо.

Наконец, я просмотрел кластер и изменил свойство spark.sql.shuffle.partitions на 1600 в коде задания, которое я хотел выполнить с этой конфигурацией (вместо того, чтобы изменять это непосредственно в кластере ). В моем кластере 400 ядер, поэтому я выбрал несколько (1600) из этого числа.


person AngryCoder    schedule 09.04.2021    source источник
comment
Py4JJavaError: ошибка при вызове o233.sql. : org.apache.spark.SparkException: задание прервано. в org.apache.spark.sql.execution.datasources.FileFormatWriter $ .write (FileFormatWriter.scala: 234) в com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge. $ anonfun $ writeFiles $ 5 (TransactionalWriteEdge ) ... .. ............................................ .................................................. .................................................. .................................................. .................................................. .................................................. .................................................. .................... Вызвано: org.apache.spark.SparkException: исключение, созданное в awaitResult: at org.apache.spark.util.ThreadUtils $ .awaitResult ( ThreadUtils.scala: 428) по адресу com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.awaitShuffleMapStage $ 1 (DeltaOptimizedWriterExec.scala: 153) по адресу com.databricks.sql.transaction.tahoe.pertimized.DeltaOptimized.DeltaOptimized.DeltaOptimized.DeltaOptimized.Tahoe. : 158) в c om.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.computeBins (DeltaOptimizedWriterExec.scala: 106) в com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.doExectimized (или DeltaOptimizedWriterExec.doExectimized) (DeltaOptimizedWriterExec. spark.sql.execution.SparkPlan. $ anonfun $ выполнить $ 1 (SparkPlan.scala: 196) в org.apache.spark.sql.execution.SparkPlan. $ anonfun $ executeQuery $ 1 (SparkPlan.scala: 240) в org.apache. spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 165) в org.apache.spark.sql.execution.SparkPlan.executeQuery (SparkPlan.scala: 236) в org.apache.spark.sql.execution.SparkPlan.execute (SparkPlan.scala: 192) в org.apache.spark.sql.execution.datasources.FileFormatWriter $ .write (FileFormatWriter.scala: 180) ... еще 141 Причина: org.apache.spark.SparkException: задание прервано из-за до отказа этапа: ShuffleMapStage 68 (выполнить на DeltaOptimizedWriterExec.scala: 97) не прошел максимально допустимое количество раз: 4. Самая последняя причина отказа: org.apache.spark.shuffle.Fet chFailedException: соединение от /XXX.XX.XX.XX:4048 закрыто в org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException (ShuffleBlockFetcherIterator.scala: 769) в org.apache.spark.storageIterator.scala: : 684) в org.apache.spark.storage.ShuffleBlockFetcherIterator.next (ShuffleBlockFetcherIterator.scala: 69) в. .................................................. .................................................. .................................................. .................................................. .................................................. .................................................. ....................... ... java.lang.Thread.run (Thread.java:748) Вызвано: java.io.IOException: Connection из /XXX.XX.XX.XX:4048 закрыто в org.apache.spark.network.client.TransportResponseHandler.channelInactive (TransportResponseHandler.java:146) в org.apache.spark.network.server.TransportChannelHandler. java: 117) в io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive (AbstractChannelHandlerContext.java:262) в io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive (AbstractChannelHandlerContext.invokeChannelInactive (AbstractChannelHandlerContext.invokeChannelInactive (AbstractChannelHandlerContext.invokeChannelInactive (AbstractChannelChannelContext) . java. (AbstractChannelHandlerContext.java:262) в io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive (AbstractChannelHandlerContext.java:248) в io.netty.channel.AbstractChannelHandlerContext.FireChannelHandlerContext.fireChannel.http: //hannel.html. channelInactive (ChannelInboundHandlerAdapter.java:81) в org.apache.spark.network.util.TransportFrameDecoder.channelInactive (TransportFrameDecoder.java:225) в io.netty.channel.AbstractChannelHandlerContext.invokeChannel.net .channel.AbstractChannelHandlerContext.invokeChannelInactive (AbstractChannelHandlerContext.java:248) в io.ne tty.channel.AbstractChannelHandlerContext.fireChannelInactive (AbstractChannelHandlerContext.java:241) в io.netty.channel.DefaultChannelPipeline $ HeadContext.channelInactive (DefaultChannelPipeline.java:1405) при io.net на io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive (AbstractChannelHandlerContext.java:248) на io.netty.channel.DefaultChannelPipeline.fireChannelInactive (DefaultChannelPipeline.java:901. AbstractChannel $ AbstractUnsafe $ 8.run (AbstractChannel.java:818) в io.netty.util.concurrent.AbstractEventExecutor.safeExecute (AbstractEventExecutor.java:164) в io.netty.util.concurrent.SingleThreadEventEventAllExecut. ) в io.netty.channel.nio.NioEventLoop.run (NioEventLoop.java:497) в io.netty.util.concurrent.SingleThreadEventExecutor $ 4.run (SingleThreadEventExecutor.java:989) в io.netty.util.internal.Thread $ 2.run (ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run (FastThreadLocalRunnable.java:30) ... еще 1   -  person Alex Ott    schedule 09.04.2021


Ответы (1)


После этого казнь закончилась через два часа. Я пришел к такому выводу, потому что в моих журналах и пользовательском интерфейсе Spark я наблюдал много утечек на диске, поэтому я подумал, что разделы не подходят для рабочих узлов.

посмотрите здесь: stackoverflow.com/questions/66659817/

person AngryCoder    schedule 12.04.2021