Вы хотите выполнять предварительную обработку данных распределенно с сохранением разных версий данных и распределенным обучением одновременно? Если ваш ответ да, вы находитесь в правильном месте. Мы собираемся объединить два мощных инструмента, таких как Ray Train и Pachyderm. Ray Train обеспечивает распределенное обучение глубокому обучению. Pachyderm обеспечивает автомасштабирование, параллельную обработку, контроль версий для данных и конвейеры, управляемые данными. Наша цель — использовать лучшую часть каждого инструмента.

Мы будем использовать Pachyderm для обработки данных и Ray для модели обучения. После интеграции обоих инструментов мы можем работать с данными, используя только одну команду, и ее можно распределить по набору данных, а также распараллелить процесс обучения с помощью настройки обучения PyTorch.

Мы собираемся сосредоточиться на работе с Pachyderm и Ray, поэтому я предполагаю, что вы установили Pachyderm и Ray на Kubernetes, если нет, вы можете настроить их с помощью руководства.

Создайте репозиторий толстокожих и заполните данные

Используя приведенный ниже код, мы создаем проект pach_ray и настраиваем его по умолчанию.

$ pachctl create project pach_ray
$ pachctl config update context --project pach_ray
editing the currently active context "local"

$ pachctl config get context local
{
  "pachd_address": "grpc://10.152.183.249:30653",
  "cluster_deployment_id": "****",
  "project": "pach_ray"
}

Следующий шаг — создать репозиторий dirty_data и заполнить его данными. В качестве данных мы создаем простой файл JSON.

$ pachctl create repo dirty_data
$ pachctl list repo
PROJECT  NAME       CREATED       SIZE (MASTER) DESCRIPTION 
pach_ray dirty_data 5 seconds ago ≤ 0B 

Используя приведенный ниже код, мы создаем список и сохраняем его в формате JSON.

import json

input_list = list(range(150))
dict_json_data = {'input_data': input_list}


with open('input_data.json', 'w') as f:
    json_object = json.dumps(dict_json_data)
    f.write(json_object)

И добавьте созданный файл input_data.json в dirty_data.

$ pachctl put file dirty_data@master:input_data.json -f input_data.json

Обработка входных данных — это input_data.json,мыиспользуемконвейерPachyderm и сохраняем его в выходном репозитории Pachyderm. Выходное репо pachyderm создается автоматически при создании пайпа.

Создание пайплайна толстокожих

Чтобы настроить трубу Pachyderm, мы должны создать файл предварительной обработки Python, поместить его в образ докера и создать файл конфигурации для создания трубы.

Pachyderm монтирует каталог /pfs/ в образы Docker. Где /pfs/dirty_data/ — входной каталог, где dirty_directory — входной репозиторий, а /pfs/out/ — выходной каталог, в котором должны быть сохранены файлы и он определяется как выходной каталог.

Давайте настроим файл Dockerfile для создания образа Docker.

И вставьте его в докер-хаб

$ docker build -t pach_ray_integration .
$ docker tag pach_ray_integration eduard1/pach_ray_integration:1.2
# login into docker hub
$ docker login
# push iamge into docker hub
$ docker push eduard1/pach_ray_integration:1.2

Последним шагом является создание файла конфигурации и создание канала в толстокожее животное.

трубопровод: определяет имя трубопровода, Pachyderm автоматически создает выходной канал с тем же именем. input: определяет репозиторий ввода и способ чтения данных. transform: определяет изображение, которое будет использоваться, а cmd — способ запуска файла предварительной обработки python в изображениях.

# create pipeline using configuration file 
$ pachctl create pipeline -f pipeline_configuration.yaml

# Run that command we should see the created repo and new file
# It means that the pipeline ran successfully
$ pachctl list file data_preprocessing@master
NAME              TYPE SIZE     
/output_data.json file 1.399KiB

