Эффективное выполнение на фреймах данных PySpark/Delta

Используя озера pyspark/Delta на Databricks, у меня есть следующий сценарий:

sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)

analysis_1 = result.groupBy(...).count() # transformation performed here
analysis_2 = result.groupBy(...).count() # transformation performed here

Насколько я понимаю Spark с озерами Delta, из-за цепного выполнения result фактически вычисляется не при объявлении, а при его использовании.

Однако в этом примере оно используется несколько раз, поэтому наиболее затратное преобразование вычисляется несколько раз.

Можно ли принудительно выполнить выполнение в какой-то момент кода, например.

sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)
result.force() # transformation performed here??

analysis_1 = result.groupBy(...).count() # quick smaller transformation??
analysis_2 = result.groupBy(...).count() # quick smaller transformation??

person casparjespersen    schedule 01.11.2019    source источник
comment
Когда кэшировать DataFrame? и Что такое Lineage In Spark?   -  person pault    schedule 01.11.2019
comment
Название вопроса ошибочно.   -  person thebluephantom    schedule 01.11.2019


Ответы (1)


Вопрос, на мой взгляд, расплывчатый или неясный. Но если вы новичок в Spark, это может иметь место.

So:

Для использования .force см. https://blog.knoldus.com/getting-lazy-with-scala/ .force не будет работать с набором данных или фреймом данных.

Это как-то связано с подходом pyspark или Delta Lake? Нет нет.

analysis_1 = result.groupBy(...).count() # quick smaller transformation?? 
  • На самом деле это Действие с предшествующими Преобразованиями, что, скорее всего, приводит к перетасовке.

Итак, я думаю, вы имеете в виду, как утверждает наш уважаемый Поулт, следующее:

  • .кэш или .persist

Я подозреваю, что вам понадобится:

result.cache 

Это означает, что вашему 2nd Action analysis_2 не нужно будет пересчитывать весь путь обратно к источнику, показанному здесь.

(2) Spark Jobs
Job 16 View(Stages: 3/3)
Stage 43: 
8/8
succeeded / total tasks 
Stage 44: 
200/200
succeeded / total tasks 
Stage 45:   
1/1
succeeded / total tasks 
Job 17 View(Stages: 2/2, 1 skipped)
Stage 46: 
0/8
succeeded / total tasks skipped
Stage 47: 
200/200
succeeded / total tasks 
Stage 48:   
1/1
succeeded / total tasks 

Благодаря улучшениям, внесенным в Spark, разделы в случайном порядке сохраняются, что также приводит к пропуску этапов в некоторых случаях, особенно для RDD. Для кадров данных требуется кэширование, чтобы получить эффект пропущенных этапов, который я наблюдаю.

person thebluephantom    schedule 02.11.2019
comment
Я действительно новичок в Spark, поэтому я пытаюсь понять различные концепции. Это, однако, прогрессирует :-) Спасибо за объяснение. Как намекнул @pault, я искал именно концепцию cache. Спасибо вам всем. - person casparjespersen; 02.11.2019