Последний конвейер агрегации для всех отдельных идентификаторов очень медленный, нужно создавать правильные индексы?

Учитывая, что следующий код конвейера агрегации возвращает самую новую запись для всех отдельных «внутренних_идентификаторов»:

db.locations.aggregate({$sort: {timestamp: -1}}, {$group: {_id: "$internal_id", doc: {$first: "$$ROOT"}}})

Этот вызов занимает до 10 секунд, что неприемлемо. Коллекция не такая уж большая:

db.locations.count()
1513671

Итак, я предполагаю, что что-то не так с индексами, однако я пытался создать много индексов, и ни один из них не улучшился, в настоящее время я оставил те два, которых должно было быть достаточно имхо: {timestamp: -1, internal_id: 1} и { внутренний_ид: 1, отметка времени: -1}.

MongoDB НЕ сегментирована и работает с набором реплик из 3 хостов под управлением версии 3.6.14.

Журнал MongoDB показывает следующее:

2020-05-30T12:21:18.598+0200 I COMMAND  [conn12652918] command mydb.locations appName: "MongoDB Shell" command: aggregate { aggregate: "locations", pipeline: [ { $sort: { timestamp: -1.0 } }, { $group: { _id: "$internal_id", doc: { $first: "$$ROOT" } } } ], cursor: {}, lsid: { id: UUID("70fea740-9665-4068-a2b5-b7b0f10dcde9") }, $clusterTime: { clusterTime: Timestamp(1590834060, 34), signature: { hash: BinData(0, 9DFB6DBCEE52CFA3A5832DC209519A8E9D6F1204), keyId: 6783976096153993217 } }, $db: "mydb" } planSummary: IXSCAN { timestamp: -1, ms_id: 1 } cursorid:8337712045451536023 keysExamined:1513708 docsExamined:1513708 numYields:11838 nreturned:101 reslen:36699 locks:{ Global: { acquireCount: { r: 24560 } }, Database: { acquireCount: { r: 12280 } }, Collection: { acquireCount: { r: 12280 } } } protocol:op_msg 7677msms

person Adam C.    schedule 30.05.2020    source источник


Ответы (2)


Агрегации Mongo теоретически являются описательными (в том смысле, что вы описываете, что вы хотите, чтобы произошло, а оптимизатор запросов определяет эффективный способ выполнения этого расчета), но на практике многие агрегации оказываются процедурными и неоптимизированными. Если вы посмотрите на инструкции по процедурной агрегации:

  1. {$sort: {timestamp: -1}}: сортировать все документы по отметке времени.
  2. {$group: {_id: "$internal_id", doc: {$first: "$$ROOT"}}: просмотрите эти документы, отсортированные по временным меткам, а затем сгруппируйте их по идентификатору. Поскольку на данный момент все отсортировано по отметке времени (а не по идентификатору), в конечном итоге это будет приличный объем работы.

Вы можете увидеть, что на самом деле делает монго, взглянув на план запроса этой строки журнала: planSummary IXSCAN { timestamp: -1, ms_id: 1 }.

Вы хотите заставить монго придумать лучший план запроса, чем тот, который использует индекс {internal_id: 1, timestamp: -1}. Предоставление ему подсказки использовать этот индекс может сработать — это зависит от насколько хорошо он может рассчитать план запроса.

Если предоставление этой подсказки не работает, одним из вариантов может быть разбить этот запрос на 2 части, каждая из которых использует соответствующий индекс.

  1. Найдите максимальную отметку времени для каждого internal_id. db.my_collection.aggregate([{$group: {_id: "$internal_id", timestamp: {$max: "$timestamp"}}}]). Это должно использовать индекс {internal_id: 1, timestamp: -1}.
  2. Используйте эти результаты, чтобы найти документы, которые вам действительно нужны: db.my_collection.find({$or: [{internal_id, timestamp}, {other_internal_id, other_timestamp}, ....]}) (если для одного и того же internal_id есть повторяющиеся временные метки, вам может потребоваться дедупликация).

Если вы хотите объединить эти 2 части в 1, вы можете использовать самообъединение исходной коллекции с $lookup.

person willis    schedule 31.05.2020
comment
Большое спасибо! Я протестирую различные решения и сообщу об измерении скорости - person Adam C.; 01.06.2020
comment
Попробовал сегодня, к сожалению, кажется, что асинхронный драйвер Python не может передать подсказку, несмотря на то, что написано в его документации. Я спросил об этом выше по течению. Дам тебе знать. - person Adam C.; 02.06.2020
comment
Это облом! Я думаю, что выполнение двух запросов (без подсказки) является более вероятным решением, и вы можете адаптировать этот шаблон, чтобы он был немного быстрее с самосоединением, если вам нужно - person willis; 03.06.2020

Итак, наконец, я смог провести все тесты, вот вся версия, которую я написал, благодаря ответу Уиллиса и результату:

Исходный агрегированный запрос

mongo_query = [
  {"$match": group_filter},
  {"$sort": {"timestamp": -1}},
  {"$group": {"_id": "$internal_id", "doc": {"$first": "$$ROOT"}}},
]

res = mongo.db[self.factory.config.mongo_collection].aggregate(mongo_query)
res = await res.to_list(None)

9,61 секунды

Дайте MongoDB подсказку для использования правильного индекса (сначала отфильтруйте internal_id)

from bson.son import SON

cursor = mongo.db[self.factory.config.mongo_collection].aggregate(mongo_query, hint=SON([("internal_id", 1), ("timestamp", -1)]))
res = await cursor.to_list(None)

Не работает, MongoDB отвечает с исключением, говоря, что сортировка потребляет слишком много памяти

Разделить агрегацию, чтобы сначала найти последнюю временную метку для каждого internal_id

cursor = mongo.db[self.factory.config.mongo_collection].aggregate([{"$group": {"_id": "$internal_id", "timestamp": {"$max": "$timestamp"}}}])
res = await cursor.to_list(None)

or_query = []
for entry in res:
    or_query.append({"internal_id": entry["_id"], "timestamp": entry["timestamp"]})
cursor = mongo.db[self.factory.config.mongo_collection].find({"$or": or_query})
fixed_res = await cursor.to_list(None)

1,88 секунды, намного лучше, но все же не так быстро

Параллельные сопрограммы (и победителем становится....)

Тем временем, поскольку у меня уже есть список internal_id и я использую асинхронный Python, я выбрал параллельную сопрограмму, получая сразу последнюю запись для одного internal_id:

fixed_res: List[Dict] = []

async def get_one_result(db_filter: Dict) -> None:
    """ Coroutine getting one result for each known internal ID """

    cursor = mongo.db[self.factory.config.mongo_collection].find(db_filter).sort("timestamp", -1).limit(1)
    res = await cursor.to_list(1)
    if res:
        fixed_res.append(res[0])

coros: List[Awaitable] = []
for internal_id in self.list_of_internal_ids:
    coro = get_one_result({"internal_id": internal_id})
    coros.append(coro)
await asyncio.gather(*coros)

0,5 с, намного лучше, чем другие

Если у вас нет списка internal_id

Есть альтернатива, которую я не реализовал, но я подтвердил, что вызов очень быстрый: используйте низкоуровневую команду distinct для индекса {internal_id: 1} для получения списка отдельных идентификаторов, а затем используйте параллельные вызовы.

person Adam C.    schedule 02.06.2020