Кодировщик для org.locationtech.jts.geom.Point не найден

При использовании Geomesa и Scala я пытался закодировать 2 столбца в кадре данных Spark, используя приведенные ниже фрагменты, но постоянно получаю сообщение о том, что Scala не может сериализовать возвращаемые объекты в кадр данных. При использовании Postgres и PostGIS жизнь проста — это простая проблема, или есть лучшая библиотека, которая может обрабатывать геопространственные запросы, поступающие из кадра данных Spark, который содержит широту и долготу в формате Double?

Версии, которые я использую в своем SBT:

  • искра: 2.3.0
  • скала: 2.11.12
  • Геомеса: 2.2.1
  • jst-*: 1.17.0-СНИМОК

Исключение в потоке "main" java.lang.UnsupportedOperationException: кодировщик не найден для org.locationtech.jts.geom.Point

import org.apache.spark.sql.SparkSession
import org.locationtech.jts.geom.{Coordinate, GeometryFactory}
import org.apache.spark.sql.functions.col

import org.apache.spark.sql.types._
import org.locationtech.geomesa.spark.jts._


object GetRandomData {


  def main(sysArgs: Array[String]) {

    @transient val spark: SparkSession = {
      SparkSession
        .builder()
        .config("spark.ui.enabled", "false")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.kryoserializer.buffer.mb","24")
        .appName("GetRandomData")
        .master("local[*]")
        .getOrCreate()
    }
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    import spark.sqlContext.implicits._

    var coordinates = sc.parallelize(
      List(
        (35.40466, -80.905458),
        (35.344079, -80.872267),
        (35.139606, -80.840845),
        (35.537786, -80.780051),
        (35.525361, -83.031932),
        (34.928323, -80.766732),
        (35.533865, -82.72344),
        (35.50997,  -80.588572),
        (35.286251, -83.150514),
        (35.558519, -81.067069),
        (35.569311, -80.916993),
        (35.835867, -81.067904),
        (35.221695, -82.662141)
      )
    ).
    toDS().
    toDF("geo_lat", "geo_lng")

    coordinates = coordinates.select(coordinates.columns.map(c => col(c).cast(DoubleType)) : _*)
    coordinates.show()
    val testing = coordinates.map(r => new GeometryFactory().createPoint(new Coordinate(3.4, 5.6)))
    val coordinatesPointDf = coordinates.withColumn("point", st_makePoint(col("geo_lat"), col("geo_lng")))

  }
}

Исключение составляет:

    Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.locationtech.jts.geom.Point
- root class: "org.locationtech.jts.geom.Point"
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:643)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at org.locationtech.geomesa.spark.jts.encoders.SpatialEncoders$class.jtsPointEncoder(SpatialEncoders.scala:21)
  at org.locationtech.geomesa.spark.jts.package$.jtsPointEncoder(package.scala:17)
  at GetRandomData$.main(Main.scala:50)
  at GetRandomData.main(Main.scala)

person Eric Meadows    schedule 06.03.2019    source источник
comment
Можете ли вы быстро попробовать GeoMesa версии 2.1?   -  person GeoMesaJim    schedule 07.03.2019


Ответы (2)


Если вы не используете базовое хранилище GeoMesa для загрузки данных в искровой сеанс, вам необходимо явно зарегистрировать типы JTS с помощью:

org.apache.spark.sql.SQLTypes.init(spark.sqlContext)

Это зарегистрирует операции ST_, а также кодировщики JTS.

person tom-kunicki    schedule 07.03.2019

На простом английском языке исключение говорит:

Я не знаю, как преобразовать Point в тип Spark.

Если вы сохраните широту и долготу как двойные в своем наборе данных, тогда все будет в порядке, но как только вы используете такой объект, как Point, вам нужно будет указать Spark, как его преобразовать. В терминах Spark они называются кодировщиками, и вы можете создавать собственные.

Или вы переключаетесь на RDD, где преобразование не требуется, если вы не возражаете против потери материала Spark SQL.

person Kit Menke    schedule 06.03.2019
comment
Теперь это сработало! :). Меня вообще не волнует Spark SQL - я должен был просто ударить себя, так как я должен просто не иметь дело с кадрами данных, а вместо этого иметь дело с RDD. - person Eric Meadows; 07.03.2019