Ошибка Spark 2.0.0: PartitioningCollection требует, чтобы все его разделы имели одинаковые numPartitions

Я объединяю несколько DataFrames в Spark и продолжаю получать следующую ошибку:

PartitioningCollection requires all of its partitionings have the same numPartitions.

Кажется, это происходит после того, как я объединяю два DataFrame вместе, каждый из которых кажется достаточно разумным по отдельности, но после присоединения к ним, если я попытаюсь получить строку из объединенного DataFrame, я получаю эту ошибку. На самом деле я просто пытаюсь понять, почему может возникнуть эта ошибка или каков ее смысл, поскольку я не могу найти по ней никакой документации.

Следующий вызов приводит к этому исключению:

val resultDataframe = dataFrame1
  .join(dataFrame2,     
    $"first_column" === $"second_column").take(2)

но я конечно могу позвонить

dataFrame1.take(2)

а также

dataFrame2.take(2)

Я также попытался переразбить DataFrames, используя Dataset.repartition(numPartitions) или Dataset.coalesce(numParitions) на dataFrame1 и dataFrame2 перед присоединением и на resultDataFrame после соединения, но, похоже, ничего не повлияло на ошибку. Мне не удалось найти упоминания о других людях, получающих ошибку после беглого поиска в Google ...


person Clemente Cuevas    schedule 29.09.2016    source источник


Ответы (4)


Я столкнулся с той же проблемой в последние несколько дней и был разочарован, когда не нашел ссылок в Интернете. До твоего!

Я бы добавил пару вещей: я получаю сообщение об ошибке после довольно сложного набора операций с фреймами данных (несколько соединений). Кроме того, эти операции включают фреймы данных, которые генерируются из одного и того же родительского фрейма данных. Я пытаюсь получить минимальный пример для его репликации, но извлечь его из моего конвейера нетривиально.

Я подозреваю, что у Spark могут возникнуть проблемы с вычислением правильного плана, когда DAG становится слишком сложным. К сожалению, похоже, что если это ошибка в Spark 2.0.0, то ночные сборки еще не исправили ее (пару дней назад я пробовал снимок состояния 2.0.2).

Кажется, практическое решение, которое устраняет проблему (временно), заключается в следующем: записать на диск (в какой-то момент) некоторые фреймы данных в конвейере и прочитать их снова. Это фактически вынуждает Spark иметь гораздо меньший, более управляемый план оптимизации, и что ж, он больше не дает сбоев. Конечно, это временное решение.

person L.T.    schedule 30.09.2016
comment
Спасибо за вашу демонстрацию консолидации и за то, что, надеюсь, может оказаться полезным, хотя и признанным временным решением. Я попробую это сделать, но я думаю, что есть некоторая вероятность того, что у нас может быть отчет об ошибке, если решение окажется вне досягаемости stackoverflow на некоторое время. - person Clemente Cuevas; 30.09.2016
comment
Также обратите внимание, что в версии 1.6.x тот же код (за исключением очень незначительных отличий) работает так, как задумано, без сбоев, поэтому мне это тоже кажется ошибкой. - person L.T.; 30.09.2016
comment
Однако ваше временное решение действительно решило проблему! Я пока не решаюсь отметить это как ответ, если только никто не ответит иначе, и мы решим перейти к искре JIRA, тогда тоже можно, но спасибо. - person Clemente Cuevas; 30.09.2016

У меня тоже была такая же проблема. Для меня это произошло после удаления некоторых столбцов из выбранной части соединения (а не самого предложения соединения).

Я смог исправить это, вызвав .repartition() в фреймворке данных.

person Nick Lothian    schedule 19.10.2016
comment
Спасибо. Это было лучшее исправление, чем когда-либо выше! - person StackPointer; 20.07.2017

Вы вызываете метод кеширования?

Эта проблема возникает у меня только тогда, когда я использую метод кеширования. Если я не вызываю этот метод, я могу использовать данные без каких-либо проблем.

person Luis A.G.    schedule 13.07.2017

Эта проблема связана с ReorderJoinPredicates исправлена ​​ в Spark 2.3.0.

person seaman29    schedule 19.06.2018