Отлично, мы создали конвейер предварительной обработки и теперь у нас есть готовые данные для создания обучения PyTorch на Ray с интеграцией в Pachyderm.

Подготовьте код для Ray Train с помощью PyTorch

Теперь нам нужно создать класс набора данных для PyTorch. Для этого набора данных нам нужно создать генератор данных факела. Генератор преобразует наш файл output_data.json в тип, подходящий для модели факела поезда.

from torch.utils.data import Dataset
import numpy as np

class DatasetGenerator(Dataset):
    def __init__(self, data:dict):
        self.x = data["input_data"]
        self.y = data["output_data"]
        
        self.x = np.array(self.x, dtype=np.float32).reshape(-1, 1)
        self.y = np.array(self.y, dtype=np.float32).reshape(-1, 1)
        
    def __len__(self):
        return len(self.x)
    
    def __getitem__(self, index):
        return self.x[index], self.y[index]

Определим структуру модели, которую мы собираемся обучать.

from torch import nn


class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear_model = nn.Sequential(
            nn.Linear(1, 2),
            nn.SELU(),
            nn.Linear(2,1)
        )
        
    def forward(self, inputs):
        return self.linear_model(inputs)

Вы видите, что мы не объявили функцию get_data. Теперь нам нужно определить наиболее ожидаемую функцию, которая объединяет Ray Train и Pachyderm. Для интеграции мы используем пакет python_pachyderm.

python_pachyderm обеспечивает соединение с Pachyderm, и мы должны преобразовать предварительно обработанный файл в соответствующий формат, такой как словарь, Pandas DataFrame и т. д.

import python_pachyderm
import json

def get_data():
    # connect to pachyderm
    client_pachyderm = python_pachyderm.Client()
    
    # connect to pach_ray project and appropriate file
    dict_input = {"project": "pach_ray",
                 "repo": "data_preprocessing",
                 "branch": "master"}
    
    data_binary = client_pachyderm.get_file(dict_input, "output_data.json")
    
    str_json = data_binary.read()
    str_json = str_json.decode("utf-8")
    dict_data = json.loads(str_json)

    return DatasetGenerator(dict_data)

Последним шагом является создание функции обучения, которая будет использоваться в Ray Train, и запуск обучения.

Вы можете видеть в коде штрихи. Строки определяют дополнительные строки кода, которые мы должны добавить к обычной обучающей функции Torch для обучения в Ray. Первый блок преобразует загрузчик данных PyTorch и диалог модели в соответствующий объект Ray. Во-вторых, сохраняет контрольные точки.

Последним шагом процесса обучения является определение и запуск Ray Trainer.

from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig

use_gpu = False

trainer = TorchTrainer(train_model_distributed,
                      scaling_config=ScalingConfig(num_workers=5, use_gpu=use_gpu))

results = trainer.fit()

После компиляции в Jupyter Notebook вы должны увидеть статус настройки, информацию о системе, статус пробной версии и ход пробной версии. Если после запуска results.метрики все работает нормально, вы должны увидеть что-то похожее на словарь, представленный ниже.

{'loss': 0.10256955772638321,
 'epoch': 39,
 'timestamp': 1688980150,
 'time_this_iter_s': 0.08252787590026855,
 'should_checkpoint': True,
 'done': True,
 'training_iteration': 40,
 'trial_id': '67a58_00000',
 'date': '2023-07-10_12-09-10',
 'time_total_s': 8.633183479309082,
 'pid': 1668740,
 'hostname': 'kbp1-lhp-a11064',
 'node_ip': '192.168.20.185',
 'config': {},
 'time_since_restore': 8.633183479309082,
 'iterations_since_restore': 40,
 'experiment_tag': '0'}

Заключение

В конце мы создали репозиторий Pachyderm, конвейер предварительной обработки, обученную модель PyTorch с использованием Ray при извлечении предварительно обработанных данных из репозитория с использованием кода Python.

Весь код, представленный на этой странице, вы можете найти в репозитории GitHub.