TL;DR: теперь пользователи SceneBox могут использовать StreamableSets для потоковой передачи тщательно подобранных наборов данных непосредственно из облака SceneBox в обучающие конвейеры PyTorch — молниеносно быстро!

Алгоритмы машинного обучения стали повсеместными в большинстве современных технологических продуктов. Каждый день крутится куча этих алгоритмов. Чтобы кормить этого зверя машинного обучения, данные собираются в порядке петабайтов. Именно в этой области процветает SceneBox благодаря таким функциям, как высокопроизводительное индексирование любых данных без схемы и мощные инструменты управления данными.

На сегодняшний день мы с гордостью представляем StreamableSets для пакетной загрузки данных несколькими работниками из любого удаленного хранилища для передачи ваших заданий PyTorch. StreamableSets — это простая замена объектов загрузчика данных в сценариях PyTorch. Несколько строк кода, и SceneBox легко интегрируется с вашими уже существующими конвейерами обучения. В сочетании с нашим адаптивным веб-приложением StreamableSets становится мощным инструментом в вашем арсенале.

Ниже мы рассмотрим, как StreamableSets влияет на вашу повседневную жизнь инженера по машинному обучению и помогает вам узнать о своих данных до того, как это сделает ваша модель машинного обучения.

Итерация — ключ к машинному обучению

Типичный рабочий процесс машинного обучения состоит из четырех этапов:

  1. Курирование данных
  2. Обучение и проверка алгоритма машинного обучения
  3. Полевые испытания/показатели производительности
  4. Развертывание

Мы повторяем эти шаги, экспериментируя с различными наборами данных и алгоритмами и устраняя пробелы в производительности в полевых условиях.

Хотя мы могли бы получить детализацию очень быстро, ради этой статьи мы остановимся только на первом шаге — Курирование данных. Первый шаг можно разделить на очень сложные процессы, над которыми кропотливо работают целые команды инженеров данных, используя все, кроме своих сценариев Python. Вот шаги, которые команды предпринимают слишком часто:

  1. Индексирование данных. Необработанные данные (изображения, видео, лидары) обычно хранятся в хранилище объектов и метаданных в каком-либо варианте базы данных.
    Метод SceneBox. Наш высокопроизводительный индекс может принимать ваши данные со средней скоростью 4000 изображений в минуту, при этом необработанные данные никогда не покидают ваше объектное хранилище. .
  2. Очистка данных/аннотации. Представьте себе несколько раундов обмена данными между учеными и вашими различными аннотаторами для меток и контроля качества.
    Метод SceneBox: Полнофункциональная интеграция со многими ведущими поставщиками аннотаций, такими как Scale, CVAT и Segments, которые заботятся об этих рабочие процессы одним нажатием кнопки.
  3. Курирование наборов данных.Обычно это очень ручной процесс фильтрации данных по необходимым атрибутам, который часто выполняется путем непосредственного визуального изучения необработанных данных. Это может привести к созданию нескольких копий одних и тех же данных, когда каждый инженер создаст собственные наборы для своих алгоритмов.
    Метод SceneBox:наши наборы являются логической ассоциацией данных в нашем индексе данных. Мы можем выбрать эти данные, используя любую комбинацию запросов атрибутов метаданных, доступных для данных.
  4. Извлечение и отправка модели машинного обучения. Как специалистам по машинному обучению, нам необходимо наполнять наши алгоритмы данными, которые мы только что отобрали. Мы видели, как наши клиенты загружали свои данные из облачного хранилища, а затем писали версию итератора данных для получения этих данных. Итераторы данных поддерживают пакетную обработку, а также применяют преобразования ввода и метки к каждому образцу. Такие итераторы, как правило, разрабатываются индивидуально для каждого отдельного набора данных, над которым они работают.
    Метод SceneBox: StreamableSets!

Потоковая передача данных из облачного хранилища

Получение данных из одного и того же удаленного хранилища для загрузки конвейеров машинного обучения имеет много преимуществ. Например, он предотвращает появление избыточных копий данных, которые многие инженеры неизбежно будут создавать при проведении своих экспериментов. Единый хорошо продуманный код оркестратора данных может использоваться всей командой, чтобы значительно улучшить использование времени. Кроме того, теперь мы можем упаковать больше этих потоковых сервисов в меньшее количество вычислительных ресурсов, что приводит к повышению экономической эффективности.

