Я пытаюсь создать несколько простых пользовательских агрегатных операторов в Spark, используя Scala.
Я создал простую иерархию операторов со следующим суперклассом:
sealed abstract class Aggregator(val name: String) {
type Key = Row // org.apache.spark.sql.Row
type Value
...
}
У меня также есть объект-компаньон, который каждый раз создает соответствующий агрегатор. Обратите внимание, что каждому оператору разрешено указывать желаемый тип значения.
Теперь проблема в том, что когда я пытаюсь позвонить combineByKey
:
val agg = Aggregator("SUM")
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
Ошибка:
value reduceByKey is not a member of org.apache.spark.rdd.RDD[(agg.Key, agg.Value)]
Для моих нужд Value
может быть либо числовым типом, либо кортежем, поэтому его определение не имеет границ. Если я заменю объявление типа Value
на:
type Value = Double
в Aggregator
классе, то все нормально работает. Поэтому я предполагаю, что ошибка связана с тем, что reduceByKey
не знает точного типа Value
во время компиляции.
Любые идеи о том, как обойти это?
ClassTags
? Проверьте, например, stackoverflow.com/q/39993041/9613318 - person Alper t. Turker   schedule 13.05.2018implicitly
может помочь. Что-то вроде stackoverflow.com/q/47644051/9613318 - person Alper t. Turker   schedule 13.05.2018combineByKey
наreduceByKey
) - кажется, я не могу обойти это. Я думаю, что мне нужен способ, чтобыreduceByKey
выбирал фактический типagg.Value
во время выполнения. Даже сimplicitly
как в ответе выше все равно нет. - person ergys   schedule 13.05.2018