Как можно использовать лямбда для синхронизации DynamoDB и Cloud Search

Предположим, мы используем триггеры AWS в таблице DynamoDB, и этот триггер должен запустить лямбда-функцию, задача которой - обновить запись в CloudSearch (чтобы синхронизировать DynamoDB и CS).

Я не очень понимаю, как Lambda всегда синхронизирует данные с данными в DynamoDB. Рассмотрим следующий поток:

  1. Приложение обновляет запись A таблицы DynamoDB (скажем, до A1)
  2. Очень близко после этого приложение обновляет ту же запись A той же таблицы (до A2)
  3. Триггер для 1 заставляет лямбда, равную 1, начать выполнение
  4. Триггер для 2 заставляет лямбду 2 начать выполнение
  5. Шаг 4 выполняется первым, поэтому CloudSearch видит A2
  6. Шаг 3 завершен, поэтому CloudSearch видит A1.

Не гарантируется, что лямбда-триггеры будут запускаться ТОЛЬКО после завершения предыдущего вызова (исправьте, если нет, и предоставьте мне ссылку)

Как видим, дело не в синхронизации.

Самое близкое, что я могу придумать, - это использовать AWS Kinesis Streams, но и те, что тоже с одним Shard (ограничение в 1 МБ в ps). Если это ограничение работает, то ваше потребительское приложение может быть написано таким образом, чтобы запись сначала обрабатывалась последовательно, то есть только после того, как предыдущая запись была помещена в CS, должна быть обработана следующая запись. Предполагая, что вышеупомянутое утверждение верно, как обеспечить правильную синхронизацию, если в DynamoDB поступает так много данных, что в Kinesis требуется более одного осколка?


person Ouroboros    schedule 05.08.2017    source источник


Ответы (2)


Вы можете добиться этого с помощью DynamoDB Streams:

Потоки DynamoDB

«Поток DynamoDB - это упорядоченный поток информации об изменениях элементов в таблице Amazon DynamoDB».

DynamoDB Streams гарантирует следующее:

  • Каждая запись потока появляется в потоке ровно один раз.
  • Для каждого элемента, измененного в таблице DynamoDB, записи потока отображаются в той же последовательности, что и фактические изменения элемента.

Еще одна интересная особенность DynamoDB Streams: если ваша Lambda не может обработать поток (например, любая ошибка при индексировании в облачном поиске), событие будет продолжать повторять попытки, а другие потоки записей будут ждать, пока ваш контекст не завершится успешно.

Мы используем Streams, чтобы наши индексы Elastic Search синхронизировались с нашими таблицами DynamoDB.

person Tom Melo    schedule 07.08.2017
comment
Да, Dynamo Stream делает то, что мы хотим, но что, если количество сегментов превышает 1 (из-за большого количества обновлений данных)? В этом случае ваши записи будут разделены на два разных сегмента. Таким образом, предположение о последовательном упорядочивании было бы неверным. - person Ouroboros; 07.08.2017
comment
Я думаю, вы можете справиться с этой ситуацией, используя DynamoDB Streams Kinesis Adapter Я думаю, что ключевым моментом здесь для подтверждения того, что говорится в документации, является тестирование. Напишите свою лямбда-функцию, вызовите ошибку, увидит, будет ли запущено другое событие, или будет ждать, пока ваша предыдущая запись не будет успешно обработана. Кроме того, попробуйте воспроизвести большой объем данных, вставляемых или обновляемых в вашу таблицу, независимо от того, выбираете ли вы потоки Dynamodb или нет, вам придется протестировать свой сценарий с любым решением, которое вы выберете. - person Tom Melo; 07.08.2017
comment
Том, в тот момент, когда мы получим несколько осколков в изображении, каждый осколок будет иметь упорядоченную запись (в пределах записей этого осколка), но не по осколкам. Итак, мой вопрос на самом деле заключается в том, с помощью Kinesis (или потоков DynamoDB, или даже Kafka, если на то пошло), как мы можем гарантировать получение глобально упорядоченных записей, даже если существует несколько сегментов? - person Ouroboros; 09.08.2017
comment
Я рад, что вы нашли ответ, я углублюсь в детали, у нас может быть такая же ситуация! - person Tom Melo; 09.08.2017
comment
Том, пока вы ожидаете, что все записи одного первичного ключа (раздел + диапазон) будут обрабатываться в правильном порядке, это не имеет значения. Однако, если ваша обработка требует глобального упорядочивания, ничто не может помочь. Если вы используете Dynamo Streams + Lambda, чтобы просто синхронизировать эластичный поиск, тогда у вас все хорошо (как описано в моем ответе). Однако, если вы также делаете что-то другое, что требует правильного глобального упорядочивания, и ваша скорость приема превышает 1 МБ / с, то вам не повезло. - person Ouroboros; 10.08.2017

Ссылка на AWS Lambda F&Q

Вопрос: Как AWS Lambda обрабатывает данные из потоков Amazon Kinesis и Amazon DynamoDB Streams?

Записи Amazon Kinesis и DynamoDB Streams, отправляемые в вашу функцию AWS Lambda, строго сериализуются для каждого сегмента. Это означает, что если вы поместите две записи в один и тот же осколок, Lambda гарантирует, что ваша функция Lambda будет успешно вызвана с первой записью до того, как она будет вызвана со второй записью. Если время вызова одной записи истекло, было ограничено или возникла какая-либо другая ошибка, Lambda будет повторять попытки, пока не завершится успешно (или запись не истечет 24 часа), прежде чем перейти к следующей записи. Порядок записей в разных шардах не гарантируется, и обработка каждого шарда происходит параллельно.

Это означает, что Lambda будет выбирать записи в одном осколке одну за другой, чтобы они появлялись в осколке, и не будет выполнять новую запись до тех пор, пока не будет обработана предыдущая запись!

Однако остается еще одна проблема: что, если записи одной и той же записи присутствуют в разных шардах? К счастью, AWS DynamoDB Streams гарантирует, что первичный ключ всегда находится только в определенном Shard. (По сути, я думаю, что первичный ключ - это то, что используется для поиска хэша, указывающего на осколок) AWS Slide Link. См. Дополнительные в блоге AWS ниже:

Относительный порядок последовательности изменений, внесенных в один первичный ключ, будет сохранен в сегменте. Кроме того, данный ключ будет присутствовать не более чем в одном из набора одноуровневых шардов, которые активны в данный момент времени. В результате ваш код может просто обрабатывать записи потока в сегменте, чтобы точно отслеживать изменения элемента.

person Ouroboros    schedule 09.08.2017
comment
есть ли у вас какие-либо ограничения на использование потоков DynamoDB для обновления облачного поиска? Я прочитал документацию, и в ней говорится, что вы можете запросить только 1 пакет / 10 секунд. Что, если обновление в Dynamodb происходит менее чем за 10 секунд? - person kkesley; 01.10.2018
comment
Это не имеет значения. Ограничение в 10 секунд означает, что вы не можете сделать несколько запросов за это время. Но вы всегда получите правильные данные, если API будут созданы в этих пределах. Поэтому, даже если ваши обновления происходят за миллисекунды, вам всегда нужно проверять изменения с T1 на T2. Таким образом, вам нужно написать код потребителя так, чтобы он спал, скажем, 12 секунд (все, что больше 10, нормально), а затем запускал API для получения данных от последнего вызванного времени окончания до текущего времени. Даже если текущее время - время окончания последнего вызова больше 10 секунд, API будет правильно отправлять результаты. - person Ouroboros; 03.10.2018