Все это говорит о том, что облачная потоковая передача данных сопряжена с определенными трудностями. Первая (и, возможно, самая большая) проблема связана с накладными расходами сети и их влиянием на пропускную способность на стороне обучения модели. Существуют также проблемы с перетасовкой данных и вторичными преобразованиями, которые вездесущи в конвейерах машинного обучения.

С StreamableSets мы рискнем решить эти проблемы с помощью независимого от облачных вычислений API. Любой Set SceneBox можно преобразовать в StreamableSet по запросу. Это обеспечивает отношение один к одному между данными в наборе и его потоками. Когда поступает запрос на StreamableSet, мы начинаем сжимать все данные в сжатом формате. Параллельно в фоновом режиме создается один или несколько шардов таких архивов. Затем мы используем WebDatasets для создания объекта PyTorch IterableDataset, который предоставляет нам потоковый доступ к данным ко многим сегментам. Чтобы создать загрузчик данных, мы, наконец, оборачиваем IterableDataset нашей логикой декодирования потока и предоставляем API, который напрямую соответствует интерфейсу torch.utils.data.Dataloader.

Мы встроили в него следующие функции:

  1. Последовательный ввод-вывод: мы сжимаем и создаем последовательные фрагменты данных для более эффективного сетевого доступа к образцам по сравнению с обычными операциями файлового ввода-вывода. Возможность перетасовки данных сохраняется за счет перетасовки полученных данных в буфере определенного размера и, при необходимости, перетасовки между самими осколками.
  2. Конвейеры с высокой пропускной способностью: StreamableSets перебирает сжатые архивы данных, обеспечивая очень эффективный доступ, который масштабируется до чрезвычайно больших наборов данных. Он также предоставляет объект загрузчика данных PyTorch (будет расширен до TensorFlow в более поздних версиях), который может разделять задания по выборке данных между несколькими параллельными рабочими процессами, борясь с любыми издержками сети.
  3. Распределенные сегменты: данные разделены на несколько сегментов, которые сами по себе могут быть распределены между несколькими узлами в облаке, что открывает путь для потоковой передачи с нескольких узлов. Разделение на сегменты также означает, что можно создавать, поддерживать и распространять очень большие наборы данных для обучения или проверки.
  4. Входные и целевые преобразования: входные и меточные преобразования являются неотъемлемой частью задания по обучению/проверке машинного обучения. Любое преобразование PyTorch можно применить к данным из StreamableSet так же, как пользователь сделал бы это со своим объектом DataIterator в PyTorch — передав эти преобразования в качестве аргументов конструктору загрузчика данных.
  5. Повсеместный операционный доступ. Интуитивно понятное веб-приложение SceneBox обеспечивает фильтрацию данных, создание информационных панелей, обогащение и визуализацию. В сочетании с инструментами курирования SceneBox доступ даже к нишевым подмножествам данных, таким как «Все медведи на заднем дворе, которые сидят», теперь возможен непосредственно из вашего скрипта PyTorch.

Использование

В приведенном ниже примере мы увидим, как получить данные CelebA, размещенные на SceneBox, в цикле обучения PyTorch. В частности, мы покажем, как использовать загрузчик данных для набора StreamingSet, содержащего 1000 изображений. Несколько строк кода добавлены вместо torch.utils.data.Dataloader и готово! Сам SceneBox можно установить с помощью простой команды pip:

pip install scenebox
"""
Streamable Sets
"""

from scenebox.clients import SceneEngineClient
import torch 
import time

# Instantiate the client
sec = SceneEngineClient(auth_token="d1390598-a1e4-82d9-6922-1780_53ce_49")

# Set id on SceneBox
celeba_image_set_id = "streamingset"

# Create a streamable set 
streamable_set = sec.get_streamable_set(set_id=celeba_image_set_id)

# Get the pytorch dataloader for this set
dataloader = streamable_set.pytorch_dataloader(num_workers=4, 
                                               batch_size=32, 
                                               shuffle=True)

# Training loop
start_time = time.time()
for i, data in enumerate(dataloader):
    images_batch, metadata_batch, annotations_batch = data

    # Your pytorch code here

end_time = time.time()
time_taken = end_time - start_time
num_batches = i + 1
print("Time per batch: {} ms".format((time_taken * 1000) / num_batches))

Давайте сравним потоковые данные с помощью SceneBox с другими типичными шаблонами произвольного доступа к файлам, такими как:

  1. Последовательный GET: чтение изображений по отдельности с URL-адресов AWS S3 в последовательности.
"""
Sequential GET
"""

import time
import cv2
import requests
import urllib.request
import numpy as np

