Google Dataflow: запись в хранилище данных без перезаписи существующих объектов

TL; DR: поиск способа обновления сущностей хранилища данных без перезаписи существующих данных через поток данных

Я использую поток данных 2.0.0 (луч) для обновления объектов в Google Datastore. Мой поток данных загружает объекты из хранилища данных, обновляет их, а затем сохраняет их обратно в хранилище данных (перезаписывая существующие объекты).

Однако в процессе обновления я также обнаруживаю дополнительные сущности, которые могут существовать, а могут и не существовать. Чтобы предотвратить перезапись существующих сущностей, я раньше загружал все сущности из Datastore и уменьшал их (группируя по ключу), удаляя новые дубликаты.

По мере роста количества сущностей я хочу избежать загрузки всех сущностей в поток данных (вместо того, чтобы брать их партиями на основе самых старых временных меток), но я сталкиваюсь с проблемой, что старые сущности перезаписываются, когда они не находятся в текущая партия.


Я пишу объекты в поток данных, используя (в двух местах, одно для существующих объектов, другое для новых объектов):

collection.apply(DatastoreIO.v1().write().withProjectId("..."))

Было бы здорово, если бы существовал метод DatastoreIO.v1().writeNew(), но, к сожалению, его не существует. Спасибо за любую помощь.


person Mingwei Samuel    schedule 09.06.2017    source источник
comment
Мне непонятно, что вы пытаетесь сделать, поскольку в одном случае вы говорите, что хотите перезаписать объекты, а также предотвратить перезапись объектов. Вы можете уточнить? Кроме того, как вы загружаете свои объекты партиями?   -  person Vikas Kedigehalli    schedule 12.06.2017
comment
Да, мы полностью должны предоставить это через Dataflow. Сервис поддерживает Insert, Upsert и Update, но в настоящее время мы предоставляем Upsert только в Dataflow.   -  person Dan McGrath    schedule 12.06.2017
comment
Семантика неидемпотентной записи требует дополнительных размышлений о том, как обрабатываются повторные попытки, учитывая, что поток данных продолжает повторять попытки до тех пор, пока каждая запись не будет успешно записана. Мы можем оказаться в сценарии, когда некоторые операции записи выполнены успешно, но задание потока данных не выполнено, и любые последующие попытки выполнения задания никогда не увенчаются успехом.   -  person Vikas Kedigehalli    schedule 13.06.2017
comment
@VikasKedigehalli Пакетирует в основном путем выбора сущностей старше X времени ИЛИ выбора Y самых старых сущностей. И да, я в основном хочу делать обновления сущностей вместо перезаписи / апсертов.   -  person Mingwei Samuel    schedule 13.06.2017
comment
@MingweiSamuel. Не могли бы вы пояснить, что вы имеете в виду под обновлением объектов хранилища данных без перезаписи существующих данных? Согласно документации хранилища данных cloud.google.com/datastore/docs/concepts/, обновление объекта - это, по сути, перезапись.   -  person Vikas Kedigehalli    schedule 14.06.2017
comment
Обновление как при перезаписи определенных свойств без удаления существующих свойств   -  person Mingwei Samuel    schedule 15.06.2017
comment
На самом деле это то, что мне очень хотелось бы, но по этому я просто ищу обновление, которое записывает только новые сущности и не перезаписывает существующие (по ключу)   -  person Mingwei Samuel    schedule 15.06.2017


Ответы (1)


Если вы хотите написать новую сущность, которой нет в Datastore, вы просто создаете ее с новым ключом и записываете.

List<String> keyNames = Arrays.asList("L1", "L2"); // Somewhat you have new keys to store
PTransform<PCollection<Entity>, ?> write =
        DatastoreIO.v1().write().withProjectId(project_id); // This is a typical write operation

p.
    apply("GetInMemory", Create.of(keyNames)).setCoder(StringUtf8Coder.of()). // L1 and L2 are loaded
    apply("Proc1", ParDo.of(new DoFn<String, Entity>(){
        @ProcessElement
        public void processElement(ProcessContext c) {
            Key.Builder key = makeKey("k2", c.element());  // Generate an entity key
            final Entity entity = Entity.newBuilder().
                    setKey(key). // Set the key
                    putProperties("p1", makeValue(new String("test constant value")
                        ).setExcludeFromIndexes(true).build()).
                    build();
            c.output(entity);
        }
    })).
    apply(write); // Write them
p.run();

Весь код можно найти в моем репозитории кода по адресу https://github.com/yiu31802/gcp-project/commit/cc224b34.

person Junji Shimagaki    schedule 05.01.2018
comment
есть ли способ создать автоматически выделенный ключ / идентификатор при создании новой сущности? - person SylvesterTheKid; 27.05.2019