Соединитель искры Neo4j loadDataFrame выдает ошибку

РЕДАКТИРОВАТЬ: Пытался сделать то, что было предложено в комментариях с .toDF, получил эту ошибку:

                                 ^
neo: org.neo4j.spark.Neo4j = org.neo4j.spark.Neo4j@5dfb65d5
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@333e01c6
import sqlContext.implicits._
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = Neo4jRDD partitions Partitions(1,9223372036854775807,9223372036854775807,None) call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n using Map()
<console>:48: error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
       val df = rdd.toDF()

Я запускаю этот простой код scala:

import org.neo4j.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf



val conf = new SparkConf.setMaster("local").setAppName("neo4jspark")
val sc = new SparkContext(conf)
val neo = Neo4j(sc)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

//val rdd = neo.cypher("call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n").loadRowRdd

val df = neo.cypher("call spatial.withinDistance('geom', {lat:35.8954016,lon:41.5505458}, 500) yield node, distance WITH node, distance match (node:POINT) WHERE node.toDateFormatLong < 20170213 return node as n").loadDataFrame

Это дает несколько ошибок, проблема с conf - это ошибка, но, похоже, она работает, когда я загружаю RDD. Но здесь я тоже получаю ошибку, все равно выдает мне количество элементов в вызове, хотя затем я получаю ошибки сериализации. Не уверен, есть ли какие-то шаги или вещи, которые я упускаю из этого примера здесь:

https://blog.knoldus.com/2016/10/05/neo4j-with-scala-awesome-experience-with-spark/

Loading neo4jspark.scala...
import org.neo4j.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
<console>:38: error: not found: value SparkConf
       val conf = new SparkConf.setMaster("local").setAppName("neo4jspark")
                      ^
<console>:38: error: not found: value conf
       val sc = new SparkContext(conf)
                                 ^
neo: org.neo4j.spark.Neo4j = org.neo4j.spark.Neo4j@5dfb65d5
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = Neo4jRDD partitions Partitions(1,9223372036854775807,9223372036854775807,None) MATCH (p:POINT) RETURN p using Map()
res0: Long = 53118
17/08/25 14:31:15 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.io.NotSerializableException: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<5>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<5>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 5)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/08/25 14:31:15 ERROR TaskSetManager: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<5>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<5>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 5); not retrying
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<5>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<5>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 5)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
  ... 79 elided

person Codejoy    schedule 25.08.2017    source источник
comment
Похоже, вы забыли импортировать sql implicits val sqlCtx = new SQLContext(sc) import sqlCtx.implicits._   -  person sgireddy    schedule 26.08.2017
comment
обновил код с чем у меня такая же ошибка, может я неправильно реализовал?   -  person Codejoy    schedule 28.08.2017
comment
Я не думаю, что это проблема SparkConf, если только neo4j не ожидает конкретной конфигурации для своего API DataFrame. не могли бы вы попробовать следующее (попробуйте извлечь df из RDD)? Просто чтобы определить основную причину, если это Neo4J или Spark. Также не могли бы вы попробовать другой запрос Neo4j, чтобы исключить, что запрос не виноват? val rdd = neo.cypher(вызов пространственного.withinDistance('geom', {широта:35.8954016,долгота:41.5505458}, 500) узел выхода, расстояние С узлом, совпадение расстояния (узел:ТОЧКА) ГДЕ node.toDateFormatLong ‹ 20170213 возвращаемый узел как n).loadRowRdd val df = rdd.toDF   -  person sgireddy    schedule 28.08.2017
comment
я пробовал с toDF, но с большим запросом, завтра попробую другой запрос (хотя введенный запрос работал в браузере neo4j)   -  person Codejoy    schedule 28.08.2017
comment
запустил его с гораздо более простым запросом и той же ошибкой: val df = neo.cypher("MATCH (p:POINT) RETURN p").loadDataFrame   -  person Codejoy    schedule 29.08.2017
comment
Я узнал, что проблема в том, что запрос действительно сложный. github.com/neo4j-contrib/neo4j-spark-connector/issues/ 40   -  person Codejoy    schedule 29.08.2017