from scenebox.clients import SceneEngineClient
from scenebox.constants import AssetsConstants

# Instantiate the client
sec = SceneEngineClient(auth_token="d1390598-a1e4-82d9-6922-1780_53ce_49")

# Set id on SceneBox
celeba_image_set_id = "streamingset"

# Collect asset ids in the set
set_asset_ids = sec.assets_in_set(set_id=celeba_image_set_id)

# Split into batches
batch_size = 32
batched_ids = [set_asset_ids[i:i + batch_size] 
               for i in range(0, len(set_asset_ids), batch_size)]

# Image asset manager on SceneBox
images_amc = sec.get_asset_manager(asset_type=AssetsConstants.IMAGES_ASSET_ID)

batch_times = []
for batch_idx, batch in enumerate(batched_ids):
    # Get signed s3 urls for the images in this batch
    image_id__urls_map = images_amc.get_url_in_batch(ids=batch)
    
    images_batch = {}
    # Fetch each of them in sequence
    start_time = time.time()
    for image_id, image_url in image_id__urls_map.items():
        resp = urllib.request.urlopen(image_url)
        image = np.asarray(bytearray(resp.read()), dtype="uint8")
        image = cv2.imdecode(image, cv2.IMREAD_UNCHANGED)
        images_batch[image_id] = image

    end_time = time.time()
    time_taken = end_time - start_time

    print("Batch {} Time {}".format(batch_idx, time_taken))
    batch_times.append(time_taken)

single_fetch_time = sum(batch_times)/len(batch_times)

print("Average single fetch batch time {} ms".format(single_fetch_time*1000))

2. Последовательный многопоточный GET: чтение пакета изображений с использованием одновременных HTTP-запросов.

"""
Threaded GET
"""

import time
import cv2
import requests
import urllib.request
import numpy as np

from scenebox.clients import SceneEngineClient
from concurrent.futures.thread import ThreadPoolExecutor

# Instantiate the client
sec = SceneEngineClient(auth_token="d1390598-a1e4-82d9-6922-1780_53ce_49")

# Set id on SceneBox
celeba_image_set_id = "streamingset"

# Collect asset ids in the set
set_asset_ids = sec.assets_in_set(set_id=celeba_image_set_id)

# Split into batches
batch_size = 32
batched_ids = [set_asset_ids[i:i + batch_size] 
               for i in range(0, len(set_asset_ids), batch_size)]

# Image asset manager on SceneBox
images_amc = sec.get_asset_manager(asset_type=AssetsConstants.IMAGES_ASSET_ID)

batch_times = []
for batch_idx, batch in enumerate(batched_ids):
    # Get signed s3 urls for the images in this batch
    image_id__urls_map = images_amc.get_url_in_batch(ids=batch)
    images_batch = {}

    def fetch_image(image_id, image_url):
      resp = urllib.request.urlopen(image_url)
      image = np.asarray(bytearray(resp.read()), dtype="uint8")
      image = cv2.imdecode(image, cv2.IMREAD_UNCHANGED)
      images_batch[image_id] = image
    
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        for image_id, image_url in image_id__urls_map.items():
            executor.submit(fetch_image, 
                            image_id=image_id, 
                            image_url=image_url)
    end_time = time.time()
    time_taken = end_time - start_time

    print("Batch {} Time {}".format(batch_idx, time_taken))
    batch_times.append(time_taken)

threaded_time = sum(batch_times)/len(batch_times)
print("Average threaded fetch batch time {} ms".format(threaded_time*1000))

В таблице ниже StreamableSets сравнивается с двумя указанными выше методами получения данных из облака:

При среднем времени выборки пакета из 32 изображений 61 миллисекунда StreamableSets в 16 раз быстрее, чем многопоточность, и в 67 раз быстрее, чем последовательные HTTP-запросы. Результаты собираются с машины с 12-ядерным процессором AMD Ryzen 9 3900X.

Заключение

StreamableSets — это последнее дополнение от нас в SceneBox к вашему репертуару инструментов-ускорителей машинного обучения. Теперь пользователи могут использовать высокопроизводительную потоковую передачу данных для подачи своих конвейеров машинного обучения непосредственно из SceneBox практически без изменений в существующем коде. Мы также работаем над многими новыми функциями продукта, которые направлены на достижение целисозданияи включенияобучаемого ПО. сильный>

Я хотел бы услышать ваши мысли об этой статье. Наша команда в SceneBox всегда открыта для общения. Пожалуйста, свяжитесь со мной в LinkedIn, если вы хотите углубиться со мной.