Как эффективно читать данные из mongodb и преобразовывать их в фрейм данных искры?

Я уже много исследовал, но не смог найти решение. Ближайший вопрос, который я смог найти здесь: Почему мой SPARK работает очень медленно с mongoDB< /а>.

Я пытаюсь загрузить коллекцию mongodb в DataFrame искры, используя соединитель mongo-hadoop. Вот фрагмент соответствующего кода:

connection_string = 'mongodb://%s:%s/randdb.%s'%(dbhost, dbport, collection_name)
trainrdd = sc.mongoRDD(connection_string, config=config)
#     traindf = sqlcontext.createDataFrame(trainrdd)
#     traindf = sqlcontext.read.json(trainrdd)
traindf = sqlcontext.jsonRDD(trainrdd) 

Здесь «sc» — это объект SparkContext. Я также пробовал варианты, которые закомментированы в коде. Но все одинаково медленные. Для коллекции размером 2 ГБ (100 000 строк и 1000 столбцов) требуется около 6 часов (святой моли:/) на кластере из 3 машин, каждая с 12 ядрами и 72 ГБ ОЗУ (используя все ядра в этом искровом кластере). Сервер MongoDB также работает на одной из этих машин.

Я не уверен, правильно ли я это делаю. Любые указатели на то, как оптимизировать этот код, были бы очень полезны.


person bitspersecond    schedule 20.04.2016    source источник
comment
Вы имеете в виду медленное использование jsonRDD? Можете ли вы попробовать преобразовать RDD в DataFrame другими способами?   -  person Wan Bachtiar    schedule 04.05.2016
comment
Привет Ван Спасибо за ответ. Да, фактическое действие начинается, когда вы вызываете 'sqlcontext.jsonRDD(trainrdd)'. Это запускает чтение mongodb, при этом в журналах mongodb указывается, что соединения устанавливаются и сбрасываются. Я пробовал другие методы (закомментированные в коде выше), которые одинаково медленны. Недавно я попробовал sqlcontext.read.json в файле json, экспортированном из коллекции mongodb. Это работало довольно быстро сравнительно.   -  person bitspersecond    schedule 09.05.2016
comment
Какая версия jar файла mongodb mongo- соединитель искры Hadoop вы используете? Можете ли вы попробовать отделить сервер MongoDB от узлов Spark?   -  person Wan Bachtiar    schedule 10.05.2016
comment
хорошо, я попробую разделить сервер MongoDB и опубликую его здесь. Я использую spark-1.6.1-bin-hadoop2.6 и mongo-hadoop 1.5.0.   -  person bitspersecond    schedule 10.05.2016


Ответы (2)


Эффективным способом чтения данных из монго с помощью pyspark является использование MongoDb искровой разъем

from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext
sc = SparkContext()
spark = SparkSession(sc)
data = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.input.uri","mongodb://+username:password@server_details:27017/db_name.collection_name?authSource=admin").load()

И это будет искровой фрейм данных, его не нужно конвертировать. Вам просто нужно настроить искровой коннектор mongodb.

Если вы используете блокнот, напишите это вверху-

 %%configure
{"conf": {"spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2"}}

Если вы используете команду spark-submit:

spark-submit --conf spark.pyspark.python=/usr/bin/anaconda/envs/py35/bin/python3.5 --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.1 file_name.py

Если вы хотите записать его обратно в mangoDB, попробуйте:

data.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").option("spark.mongodb.input.uri","mongodb://+username:password@server_details:27017/db_name.collection_name?authSource=admin").save()
person Kriti Pawar    schedule 18.05.2020
comment
Не могли бы вы также помочь, как записать данные в MongoDB с помощью pyspak? Ваше предложение выше помогло мне прочитать из mongodb, хотя у меня также есть требование обновить коллекцию из pyspakr. Пожалуйста, предложите - person Vaibhav; 10.05.2021
comment
Пожалуйста, проверьте сейчас. и если у вас есть новый фрейм данных для создания, проверьте stackoverflow.com/questions/43316716/ - person Kriti Pawar; 10.05.2021

По умолчанию pyspark.sql.SQLContext .jsonRDD будет динамически определять схему данного набора данных JSON. Столбцы будут добавляться по мере нахождения новых полей JSON. Это может быть медленным, поскольку проверяется каждый атрибут JSON. Особенно, если у вас 1000 столбцов.

Что вы могли бы сделать, так это явно определить схему, учитывая, что данные известны или требуется только определенный набор полей.

Кроме того, из-за ObjectId проблемы, описанной в HADOOP-277, вам необходимо либо удалить поля, содержащие такие несовместимые типы или преобразовать в другие типы. то есть str(ObjectId(...))

Например :

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType
import pymongo_spark 
pymongo_spark.activate()
data_rdd = sc.mongoRDD("mongodb://localhost:27017/database.collection")
sqlcontext = SQLContext(sc)

# Define your schema explicitly
schema = StructType([StructField("firstname", StringType()),
                     StructField("lastname", StringType()),
                     StructField("description", StringType())])

# Create a mapper function to return only the fields wanted, or to convert. 
def project(doc):
    return {"firstname": str(doc["firstname"]), 
            "lastname": str(doc["lastname"]), 
            "description": str(doc["description"])}

projected_rdd = data_rdd.map(project)
train_df = sqlcontext.jsonRDD(projected_rdd, schema)
train_df.first()

Приведенный выше фрагмент был протестирован в среде: Spark v1.6.1, mongo-hadoop spark v1.5.2

person Wan Bachtiar    schedule 11.05.2016
comment
Привет, Ван, полезно знать и об использовании схемы. Я попробовал, и для 100 столбцов это не имело никакого значения. Я попробую это и для 1000 столбцов и опубликую результаты здесь. Спасибо. - person bitspersecond; 13.05.2016