Spark не позволяет мне подсчитывать присоединенные кадры данных

Новичок в Spark Jobs, и у меня есть следующая проблема.

Когда я запускаю подсчет любого из недавно присоединенных фреймов данных, задание выполняется целую вечность и выбрасывает память на диск. Нет ли здесь логической ошибки?

    // pass spark configuration
    val conf = new SparkConf()
      .setMaster(threadMaster)
      .setAppName(appName)

    // Create a new spark context
    val sc = new SparkContext(conf)

    // Specify a SQL context and pass in the spark context we created
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)


    // Create three dataframes for sent and clicked files. Mark them as raw, since they will be renamed
    val dfSentRaw = sqlContext.read.parquet(inputPathSent)
    val dfClickedRaw = sqlContext.read.parquet(inputPathClicked)
    val dfFailedRaw  = sqlContext.read.parquet(inputPathFailed)



    // Rename the columns to avoid ambiguity when accessing the fields later
    val dfSent = dfSentRaw.withColumnRenamed("customer_id", "sent__customer_id")
      .withColumnRenamed("campaign_id", "sent__campaign_id")
      .withColumnRenamed("ced_email", "sent__ced_email")
      .withColumnRenamed("event_captured_dt", "sent__event_captured_dt")
      .withColumnRenamed("riid", "sent__riid")


    val dfClicked = dfClickedRaw.withColumnRenamed("customer_id", "clicked__customer_id")
      .withColumnRenamed("event_captured_dt", "clicked__event_captured_dt")
    val dfFailed = dfFailedRaw.withColumnRenamed("customer_id", "failed__customer_id")


    // LEFT Join with CLICKED on two fields, customer_id and campaign_id
    val dfSentClicked = dfSent.join(dfClicked, dfSent("sent__customer_id") === dfClicked("clicked__customer_id")
      && dfSent("sent__campaign_id") === dfClicked("campaign_id"), "left")
     dfSentClicked.count() //THIS WILL NOT WORK

val dfJoined = dfSentClicked.join(dfFailed, dfSentClicked("sent__customer_id") === dfFailed("failed__customer_id")
      && dfSentClicked("sent__campaign_id") === dfFailed("campaign_id"), "left")

Почему эти два/три кадра данных больше не учитываются? Я испортил некоторую индексацию, переименовав?

Благодарю вас!

введите здесь описание изображения


person ZedBrannigan    schedule 16.10.2015    source источник
comment
Несколько вопросов: Какую ошибку вы видите? Вы пытались подсчитать кадры данных перед соединением? Пробовали ли вы объединить имена столбцов в разных фреймах данных и вызвать это соединение (join(right: DataFrame, usingColumns: Seq[String], joinType: String))?   -  person Rohan Aletty    schedule 16.10.2015
comment
1. Это не настоящая ошибка, я просто чувствую, что задание выполняется вечно и сбрасывается (UnsafeExternalSorter сливает память на диск) 2. Да, подсчет работает - предварительное соединение 3. Я не понимаю этого, что я сделал по-другому? Большое спасибо за Вашу помощь!   -  person ZedBrannigan    schedule 16.10.2015
comment
Каков размер данных? А какие у вас конфигурации памяти кластера?   -  person eliasah    schedule 16.10.2015
comment
Я запускаю локально в intelliJ с xmx2048m на 32-гигабайтной машине (должен был упомянуть об этом). Данные представляют собой подмножество того, что находится в кластере. Около 1 ГБ.   -  person ZedBrannigan    schedule 16.10.2015


Ответы (1)


Этот вызов count является единственной фактической материализацией вашего задания Spark здесь, так что на самом деле проблема не в count, а в тасовке, которая выполняется для join прямо перед ним. У вас недостаточно памяти, чтобы выполнить соединение без сброса на диск. Сброс на диск в случайном порядке — это очень простой способ заставить ваши задания Spark работать вечно =).

Одна вещь, которая действительно помогает предотвратить разбрызгивание при перетасовке, — это наличие большего количества разделов. Тогда меньше данных перемещается через перетасовку в любой момент времени. Вы можете установить spark.sql.shuffle.partitions, который управляет количеством разделов, используемых Spark Sql в соединении или агрегации. По умолчанию установлено значение 200, поэтому вы можете попробовать более высокое значение. http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options

Вы можете увеличить размер кучи вашего локального выделения Spark и/или увеличить долю памяти, используемую для перетасовки, увеличив spark.shuffle.memoryFraction (по умолчанию 0,4) и уменьшив spark.storage.memoryFraction (по умолчанию 0,6). Доля хранилища используется, например, когда вы делаете вызов .cache, и вам это может быть безразлично.

Если вы так склонны полностью избегать разлива, вы можете отключить разлив, установив spark.shuffle.spill на false. Я полагаю, что это вызовет исключение, если у вас закончится память и вам нужно разлить, а не молча ждать вечно, и это может помочь вам быстрее настроить распределение памяти.

person Rich    schedule 28.10.2015