Как использовать Delta Lake с искровой раковиной?

Я пытаюсь писать как Spark DF как DeltaTable. Он отлично работает в моей IDE Intelliji, но с теми же зависимостями и версиями он не работает в моей Spark REPL (оболочке Spark)

Версия Spark: 2.4.0 Версия Scala: 2.11.8

Зависимости в Intelliji (зависимости для всего проекта, пожалуйста, не обращайте внимания)

    compile 'org.scala-lang:scala-library:2.11.8'
    compile 'org.scala-lang:scala-reflect:2.11.8'
    compile 'org.scala-lang:scala-compiler:2.11.8'
    compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.1.2'
    compile 'org.scala-lang.modules:scala-swing_2.11:2.0.3'
    compile 'org.apache.spark:spark-mllib_2.11:2.4.0'
    compile 'org.apache.spark:spark-sql_2.11:2.4.0'
    compile 'org.apache.spark:spark-graphx_2.11:2.4.0'
    compile 'org.apache.spark:spark-launcher_2.11:2.4.0'
    compile 'org.apache.spark:spark-catalyst_2.11:2.4.0'
    compile 'org.apache.spark:spark-streaming_2.11:2.4.0'
    compile group: 'io.delta', name: 'delta-core_2.11', version: '0.5.0'
    compile 'org.apache.spark:spark-core_2.11:2.4.0'
    compile 'org.apache.spark:spark-hive_2.11:2.4.0'
    compile 'com.databricks:spark-avro_2.11:4.0.0'
    compile 'org.apache.avro:avro-mapred:1.8.2'
    compile 'org.apache.avro:avro:1.8.2'
    compile 'org.apache.avro:avro-compiler:1.8.2'
    compile group: 'mysql', name: 'mysql-connector-java', version: '8.0.15'
    compile group: 'commons-io', name: 'commons-io', version: '2.5'
    testCompile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.26'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    testCompile group: 'org.scalatest', name: 'scalatest_2.12', version: '3.2.0-SNAP10'
    compile group: 'javax.mail', name: 'javax.mail-api', version: '1.6.2'
    compile group: 'com.sun.mail' ,name: 'javax.mail', version: '1.6.0'
    compile 'com.hortonworks:shc-core:1.1.1-2.1-s_2.11'
    compile 'com.hortonworks:shc:1.1.1-2.1-s_2.11'
    compile group: 'org.apache.hbase', name: 'hbase-client', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase-server', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase-common', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase', version: '1.2.5', ext: 'pom'
    compile group: 'org.apache.hbase', name: 'hbase-protocol', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase-hadoop2-compat', version: '1.2.5'
    compile group: 'org.apache.hbase', name: 'hbase-annotations', version: '1.2.5'

    // jackson modues
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.8.6'
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.10.0'
    compile group: 'org.codehaus.jackson', name: 'jackson-core-asl', version: '1.9.13'
    compile group: 'org.codehaus.jackson', name: 'jackson-mapper-asl', version: '1.9.13'
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.8.7'
    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.8.6'
    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.8.6'
    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jaxb-annotations', version: '2.8.6'
    compile group: 'org.json4s', name: 'json4s-jackson_2.11', version: '3.2.10'
    compile group: 'com.twitter', name: 'parquet-jackson', version: '1.6.0'
    compile group: 'org.codehaus.jackson', name: 'jackson-jaxrs', version: '1.9.13'
    compile group: 'org.codehaus.jackson', name: 'jackson-xc', version: '1.9.13'
    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-paranamer', version: '2.8.6'
    compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-annotations', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-auth', version: '2.7.3'
    compile group: 'org.apache.hadoop', name: 'hadoop-yarn-common', version: '2.7.3'

Часть кода, которую я пытаюсь выполнить

import io.delta._

val dF=spark.read.load("path") //parquet file
dF.write.format("delta").mode("overwrite").partitionBy("topic","partition","key").save("path") // delta table

spark-shell Используемая команда:

