Spark reduceByKey с универсальными типами (Scala)

Я пытаюсь создать несколько простых пользовательских агрегатных операторов в Spark, используя Scala.

Я создал простую иерархию операторов со следующим суперклассом:

sealed abstract class Aggregator(val name: String) {
  type Key = Row  // org.apache.spark.sql.Row
  type Value

  ...
}

У меня также есть объект-компаньон, который каждый раз создает соответствующий агрегатор. Обратите внимание, что каждому оператору разрешено указывать желаемый тип значения.

Теперь проблема в том, что когда я пытаюсь позвонить combineByKey:

val agg = Aggregator("SUM")
val res = rdd
    .map(agg.mapper)
    .reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))

Ошибка:

value reduceByKey is not a member of org.apache.spark.rdd.RDD[(agg.Key, agg.Value)]

Для моих нужд Value может быть либо числовым типом, либо кортежем, поэтому его определение не имеет границ. Если я заменю объявление типа Value на:

type Value = Double

в Aggregator классе, то все нормально работает. Поэтому я предполагаю, что ошибка связана с тем, что reduceByKey не знает точного типа Value во время компиляции.

Любые идеи о том, как обойти это?


person ergys    schedule 13.05.2018    source источник
comment
Отсутствует ClassTags? Проверьте, например, stackoverflow.com/q/39993041/9613318   -  person Alper t. Turker    schedule 13.05.2018
comment
@user9613318 user9613318 Я видел это, но мой RDD создается внутри метода (таким образом, метод не параметризуется по типу). Есть ли способ вручную создать неявные ClassTags?   -  person ergys    schedule 13.05.2018
comment
implicitly может помочь. Что-то вроде stackoverflow.com/q/47644051/9613318   -  person Alper t. Turker    schedule 13.05.2018
comment
(Примечание: изменено combineByKey на reduceByKey) - кажется, я не могу обойти это. Я думаю, что мне нужен способ, чтобы reduceByKey выбирал фактический тип agg.Value во время выполнения. Даже с implicitly как в ответе выше все равно нет.   -  person ergys    schedule 13.05.2018


Ответы (1)


Ваш RDD не может быть неявно преобразован в PairRDDFunctions, потому что все неявные ClassTag для ключей и значений отсутствуют.

Возможно, вы захотите включить теги класса в качестве неявных параметров в свой Aggregator:

sealed abstract class Aggregator[K: ClassTag, V: ClassTag](name: String) {
  implicit val keyClassTag: ClassTag[K] = implicitly
  implicit val valueClassTag: ClassTag[V] = implicitly
}

или, может быть:

sealed abstract class Aggregator[K, V](name: String)(implicit kt: ClassTag[K], vt: ClassTag[V]) {
  implicit val keyClassTag: ClassTag[K] = kt
  implicit val valueClassTag: ClassTag[V] = vt
}

или, может быть, даже:

sealed abstract class Aggregator(name: String) {
  type K
  type V
  implicit def keyClassTag: ClassTag[K]
  implicit def valueClassTag: ClassTag[V]
}

В последнем варианте ответственность за предоставление ClassTags будет переложена на разработчика абстрактного класса.

Теперь при использовании агрегатора a типа Aggregator[K, V] в reduceByKey вам нужно убедиться, что эти неявно предоставленные теги классов находятся в текущей неявной области видимости:

val agg = Aggregator("SUM")
import agg._ // now the implicits should be visible
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
person Andrey Tyukin    schedule 13.05.2018
comment
Это сработало, большое спасибо за это! Я выбрал способ 3, так как хотел сделать вызывающего абонента K/V-агностиком. В каждом реализаторе у меня теперь есть override type V = SomeType и override implicit def valueClassTag = ClassTag(ClassOf[V]). Отмечен как принятый ответ. - person ergys; 14.05.2018