Пользовательский аккумулятор Spark V2

Я определил пользовательский аккумулятор как:

import org.apache.spark.util.LongAccumulator

class CustomAccumulator extends LongAccumulator with java.io.Serializable  {
  override def add(v: Long): Unit = {
    super.add(v)
    if (v % 100 == 0) println(v)
  }
}

И зарегистрировал его как:

val cusAcc = new CustomAccumulator
sc.register(cusAcc, "customAccumulator")

Моя проблема в том, что когда я пытаюсь использовать его как:

val count = sc.customAccumulator  

Я получаю следующую ошибку:

<console>:51: error: value customAccumulator is not a member of org.apache.spark.SparkContext
   val count = sc.customAccumulator

Я новичок в Spark и scala и, возможно, упускаю что-то очень тривиальное. Любое руководство будет принято с благодарностью.


person AM01    schedule 13.03.2017    source источник


Ответы (2)


Согласно Spark API, AccumulatorV2 больше не находится в пакете org.apache.spark.SparkContext;, вместо этого он был перемещен в org.apache.spark.util.

person user8375305    schedule 27.07.2017

Поскольку Spark 2.0.0 вы должны использовать метод register в abstract class AccumulatorV2: org.apache.spark.util.AccumulatorV2#register.

Что-то вроде этого:

cusAcc.register(sc, scala.Option("customAccumulator"), false);
person Dedkov Vadim    schedule 30.07.2017