Вы хотите выполнять предварительную обработку данных распределенно с сохранением разных версий данных и распределенным обучением одновременно? Если ваш ответ да, вы находитесь в правильном месте. Мы собираемся объединить два мощных инструмента, таких как Ray Train и Pachyderm. Ray Train обеспечивает распределенное обучение глубокому обучению. Pachyderm обеспечивает автомасштабирование, параллельную обработку, контроль версий для данных и конвейеры, управляемые данными. Наша цель — использовать лучшую часть каждого инструмента.
Мы будем использовать Pachyderm для обработки данных и Ray для модели обучения. После интеграции обоих инструментов мы можем работать с данными, используя только одну команду, и ее можно распределить по набору данных, а также распараллелить процесс обучения с помощью настройки обучения PyTorch.
Мы собираемся сосредоточиться на работе с Pachyderm и Ray, поэтому я предполагаю, что вы установили Pachyderm и Ray на Kubernetes, если нет, вы можете настроить их с помощью руководства.
Создайте репозиторий толстокожих и заполните данные
Используя приведенный ниже код, мы создаем проект pach_ray и настраиваем его по умолчанию.
$ pachctl create project pach_ray $ pachctl config update context --project pach_ray editing the currently active context "local" $ pachctl config get context local { "pachd_address": "grpc://10.152.183.249:30653", "cluster_deployment_id": "****", "project": "pach_ray" }
Следующий шаг — создать репозиторий dirty_data и заполнить его данными. В качестве данных мы создаем простой файл JSON.
$ pachctl create repo dirty_data $ pachctl list repo PROJECT NAME CREATED SIZE (MASTER) DESCRIPTION pach_ray dirty_data 5 seconds ago ≤ 0B
Используя приведенный ниже код, мы создаем список и сохраняем его в формате JSON.
import json input_list = list(range(150)) dict_json_data = {'input_data': input_list} with open('input_data.json', 'w') as f: json_object = json.dumps(dict_json_data) f.write(json_object)
И добавьте созданный файл input_data.json в dirty_data.
$ pachctl put file dirty_data@master:input_data.json -f input_data.json
Обработка входных данных — это input_data.json,мыиспользуемконвейерPachyderm и сохраняем его в выходном репозитории Pachyderm. Выходное репо pachyderm создается автоматически при создании пайпа.
Создание пайплайна толстокожих
Чтобы настроить трубу Pachyderm, мы должны создать файл предварительной обработки Python, поместить его в образ докера и создать файл конфигурации для создания трубы.
Pachyderm монтирует каталог /pfs/ в образы Docker. Где /pfs/dirty_data/ — входной каталог, где dirty_directory — входной репозиторий, а /pfs/out/ — выходной каталог, в котором должны быть сохранены файлы и он определяется как выходной каталог.
Давайте настроим файл Dockerfile для создания образа Docker.
И вставьте его в докер-хаб
$ docker build -t pach_ray_integration . $ docker tag pach_ray_integration eduard1/pach_ray_integration:1.2 # login into docker hub $ docker login # push iamge into docker hub $ docker push eduard1/pach_ray_integration:1.2
Последним шагом является создание файла конфигурации и создание канала в толстокожее животное.
трубопровод: определяет имя трубопровода, Pachyderm автоматически создает выходной канал с тем же именем. input: определяет репозиторий ввода и способ чтения данных. transform: определяет изображение, которое будет использоваться, а cmd — способ запуска файла предварительной обработки python в изображениях.
# create pipeline using configuration file $ pachctl create pipeline -f pipeline_configuration.yaml # Run that command we should see the created repo and new file # It means that the pipeline ran successfully $ pachctl list file data_preprocessing@master NAME TYPE SIZE /output_data.json file 1.399KiB
Отлично, мы создали конвейер предварительной обработки и теперь у нас есть готовые данные для создания обучения PyTorch на Ray с интеграцией в Pachyderm.
Подготовьте код для Ray Train с помощью PyTorch
Теперь нам нужно создать класс набора данных для PyTorch. Для этого набора данных нам нужно создать генератор данных факела. Генератор преобразует наш файл output_data.json в тип, подходящий для модели факела поезда.
from torch.utils.data import Dataset import numpy as np class DatasetGenerator(Dataset): def __init__(self, data:dict): self.x = data["input_data"] self.y = data["output_data"] self.x = np.array(self.x, dtype=np.float32).reshape(-1, 1) self.y = np.array(self.y, dtype=np.float32).reshape(-1, 1) def __len__(self): return len(self.x) def __getitem__(self, index): return self.x[index], self.y[index]
Определим структуру модели, которую мы собираемся обучать.
from torch import nn class MyModel(nn.Module): def __init__(self): super().__init__() self.linear_model = nn.Sequential( nn.Linear(1, 2), nn.SELU(), nn.Linear(2,1) ) def forward(self, inputs): return self.linear_model(inputs)
Вы видите, что мы не объявили функцию get_data
. Теперь нам нужно определить наиболее ожидаемую функцию, которая объединяет Ray Train и Pachyderm. Для интеграции мы используем пакет python_pachyderm.
python_pachyderm обеспечивает соединение с Pachyderm, и мы должны преобразовать предварительно обработанный файл в соответствующий формат, такой как словарь, Pandas DataFrame и т. д.
import python_pachyderm import json def get_data(): # connect to pachyderm client_pachyderm = python_pachyderm.Client() # connect to pach_ray project and appropriate file dict_input = {"project": "pach_ray", "repo": "data_preprocessing", "branch": "master"} data_binary = client_pachyderm.get_file(dict_input, "output_data.json") str_json = data_binary.read() str_json = str_json.decode("utf-8") dict_data = json.loads(str_json) return DatasetGenerator(dict_data)
Последним шагом является создание функции обучения, которая будет использоваться в Ray Train, и запуск обучения.
Вы можете видеть в коде штрихи. Строки определяют дополнительные строки кода, которые мы должны добавить к обычной обучающей функции Torch для обучения в Ray. Первый блок преобразует загрузчик данных PyTorch и диалог модели в соответствующий объект Ray. Во-вторых, сохраняет контрольные точки.
Последним шагом процесса обучения является определение и запуск Ray Trainer.
from ray.train.torch import TorchTrainer from ray.air.config import ScalingConfig use_gpu = False trainer = TorchTrainer(train_model_distributed, scaling_config=ScalingConfig(num_workers=5, use_gpu=use_gpu)) results = trainer.fit()
После компиляции в Jupyter Notebook вы должны увидеть статус настройки, информацию о системе, статус пробной версии и ход пробной версии. Если после запуска results.метрики все работает нормально, вы должны увидеть что-то похожее на словарь, представленный ниже.
{'loss': 0.10256955772638321, 'epoch': 39, 'timestamp': 1688980150, 'time_this_iter_s': 0.08252787590026855, 'should_checkpoint': True, 'done': True, 'training_iteration': 40, 'trial_id': '67a58_00000', 'date': '2023-07-10_12-09-10', 'time_total_s': 8.633183479309082, 'pid': 1668740, 'hostname': 'kbp1-lhp-a11064', 'node_ip': '192.168.20.185', 'config': {}, 'time_since_restore': 8.633183479309082, 'iterations_since_restore': 40, 'experiment_tag': '0'}
Заключение
В конце мы создали репозиторий Pachyderm, конвейер предварительной обработки, обученную модель PyTorch с использованием Ray при извлечении предварительно обработанных данных из репозитория с использованием кода Python.
Весь код, представленный на этой странице, вы можете найти в репозитории GitHub.