Как использовать pyspark mllib RegressionMetrics с реальными прогнозами

С pyspark 1.4 я пытаюсь использовать RegressionMetrics() для прогнозов, сгенерированных LinearRegressionWithSGD.

Все примеры для RegressionMetrics() приведены в pyspark mllib. документация предназначена для "искусственных" прогнозов и наблюдений, таких как

predictionAndObservations = sc.parallelize([ (2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)])

Для такого "искусственного" (сгенерированного с помощью sc.parallelize) RDD все работает нормально. Однако, делая то же самое с другим RDD, сгенерированным другим способом, я получаю

TypeError: DoubleType can not accept object in type <type 'numpy.float64'>

Краткий воспроизводимый пример приведен ниже.

В чем может быть проблема?

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics

dataRDD = sc.parallelize([LabeledPoint(1, [1,1]), LabeledPoint(2, [2,2]), LabeledPoint(3, [3,3])])
lrModel = LinearRegressionWithSGD.train(dataRDD)
prediObserRDD = dataRDD.map(lambda p: (lrModel.predict(p.features), p.label)).cache()

Давайте проверим, что RDD действительно состоит из пар (прогноз, наблюдение)

prediObserRDD.take(4) # looks OK

Теперь попробуйте посчитать метрики

metrics = RegressionMetrics(prediObserRDD)

Выдает следующую ошибку

TypeError                                 Traceback (most recent call last)
<ipython-input-1-ca9ad8e9faf1> in <module>()
      7 prediObserRDD = dataRDD.map(lambda p: (lrModel.predict(p.features), p.label)).cache()
      8 prediObserRDD.take(4)
----> 9 metrics = RegressionMetrics(prediObserRDD)
     10 #metrics.explainedVariance
     11 #metrics.meanAbsoluteError

/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/mllib/evaluation.py in __init__(self, predictionAndObservations)
     99         df = sql_ctx.createDataFrame(predictionAndObservations, schema=StructType([
    100             StructField("prediction", DoubleType(), nullable=False),
--> 101             StructField("observation", DoubleType(), nullable=False)]))
    102         java_class = sc._jvm.org.apache.spark.mllib.evaluation.RegressionMetrics
    103         java_model = java_class(df._jdf)

/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio)
    337 
    338         for row in rows:
--> 339             _verify_type(row, schema)
    340 
    341         # convert python objects to sql data

/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/types.py in _verify_type(obj, dataType)
   1027                              "length of fields (%d)" % (len(obj), len(dataType.fields)))
   1028         for v, f in zip(obj, dataType.fields):
-> 1029             _verify_type(v, f.dataType)
   1030 
   1031 _cached_cls = weakref.WeakValueDictionary()

/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/types.py in _verify_type(obj, dataType)
   1011     if type(obj) not in _acceptable_types[_type]:
   1012         raise TypeError("%s can not accept object in type %s"
-> 1013                         % (dataType, type(obj)))
   1014 
   1015     if isinstance(dataType, ArrayType):

TypeError: DoubleType can not accept object in type <type 'numpy.float64'>

Та же проблема возникает (для другого набора данных и задачи классификации) с BinaryClassificationMetrics.


person lanenok    schedule 16.07.2015    source источник
comment
@eliasah Благодаря вашему комментарию я обнаружил в SparkProgrammingGuide, что DoubleWritable соответствует типу float в Python. Поэтому я преобразовал значения в число с плавающей запятой, и теперь все работает. Пожалуйста, напишите свой комментарий в качестве ответа, чтобы я мог его принять   -  person lanenok    schedule 17.07.2015
comment
Кстати, это довольно неожиданное поведение искры. DenseVector, например, массив IS numpy. Было бы разумно автоматически преобразовать все типы numpy.float в DoubleType.   -  person lanenok    schedule 17.07.2015


Ответы (1)


Как говорит ошибка TypeError: DoubleType can not accept object in type <type 'numpy.float64'>

Вы пытаетесь преобразовать numpy.float64 в Double, что невозможно.

Чтобы решить эту ошибку TypeError, вам нужно будет преобразовать свое значение в принятый Type.

Пример :

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics

dataRDD = sc.parallelize([LabeledPoint(1, [1,1]), LabeledPoint(2, [2,2]), LabeledPoint(3, [3,3])])
lrModel = LinearRegressionWithSGD.train(dataRDD)
prediObserRDD = dataRDD.map(lambda p: (float(lrModel.predict(p.features)), p.label)).cache()

Если вы заметили, я преобразовал метку предсказания в двойную, используя встроенную функцию Python float.

Теперь вы можете вычислить свои показатели:

>>> metrics = RegressionMetrics(prediObserRDD)
>>> metrics.explainedVariance
1.0
person eliasah    schedule 17.07.2015