Я использую Spark 3.1 в Databricks (Databricks Runtime 8) с очень большим кластером (25 рабочих с 112 ГБ памяти и 16 ядер каждый) для репликации нескольких таблиц SAP в хранилище озера данных Azure (ADLS gen2). Для этого инструмент записывает дельты всех этих таблиц в промежуточную систему (SQL Server), а затем, если у меня есть новые данные для определенной таблицы, я выполняю задание Databricks, чтобы объединить новые данные с существующими доступными данными. в ADLS.
Этот процесс работает нормально для большинства таблиц, но для объединения некоторых из них (самых больших) требуется много времени (я объединяю данные, используя PK каждой таблицы), а самая большая из них начала давать сбой с недели. назад (когда была сгенерирована большая дельта таблицы). След ошибки, которую я вижу в работе:
Поскольку ошибка не описательна, я просмотрел журнал каждого исполнителя и увидел следующее сообщение:
21/04/07 09:11:24 ОШИБКА OneForOneBlockFetcher: сбой при загрузке блока запуска java.io.IOException: соединение с /XXX.XX.XX.XX:4048 закрыто
И в исполнителе, который, похоже, не может подключиться, я вижу следующее сообщение об ошибке:
21/04/06 09:30:46 ОШИБКА SparkThreadLocalCapturingRunnable: исключение в потоке. Задача reaper-7 org.apache.spark.SparkException: уничтожение JVM-исполнителя, поскольку убитая задача 5912 не может быть остановлена в течение 60000 мс. в org.apache.spark.executor.Executor $ TaskReaper.run (Executor.scala: 1119) в org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable. $ anonfun $ run $ 1 (SparkThreadLocalForwardingThreadPoolExecutor attime. .java8.JFunction0 $ mcV $ sp.apply (JFunction0 $ mcV $ sp.java: 23) на org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured (SparkThreadLocalForwardingThreadPoolExe 68. or.scala: .threads.SparkThreadLocalCapturingHelper.runWithCaptured $ (SparkThreadLocalForwardingThreadPoolExecutor.scala: 54) при org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured (SparkThreadLocalForwardingThreadPoolExecutor.scala: 101) в org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run ( SparkThreadLocalForwardingThreadPoolExecutor.scala: 104) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor.lt $ Workor.javaexecutor.lt) в java.java: ang.Thread.run (
Я попытался увеличить параллелизм перемешивания по умолчанию (с 200 до 1200, как это предлагается здесь Приложение Spark убивает исполнителя) и кажется, что задание больше времени выполняется, но оно снова терпит неудачу.
Я попытался отслеживать SparkUI, пока задание выполняется:
Но, как вы можете видеть, проблема та же: некоторые этапы не работают, потому что исполнитель недоступен, потому что задача терпит неудачу более X раз.
Большая дельта, о которой я упоминал выше, имеет более или менее 4-5 миллиардов строк, а большой дамп, который я хочу объединить, содержит более или менее 100 миллионов строк. Таблица (пока) не разбита на разделы, поэтому процесс очень трудоемкий. Что не удается, так это часть слияния, а не процесс копирования данных из SQL Server в ADLS, поэтому слияние выполняется, когда данные для слияния уже находятся в формате Parquet.
Есть идеи о том, что происходит или что я могу сделать, чтобы завершить это слияние?
Заранее спасибо.
Наконец, я просмотрел кластер и изменил свойство spark.sql.shuffle.partitions на 1600 в коде задания, которое я хотел выполнить с этой конфигурацией (вместо того, чтобы изменять это непосредственно в кластере ). В моем кластере 400 ядер, поэтому я выбрал несколько (1600) из этого числа.