Почему бы не сбалансировать нагрузку при параллельных вычислениях с использованием снегопада?

В течение долгого времени я использовал sfLapply для многих моих параллельных r-скриптов. Однако в последнее время, когда я больше углубился в параллельные вычисления, я использовал sfClusterApplyLB, который может сэкономить много времени, если отдельные экземпляры не занимают одинаковое количество времени для запуска. Если sfLapply будет ждать завершения каждого экземпляра пакета перед загрузкой нового пакета (что может привести к простою экземпляров), то экземпляры sfClusterApplyLB, выполнившие свою задачу, будут немедленно назначены оставшимся элементам в списке, что потенциально сэкономит немало времени, когда экземпляры не занимают точно такое же количество времени. Это привело меня к вопросу, почему мы вообще не хотим балансировать нагрузку при использовании снегопада? Единственное, что я пока обнаружил, это то, что при возникновении ошибки в параллельном скрипте sfClusterApplyLB все равно циклически перебирает весь список, прежде чем выдать ошибку, в то время как sfLapply останавливается после попытки первой партии. Что еще мне не хватает? Существуют ли какие-либо другие затраты/недостатки балансировки нагрузки? Ниже приведен пример кода, показывающий разницу между двумя

rm(list = ls()) #remove all past worksheet variables
working_dir="D:/temp/"
setwd(working_dir)
n_spp=16
spp_nmS=paste0("sp_",c(1:n_spp))
spp_nm=spp_nmS[1]
sp_parallel_run=function(sp_nm){
  sink(file(paste0(working_dir,sp_nm,"_log.txt"), open="wt"))#######NEW
  cat('\n', 'Started on ', date(), '\n') 
  ptm0 <- proc.time()
  jnk=round(runif(1)*8000000) #this is just a redundant script that takes an arbitrary amount of time to run
  jnk1=runif(jnk)
  for (i in 1:length(jnk1)){
    jnk1[i]=jnk[i]*runif(1)
  }
  ptm1=proc.time() - ptm0
  jnk=as.numeric(ptm1[3])
  cat('\n','It took ', jnk, "seconds to model", sp_nm)

  #stop sinks
  sink.reset <- function(){
    for(i in seq_len(sink.number())){
      sink(NULL)
    }
  }
  sink.reset()
}
require(snowfall)
cpucores=as.integer(Sys.getenv('NUMBER_OF_PROCESSORS'))

sfInit( parallel=T, cpus=cpucores) # 
sfExportAll() 
system.time((sfLapply(spp_nmS,fun=sp_parallel_run)))
sfRemoveAll()
sfStop()

sfInit( parallel=T, cpus=cpucores) # 
sfExportAll() 
system.time(sfClusterApplyLB(spp_nmS,fun=sp_parallel_run)) 
sfRemoveAll()
sfStop()

person Lucas Fortini    schedule 03.02.2014    source источник


Ответы (1)


Функция sfLapply полезна, поскольку она разбивает входные значения на одну группу задач для каждого доступного работника, что функция mclapply называет предварительным планированием. Это может дать гораздо лучшую производительность, чем sfClusterApplyLB, когда задачи не занимают много времени.

Вот крайний пример, демонстрирующий преимущества предварительного планирования:

> system.time(sfLapply(1:100000, sqrt))
   user  system elapsed
  0.148   0.004   0.170
> system.time(sfClusterApplyLB(1:100000, sqrt))
   user  system elapsed
 19.317   1.852  21.222
person Steve Weston    schedule 03.02.2014
comment
Это очень интересный момент, но, как вы заметили, это очень крайний пример. Это, вероятно, не будет иметь значения в противоположном экстремальном сценарии (т. Е. Гораздо более короткий список задач, которые нужно распределить, каждая из которых занимает до нескольких часов), верно? - person Lucas Fortini; 04.02.2014
comment
Раньше я всегда использовал функцию snow clusterApplyLB для всего, поэтому пакет doSNOW использовал ее исключительно. Если задачи не короткие, обычно довольно безопасно использовать sfClusterApplyLB, но есть несколько ловушек, которых sfLapply избегает благодаря предварительному планированию. - person Steve Weston; 04.02.2014