Чтение RDF в apache spark

Я пытаюсь прочитать файл RDF\XML в Apache spark (scala 2.11, apache spark 1.4.1) с помощью Apache Jena. Я написал этот фрагмент scala:

val factory = new RdfXmlReaderFactory()
HadoopRdfIORegistry.addReaderFactory(factory)
val conf = new Configuration()
conf.set("rdf.io.input.ignore-bad-tuples", "false")
val data = sc.newAPIHadoopFile(path,
    classOf[RdfXmlInputFormat],
    classOf[LongWritable], //position
    classOf[TripleWritable],   //value
    conf)
data.take(10).foreach(println)

Но выдает ошибку:

INFO readers.AbstractLineBasedNodeTupleReader: Got split with start 0 and length 21765995 for file with total length of 21765995
15/07/23 01:52:42 ERROR readers.AbstractLineBasedNodeTupleReader: Error parsing whole file, aborting further parsing
org.apache.jena.riot.RiotException: Producer failed to ever call start(), declaring producer dead
        at org.apache.jena.riot.lang.PipedRDFIterator.hasNext(PipedRDFIterator.java:272)
        at org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileNodeTupleReader.nextKeyValue(AbstractWholeFileNodeTupleReader.java:242)
        at org.apache.jena.hadoop.rdf.io.input.readers.AbstractRdfReader.nextKeyValue(AbstractRdfReader.java:85)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:350)
   ...
ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Error parsing whole file at position 0, aborting further parsing
        at org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileNodeTupleReader.nextKeyValue(AbstractWholeFileNodeTupleReader.java:285)
        at org.apache.jena.hadoop.rdf.io.input.readers.AbstractRdfReader.nextKeyValue(AbstractRdfReader.java:85)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:350)

Файл хорош, потому что я могу разобрать его локально. Что мне не хватает?

EDIT Некоторая информация для воспроизведения поведения

Импорт:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry
import org.apache.jena.hadoop.rdf.io.registry.readers.RdfXmlReaderFactory
import org.apache.jena.hadoop.rdf.types.QuadWritable
import org.apache.spark.SparkContext

scalaVersion := "2.11.7"

зависимости:

"org.apache.hadoop"             % "hadoop-common"      % "2.7.1",
"org.apache.hadoop"             % "hadoop-mapreduce-client-common" % "2.7.1",
"org.apache.hadoop"             % "hadoop-streaming"   % "2.7.1", 
"org.apache.spark"              % "spark-core_2.11"  % "1.4.1", 
"com.hp.hpl.jena"               % "jena"               % "2.6.4",
"org.apache.jena"               % "jena-elephas-io"    % "0.9.0",
"org.apache.jena"               % "jena-elephas-mapreduce" % "0.9.0"

Я использую образец rdf из здесь. Это свободно доступная информация о сеансах John Peel (дополнительная информация о дампе).


person Nikita    schedule 22.07.2015    source источник
comment
Что вы имеете в виду, вы можете разобрать его локально?   -  person Marius Soutier    schedule 23.07.2015
comment
Тот факт, что вы получаете эту конкретную ошибку, означает, что синтаксический анализ никогда не запускался, это, скорее всего, потому, что файл был недоступен. Вы не сказали нам значение path, но это первое, что нужно проверить.   -  person RobV    schedule 23.07.2015
comment
@MariusSoutier Я могу прочитать тот же файл из inputStream.   -  person Nikita    schedule 23.07.2015
comment
@RobV Если файл не существует, я получаю org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist:, но это другой случай. он начинает читать файл: Got split with start 0 and length 21765995   -  person Nikita    schedule 23.07.2015
comment
Что ж, без дополнительной информации невозможно определить, в чем проблема, ваш код неполный (нет операторов импорта) и нет значения для path, поэтому его нельзя запустить как есть, тем более что у нас нет ваших данных. Тот же код (с соответствующими операторами импорта) работает в моей среде, поэтому, вероятно, есть какая-то проблема с вашей средой (например, версия Spark) или вашими данными, но невозможно сказать, как она есть.   -  person RobV    schedule 23.07.2015
comment
@RobV В любом случае спасибо за ваше время. Я добавил библиотеки deps и ссылку на файл примера rdf\xml.   -  person Nikita    schedule 23.07.2015
comment
Можете ли вы попробовать сократить файл до одной строки, т.е. удалить все \n?   -  person Marius Soutier    schedule 23.07.2015
comment
@MariusSoutier Спасибо за хорошую догадку! но это не помогло :(. Насколько я понимаю, проблема с входным потоком в позиции 0. Похоже, что базовый pipeIterator не может проходить через поток.   -  person Nikita    schedule 23.07.2015


Ответы (2)


Таким образом, похоже, ваша проблема заключалась в том, что вы вручную управляли своими зависимостями.

В моей среде я просто передавал в свою оболочку Spark следующее:

--packages org.apache.jena:jena-elephas-io:0.9.0

Это делает все разрешение зависимостей для вас

Если вы создаете проект SBT, достаточно сделать следующее в вашем build.sbt:

libraryDependencies += "org.apache.jena" % "jena-elephas-io" % "0.9.0"
person RobV    schedule 23.07.2015

Спасибо всем за обсуждение в комментариях. Проблема была действительно сложной и неясной из трассировки стека: коду нужна одна дополнительная зависимость для работы jena-core, и эта зависимость должна быть упакована в первую очередь.

"org.apache.jena" % "jena-core" % "2.13.0"
"com.hp.hpl.jena" % "jena"      % "2.6.4"

Я использую эту стратегию сборки:

lazy val strategy = assemblyMergeStrategy in assembly <<= (assemblyMergeStrategy in assembly) { (old) => {
  case PathList("META-INF", xs @ _*) =>
    (xs map {_.toLowerCase}) match {
      case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) => MergeStrategy.discard
      case _ => MergeStrategy.discard
    }
  case x => MergeStrategy.first
}
}
person Nikita    schedule 23.07.2015
comment
Пожалуйста, не смешивайте Jena 2.6.4 и Jena 2.13.0, первая из 2010, а вторая из 2015, и их смешивание может привести к неприятным вещам. - person RobV; 23.07.2015