Как получить доступ к значению аккумулятора в задачах?

Я пытаюсь получить доступ к значению аккумулятора, находясь в задаче кластера. Но когда я это делаю, возникает исключение:

не могу прочитать значение аккумулятора

Я пытался использовать row.localValue, но он возвращает те же числа. Есть ли обходной путь?

private def modifyDataset(
  data: String, row: org.apache.spark.Accumulator[Int]): Array[Int] = {

  var line = data.split(",")
  var lineSize = line.size      
  var pairArray = new Array[Int](lineSize-1)
  var a = row.value
  paiArray(0)=a

  row+=1
  pairArray

}


var sc = Spark_Context.InitializeSpark
var row = sc.accumulator(1, "Rows")

var dataset = sc.textFile("path")

var pairInfoFile = noHeaderRdd.flatMap{ data => modifyDataset(data,row) }
  .persist(StorageLevel.MEMORY_AND_DISK)        
pairInfoFile.count()

person Nick    schedule 08.12.2015    source источник


Ответы (1)


Это просто невозможно, и обходного пути нет. Spark accumulators — это переменные только для записи с точки зрения работника. Любая попытка прочитать его значение во время выполнения задачи не имеет смысла, потому что между рабочими процессами нет общего состояния, а значение локального аккумулятора отражает только состояние текущего раздела.

Вообще говоря, accumulators предназначены в основном для диагностики и не должны использоваться как часть логики приложения. При использовании внутри преобразований единственная гарантия, которую вы получаете, — это хотя бы однократное выполнение.

См. также: Как распечатать переменную-аккумулятор из задачи (похоже, работает без вызова метода значения)?

person zero323    schedule 08.12.2015