Spark CosmosDB Sink: org.apache.spark.sql.AnalysisException: «запись» не может быть вызвана для потокового набора данных/DataFrame

Я читаю поток данных из концентратора событий в Spark (используя Databricks). Моя цель — иметь возможность записывать потоковые данные в CosmosDB. Однако я получаю следующую ошибку: org.apache.spark.sql.AnalysisException: «запись» не может быть вызвана для потокового набора данных/DataFrame.

Этот сценарий не поддерживается?

Версии искры: 2.2.0 и 2.3.0

Используемые библиотеки:

  • json-20140107
  • rxnetty-0.4.20
  • azure-documentdb-1.14.0
  • azure-documentdb-rx-0.9.0-rc2
  • azure-cosmosdb-spark_2.2.0_2.11-1.0.0
  • rxjava-1.3.0
  • azure-eventhubs-databricks_2.11-3.4.0

Мой код:

import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.streaming._
import com.microsoft.azure.cosmosdb.spark.config._

import org.apache.spark.sql.SparkSession
import org.apache.spark.eventhubs.common.EventHubsUtils
import org.apache.spark.sql
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.Trigger

val sparkConfiguration = EventHubsUtils.initializeSparkStreamingConfigurations

sparkConfiguration.setAppName("MeetupStructuredStreaming")
sparkConfiguration.set("spark.streaming.driver.writeAheadLog.allowBatching", "true")
sparkConfiguration.set("spark.streaming.driver.writeAheadLog.batchingTimeout", "60000")
sparkConfiguration.set("spark.streaming.receiver.writeAheadLog.enable", "true")
sparkConfiguration.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
sparkConfiguration.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
sparkConfiguration.set("spark.streaming.stopGracefullyOnShutdown", "true")

val spark = SparkSession.builder().config(sparkConfiguration).getOrCreate()

val configMap = Map(
  "Endpoint" -> "https://xxx.documents.azure.com:443/",
  "Masterkey" -> "xxxx",
  "Database" -> "meetup",
  "Collection" -> "event"
)

val ehStream = spark.readStream  
  .format("eventhubs")
  .option("eventhubs.policyname", "xxx")
  .option("eventhubs.policykey", "xxx")
  .option("eventhubs.namespace", "xx")
  .option("eventhubs.name", "meetup")
  .option("eventhubs.partition.count", "2")
  .option("eventhubs.maxRate", "100")
  .option("eventhubs.consumergroup", "streaming")
  .option("eventhubs.progressTrackingDir", "/tmp/eventhub-progress/meetup-events")
  .option("eventhubs.sql.containsProperties", "true")        
  .load
  .select(
    from_unixtime(col("enqueuedTime").cast(LongType)).alias("enqueuedTime")
    , from_unixtime(get_json_object(col("body").cast(StringType), "$.mtime").divide(1000)).alias("time")
    , get_json_object(col("body").cast(StringType), "$.name").alias("name")
    , get_json_object(col("body").cast(StringType), "$.event_url").alias("url")
    , get_json_object(col("body").cast(StringType), "$.status").alias("status")
    , get_json_object(col("body").cast(StringType), "$.venue.country").alias("country")
    , get_json_object(col("body").cast(StringType), "$.venue.city").alias("city")
    , get_json_object(col("body").cast(StringType), "$.group.category.shortname").alias("category"))

var cosmosDbStreamWriter = ehStream
  .writeStream
  .outputMode("append")
  .format(classOf[CosmosDBSinkProvider].getName).options(configMap)  
  .option("checkpointLocation", "/tmp/streamingCheckpoint")
  .trigger(Trigger.ProcessingTime(1000 * 3)) // every 3 seconds  
  .start()

person fbeltrao    schedule 17.01.2018    source источник
comment
Спасибо @fbeltrao за добавление этого в репозиторий azure-cosmosdb-spark. Рассмотрит там и ответит на SO позже.   -  person Denny Lee    schedule 18.01.2018