spark-shell --packages com.fasterxml.jackson.core:jackson-databind:2.8.6,com.fasterxml.jackson.core:jackson-core:2.10.0,org.codehaus.jackson:jackson-core-asl:1.9.13,org.codehaus.jackson:jackson-mapper-asl:1.9.13,com.fasterxml.jackson.core:jackson-annotations:2.8.7,com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.8.6,com.fasterxml.jackson.module:jackson-module-scala_2.11:2.8.6,com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.8.6,com.twitter:parquet-jackson:1.6.0,org.codehaus.jackson:jackson-jaxrs:1.9.13,org.codehaus.jackson:jackson-xc:1.9.13,com.fasterxml.jackson.module:jackson-module-paranamer:2.8.6,io.delta:delta-core_2.11:0.5.0,commons-io:commons-io:2.5

Ошибка в REPL:

Exception in thread "main" java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse$default$3()Z
    at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:127)
    at org.apache.spark.sql.delta.actions.Metadata$$anonfun$schema$1.apply(actions.scala:202)
    at org.apache.spark.sql.delta.actions.Metadata$$anonfun$schema$1.apply(actions.scala:201)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.sql.delta.actions.Metadata.schema$lzycompute(actions.scala:201)
    at org.apache.spark.sql.delta.actions.Metadata.schema(actions.scala:200)
    at org.apache.spark.sql.delta.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:61)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.updateMetadata(WriteIntoDelta.scala:45)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:85)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:65)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:396)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:133)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
    at org.controller.deltaLakeEG.deltaLakeHadoopEg$.main(deltaLakeHadoopEg.scala:29)
    at org.controller.deltaLakeEG.deltaLakeHadoopEg.main(deltaLakeHadoopEg.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


person Raptor0009    schedule 28.12.2019    source источник


Ответы (3)


Согласно официальной документации:

Delta Lake требует Apache Spark версии 2.4.2 или выше.

Пожалуйста, обновите свою версию Spark как минимум до 2.4.2 в IntelliJ IDEA (или возникнут проблемы). На момент написания этой статьи последняя версия - 3.1.1, но она не поддерживается. пока (7 апреля):

Мы обновили Spark до 3.1.1 в основной ветке. Мы все еще работаем над некоторыми элементами перед выпуском релиза.

Согласно официальной документации:

Запустите искровую оболочку с пакетом Delta Lake:

bin/spark-shell --packages io.delta:delta-core_2.11:0.8.0

От себя лично использую --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension для включения команд SQL Delta Lake, например DESCRIBE DETAIL, GENERATE.

Вся команда для запуска spark-shell с Delta Lake 0.8.0 должна быть следующей:

./bin/spark-shell \
  --packages io.delta:delta-core_2.12:0.8.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
person Jacek Laskowski    schedule 29.12.2019
comment
Обнаружил проблему: не удалось записать DF как паркетный файл. с зависимостями smae и версией spak, любая идея почему? - person Raptor0009; 30.12.2019
comment
Пожалуйста, начните с самой чистой искровой оболочки, т. Е. Начните ее с --packages только для delta-core. Иначе, думаю, вам стоит задать отдельный вопрос. - person Jacek Laskowski; 30.12.2019
comment
@JacekLaskowski: пожалуйста, проверьте это один раз, не работает с толстой банкой. stackoverflow.com/ questions / 60166804 / - person Rao; 12.02.2020

Сам Spark зависит от Джексона, и версия, которую вы инструктируете использовать Spark-Shell, несовместима. Согласно https://github.com/apache/spark/blob/v2.4.0/pom.xml, 2.4.0 использует Jackson 2.6.7. Есть ли особая причина, по которой вам нужен Jackson 2.10 в этом случае?

person Charlie Flowers    schedule 28.12.2019
comment
Я добавил 2.10 из-за интеграции Hbase. Так следует ли мне использовать 2.6.7 jackson вместо 2.10? - person Raptor0009; 29.12.2019
comment
Удалил эти зависимости и запустил, все та же проблема. Exception in thread "main" java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse$default$3()Z - person Raptor0009; 29.12.2019
comment
Пробовал использовать все версии 2_11 json4 на этой странице. mvnrepository.com/artifact/org .json4s / json4s-core_2.11. - person Raptor0009; 29.12.2019

Проверьте здесь: https://docs.delta.io/0.8.0/quick-start.html

Чтобы запустить искровую оболочку, используйте команду

spark-shell --packages io.delta:delta-core_2.12:0.8.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
person Galuoises    schedule 12.04.2021