Функция Sparklyr spark_apply, похоже, работает на одном исполнителе и дает сбой на умеренно большом наборе данных.

Я пытаюсь использовать spark_apply для запуска функции R ниже в таблице Spark. Это отлично работает, если моя входная таблица небольшая (например, 5000 строк), но через ~ 30 минут выдает ошибку, когда таблица умеренно большая (например, 5 000 000 строк): sparklyr worker rscript failure, check worker logs for details

Глядя на пользовательский интерфейс Spark, видно, что всегда создается только одна задача, и к этой задаче применяется один исполнитель.

Кто-нибудь может дать совет, почему эта функция не работает для набора данных из 5 миллионов строк? Может ли проблема заключаться в том, что один исполнитель выполняет всю работу и не работает?

# Create data and copy to Spark
testdf <- data.frame(string_id=rep(letters[1:5], times=1000), # 5000 row table
                 string_categories=rep(c("", "1", "2 3", "4 5 6", "7"), times=1000))
testtbl <- sdf_copy_to(sc, testdf, overwrite=TRUE, repartition=100L, memory=TRUE)

# Write function to return dataframe with strings split out
myFunction <- function(inputdf){
  inputdf$string_categories <- as.character(inputdf$string_categories)
  inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories))
  stringCategoriesList <- strsplit(inputdf$string_categories, ' ')
  outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))),
                  string_categories=unlist(stringCategoriesList))
 return(outDF)
}

# Use spark_apply to run function in Spark
outtbl <- testtbl %>%
  spark_apply(myFunction,
          names=c('string_id', 'string_categories'))
outtbl

person jay    schedule 25.09.2017    source источник


Ответы (1)


  1. Ошибка sparklyr worker rscript failure, check worker logs for details записывается узлом драйвера и указывает, что фактическую ошибку необходимо найти в журналах рабочих процессов. Обычно к рабочим журналам можно получить доступ, открыв stdout на вкладке исполнителя в пользовательском интерфейсе Spark; журналы должны содержать RScript: записей, описывающих, что обрабатывает исполнитель, и особенности ошибки.

  2. Что касается создаваемой одиночной задачи, когда columns не указаны с типами в spark_apply(), ей необходимо вычислить подмножество результата, чтобы угадать типы столбцов, чтобы избежать этого, передать явные типы столбцов следующим образом:

    outtbl <- testtbl %>% spark_apply( myFunction, columns=list( string_id = "character", string_categories = "character"))

  3. Если используется sparklyr 0.6.3, обновите до sparklyr 0.6.4 или devtools::install_github("rstudio/sparklyr"), поскольку sparklyr 0.6.3 содержит неверное время ожидания в некоторых пограничных случаях, когда включено распространение пакетов и на каждом узле работает более одного исполнителя.

  4. При высокой нагрузке часто заканчивается память. Увеличение количества разделов может решить эту проблему, поскольку это уменьшит общий объем памяти, необходимый для обработки этого набора данных. Попробуйте запустить это как:

    testtbl %>% sdf_repartition(1000) %>% spark_apply(myFunction, names=c('string_id', 'string_categories'))

  5. Также может случиться так, что функция выдает исключение для некоторых разделов из-за логики в функции, вы можете увидеть, так ли это, используя tryCatch() для игнорирования ошибок, а затем найти пропущенные значения и почему функция потерпит неудачу для этих значений. Я бы начал с чего-то вроде:

    myFunction <- function(inputdf){ tryCatch({ inputdf$string_categories <- as.character(inputdf$string_categories) inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories)) stringCategoriesList <- strsplit(inputdf$string_categories, ' ') outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))), string_categories=unlist(stringCategoriesList)) return(outDF) }, error = function(e) { return( data.frame(string_id = c(0), string_categories = c("error")) ) }) }

person Javier Luraschi    schedule 25.09.2017
comment
Спасибо за такой исчерпывающий ответ! Увеличение количества разделов решило проблему, но там также есть много дополнительной информации, которая поможет мне двигаться вперед. - person jay; 26.09.2017
comment
поднимая этот хороший ответ, а также - person ℕʘʘḆḽḘ; 30.03.2018