Запись данных в SQL DW из Apache Spark в Azure Synapse

Когда я записываю данные в SQL DW в Azure из Databricks, я использую следующий код:

example1.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable12").option("forward_spark_azure_storage_credentials","True") .option("tempdir", tempDir).mode("overwrite").save()

Это не будет работать с Notebook в Synapse Notebook. Я получаю сообщение об ошибке:

Py4JJavaError: An error occurred while calling o174.save.
: java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.sqldw. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:656) Caused by: java.lang.ClassNotFoundException: com.databricks.spark.sqldw.DefaultSource

По сути, мне нужно знать эквивалент com.databricks.spark.sqldw для Apache Spark в Azure Synapse.

Спасибо


person Patterson    schedule 09.06.2021    source источник


Ответы (1)


Если вы пишете в выделенный пул SQL в той же рабочей области Synapse, что и записная книжка, то это так же просто, как вызвать метод synapsesql. Простой параметризованный пример в Scala с использованием функции ячейки параметров записных книжек Synapse.

// Read the table
val df = spark.read.synapsesql(s"${pDatabaseName}.${pSchemaName}.${pTableName}")

// do some processing ...

// Write it back with _processed suffixed to the table name
df.write.synapsesql(s"${pDatabaseName}.${pSchemaName}.${pTableName}_processed", Constants.INTERNAL)

Если вы пытаетесь писать из записной книжки в другой выделенный пул SQL или в старое хранилище данных SQL Azure, то все немного по-другому, но есть отличные примеры здесь.

ОБНОВЛЕНИЕ: элементы в фигурных скобках со знаком доллара (например, ${pDatabaseName}) являются параметрами. Вы можете назначить ячейку параметра в записной книжке, чтобы параметры можно было передавать извне, например, из фабрики данных Azure (ADF) или конвейеров Synapse, используя действие «Выполнить записную книжку», и повторно использовать в записной книжке, как в моем примере выше. Узнайте больше о параметрах Synapse Notebook здесь.

Параметры блокнота Synapse

person wBob    schedule 09.06.2021
comment
спасибо, что остались со мной в этом. Я добавил следующее, но получаю сообщение об ошибке «Неполная инструкция» %%spark val pDatabaseName = "MyFirstSQLPool" val pSchemaName = "dbo" val pTableName = "mysampletable3" val df.write.synapsesql(s"${pDatabaseName}.${pSchemaName}.${pTableName}_processed", Constants.INTERNAL) - person Patterson; 09.06.2021
comment
Я бы сделал это так, чтобы у меня было две ячейки, в ячейке 1 - три параметра, и я должен был убедиться, что эта ячейка обозначена как ячейка с параметрами. Вы можете сделать это, щелкнув в правом верхнем углу ячейки. Затем есть ячейка 2, которая использует эти параметры. Есть смысл? - person wBob; 09.06.2021
comment
Кроме того, где в вашем коде элемент, заполняющий фрейм данных (df)? - person wBob; 09.06.2021
comment
Доброе утро @wBob. Я не совсем понимаю, что вы имеете в виду. Код, заполняющий df: test = spark.sql (выберите * из testtable) - person Patterson; 10.06.2021
comment
Я изменил код, чтобы заполнить фрейм данных: %%spark val scala_df = spark.sqlContext.sql ("select * from stgProbateGrantCaseAll") scala_df.write.synapsesql("sqlpool.dbo.PySparkTable", Constants.INTERNAL) - person Patterson; 10.06.2021
comment
Я получаю сообщение об ошибке Error: java.lang.IllegalArgumentException: Can't get JDBC type for struct<caseType:string,applicationType:string,boDocumentsUploaded:array<string>,applicationSubmittedDate:string,declaration:struct<:string,accept:string,confirm:string,confirmItem1:string,confirmItem2:string,confirmItem3:string,requests:string - person Patterson; 10.06.2021
comment
ваше оригинальное решение сработало как мечта. Проблема заключалась в том, что выделенный пул SQL был приостановлен. - person Patterson; 10.06.2021
comment
lol рад, что ты добрался до конца. - person wBob; 10.06.2021
comment
о, и последнее, не могли бы вы сообщить мне, как добавить опцию «перезаписывать» таблицы в базе данных, пожалуйста. - person Patterson; 10.06.2021
comment
Насколько мне известно, в настоящее время вы не можете. Вот почему в моем примере я передаю таблицу обратно с суффиксом _processed и позволяю Synapse Pipelines обрабатывать своп впоследствии с помощью операции хранимой процедуры. - person wBob; 10.06.2021
comment
Это не идеально. С помощью Databricks вы можете обновить или перезаписать ... не очень хорошо. Спасибо хоть - person Patterson; 10.06.2021