Spark DataFrame InsertIntoJDBC — исключение TableAlreadyExists

Используя Spark 1.4.0, я пытаюсь вставить данные из Spark DataFrame в базу данных MemSQL (что должно быть точно таким же, как взаимодействие с базой данных MySQL), используя insertIntoJdbc(). Однако я продолжаю получать исключение Runtime TableAlreadyExists.

Сначала я создаю таблицу MemSQL следующим образом:

CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);

Затем я создаю простой фрейм данных в Spark и пытаюсь вставить его в MemSQL следующим образом:

val df = sc.parallelize(Array(123,234)).toDF.toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]

df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)

java.lang.RuntimeException: Table table1 already exists.

person DJElbow    schedule 02.10.2015    source источник


Ответы (3)


Это решение применимо к общим соединениям JDBC, хотя ответ @wayne, вероятно, является лучшим решением конкретно для memSQL.

ВставкаIntoJdbc, похоже, устарела с версии 1.4.0, и ее использование фактически вызывает write.jdbc().

write() возвращает объект DataFrameWriter. Если вы хотите добавить данные в свою таблицу, вам придется изменить режим сохранения объекта на "append".

Еще одна проблема с примером в вопросе выше заключается в том, что схема DataFrame не соответствует схеме целевой таблицы.

Код ниже дает рабочий пример из оболочки Spark. Я использую spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar для запуска сеанса искровой оболочки.

import java.util.Properties

val prop = new Properties() 
prop.put("user", "root")
prop.put("password", "")  

val df = sc.parallelize(Array((1,234), (2,1233))).toDF.toDF("id", "val")   
val dfWriter = df.write.mode("append") 

dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop) 
person DJElbow    schedule 02.10.2015
comment
Привет, Elbow, я использую spark 1.5, и я все еще получаю исключение table уже существует даже после того, как говорю write.mode (append). Вы хотите прокомментировать это? В базе данных уже есть объект с именем «customer_spark». - person sri hari kali charan Tummala; 09.12.2015
comment
Эй, @DJElbow, то же самое здесь, все еще получаю исключение Table 'table1' уже существует. когда write.mode(SaveMode.Append). Я проверил, и при использовании пользователя «root» он отлично работает, но при использовании пользователя с привилегиями CREATE/INSERT/UPDATE я получаю эту ошибку. - person marnun; 05.02.2017

Документы insertIntoJDBC на самом деле неверны; они говорят, что таблица уже должна существовать, но на самом деле, если она существует, она выдаст ошибку, как вы можете видеть выше:

https://github.com/apache/spark/blob/03cca5dce2cd7618b5c0e33163efb8502415b06e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L264

Мы рекомендуем использовать наш коннектор MemSQL Spark, который вы можете найти здесь:

https://github.com/memsql/memsql-spark-connector

Если вы включите эту библиотеку и импортируете com.memsql.spark.connector._ в свой код, вы можете использовать df.saveToMemSQL(...) для сохранения вашего DataFrame в MemSQL. Вы можете найти документацию для нашего коннектора здесь:

http://memsql.github.io/memsql-spark-connector/latest/api/#com.memsql.spark.connector.DataFrameFunctions

person Wayne Song    schedule 02.10.2015
comment
Очень хорошо. Это упрощает дело. Есть ли где-нибудь скомпилированная банка для скачивания? Проблемы с поиском. - person DJElbow; 03.10.2015
comment
Если вы добавите maven.memsql.com в качестве преобразователя, вы можете включить его в свой проект как зависимость: github.com/memsql/memsql-spark-connector#using - person Wayne Song; 03.10.2015

У меня была такая же проблема. Обновление искровой версии до 1.6.2 работало нормально

person Dinesh Parmar    schedule 21.09.2016