Изначально это было опубликовано в нашем техническом блоге, доступном здесь.

В современной науке о данных и машинном обучении удивительно легко достичь точки, когда наши типичные инструменты Python - такие пакеты, как numpy, pandas или scikit-learn - на самом деле не подходят для масштабирования с нашими данными с точки зрения времени обработки или использования памяти. Это естественный момент для перехода к инструменту распределенных вычислений (классически, что-то вроде Apache Spark), но это может означать переоснащение нашего рабочего процесса для совершенно новой системы, переход между нашей знакомой экосистемой Python и отдельным миром JVM, а также значительное усложнение нашей разработки. рабочий процесс. Библиотека Dask объединяет мощь распределенных вычислений с гибкостью разработки Python для науки о данных с полной интеграцией с общими инструментами обработки данных Python. В этом посте мы создадим простой конвейер данных для аналитики и машинного обучения, работая с текстовыми данными в Dask.

что такое распределенные вычисления?

Рассмотрим следующий сценарий: у вас есть набор данных, возможно, набор текстовых файлов, который слишком велик для размещения в памяти вашего компьютера. Это нормально - мы можем просто использовать потоковую передачу файлов Python и другие инструменты генератора для итерации по нашему набору данных, не загружая все это в память! Но… тогда мы сталкиваемся с ограничениями по скорости, поскольку даже при всей способности памяти, которую мы можем собрать, задание по-прежнему выполняется в одном потоке. Благодаря функции безопасности, называемой Global Interpreter Lock в Python (точнее, CPython, который использует большинство людей), написание параллельного кода на Python может быть немного сложным. Однако есть несколько хороших решений: либо с помощью инструментов нижнего уровня за пределами GIL (например, numpy, выполняя многопоточную тяжелую работу в скомпилированном коде, отличном от Python), либо с использованием нескольких потоков / процессов из кода Python с такими пакетами, как joblib или multiprocessing. Но попытка ускорить ваш код с помощью распараллеливания может быть трудной для правильной работы и может привести (даже если все сделано правильно) к менее читаемому коду, что потребует от вас полной перестройки вашего процесса ... и вы все еще ограничены ресурсами на вашей машине!

Для действительно крупномасштабных проблем, подобных этой, распределенные вычисления могут быть мощным решением. В распределенной системе вместо того, чтобы просто пытаться работать в нескольких процессах или потоках на одной машине, работа передается нескольким независимым рабочим машинам, каждая из которых обрабатывает фрагменты набора данных в своей собственной памяти и / или на диске и на собственных процессорах. Эти рабочие узлы взаимодействуют не совместно с памятью и дисковым пространством (как это делает правильно многопоточный код), а только через относительно простой обмен сообщениями друг с другом или с центральным планировщиком. В обмен на сложность проектирования, заключающуюся в настройке этого централизованного планировщика и сохранении рабочих групп отдельно друг от друга (поскольку массовое перемещение данных между рабочими - потенциально очень дорогостоящая операция), распределенные вычислительные системы могут позволить вам масштабировать код для очень больших наборов данных для выполнения. параллельно на любое количество рабочих.

dask для распределенных вычислений

Часто для перехода к распределенным вычислениям требуется использование такого инструмента, как Apache Spark. Несмотря на то, что Spark мощный, с ним может быть сложно работать, с ним сложно создавать прототипы локально, требуется иметь дело с кодом, работающим в JVM (что может быть вызвано значительным переключением ментальной модели с типичного кода Python, найденного в экосистеме DS / ML. ), и его способность справляться с определенными задачами, такими как первоклассное машинное обучение или большие многомерные векторные операции, ограничена. Dask стремится изменить это как родной инструмент Python, разработанный с нуля для интеграции с типичными инструментами обработки данных Python (а в некоторых случаях, по сути, являясь его заменой).

Под капотом Dask - это распределенный планировщик задач, а не инструмент обработки данных как таковой, то есть все, о чем заботится планировщик Dask, - это оркестровка Delayed объектов (по сути, асинхронных обещаний, обертывающих произвольный код Python) с их зависимости в граф выполнения. По сравнению с довольно ограниченными высокоуровневыми примитивами Spark (по сути, расширением парадигмы MapReduce) это означает, что Dask может организовывать очень сложные задачи, обеспечивая замечательные применения, такие как планирование с несколькими GPU в RAPIDS. Однако для наших целей нам не нужно беспокоиться об этих низкоуровневых внутренних компонентах - Dask предоставляет нам несколько коллекций для обертывания низкоуровневых задач в высокоуровневые рабочие процессы:

  • dask.bag: неупорядоченный набор, по сути, распределенная замена итераторов Python, считываемых из текстовых / двоичных файлов или из произвольных Delayed последовательностей
  • dask.array: Распределенные массивы с numpy-подобным интерфейсом, отлично подходят для масштабирования операций с большими матрицами.
  • dask.dataframe: Распределенные pandas-подобные фреймы данных для эффективной обработки табличных, организованных данных.
  • dask_ml: распределенные оболочки для scikit-learn подобных инструментов машинного обучения

Поскольку он является родным для Python, начать с Dask так же просто, как pip или conda установить его в выбранной вами среде (или с помощью одного из их образов докеров), и он будет нормально работать даже на одном ноутбуке (хотя тот же код можно с минимальными изменениями масштабировать до сценария использования кластера с тысячей рабочих!).

Давайте нырнем!

начиная

В этом примере мы будем использовать дамп данных Stack Exchange с открытым исходным кодом archive.org. Мы будем извлекать один из Posts наборов данных, который сам по себе представляет собой довольно простой XML-файл, но тот, который может быть (в случае более крупных сообществ, например, Stack Overflow) слишком большим, чтобы поместиться в памяти или быстро обработать на одной машине. . Мы сможем выполнять всю нашу обработку в Dask, используя один простой прием для настройки - поскольку мы знаем, что файл структурирован с помощью заголовка и нижнего колонтитула XML, мы можем пойти дальше и удалить их из командной строки:

$ sed -i '' '1,2d;$d' Posts.xml

после чего каждая строка текстового файла становится независимым объектом XML row, поэтому мы можем легко разделить его между несколькими рабочими процессами.

Мы можем настроить чтение наших исходных данных:

>>> client = dask.distributed.Client() # uses local scheduler
>>> posts = dask.bag.read_text(“Posts.xml”, blocksize=1e8)

После того, как мы объявили Client, любые операции в том же сеансе по умолчанию будут использовать его для вычислений. Вызывая его без аргументов, Dask автоматически запускает локальный кластер для выполнения работы. Это одна из моих любимых вещей в Dask - по сравнению с лифтом, связанным с настройкой Spark для локальной разработки, я обнаружил, что простота использования инструментов Dask является одной из его самых сильных сторон.

Затем нам просто нужно прочитать текстовый файл (или файлы) в сумку Dask («мешок» здесь в математическом смысле, а именно неупорядоченный набор с дубликатами). На практике нам действительно нужно думать о пакетах как о чем-то вроде распределенной итерации - мы можем отображать функции на каждый элемент, фильтровать их и складывать их вместе в смысле MapReduce. Это отличная отправная точка для множества наборов данных, в которых мы изначально не можем дать много гарантий относительно структуры данных.

>>> posts
<bag-from-delayed, npartitions=1>

Если мы проверим объект posts, мы увидим несколько непрозрачную ссылку на объект dask.Delayed вместо фактического представления наших данных. Это связано с тем, что планировщик Dask лениво строит граф выполнения, когда мы объявляем операции, поэтому объект в его нынешнем виде представляет собой просто представление шагов, которые планирует Dask (в данном случае создание пакета из отложенного чтения файла с один раздел, так как мы работаем с небольшим демонстрационным файлом). График не будет фактически ничего вычислять, пока мы не укажем ему это явно, что позволит планировщику оптимизировать свою работу под капотом. Если мы хотим проверить наши данные, мы можем извлечь образцы:

>>> posts.take(1) 
(‘ <row Id=”5" PostTypeId=”1" … />\r\n’,) # truncating XML here

Dask будет лениво вычислять данные, достаточные для создания запрашиваемого представления, поэтому мы получаем из файла единственный объект строки XML.

Естественно, мы захотим применить функции к элементам нашего bag, чтобы навести порядок в наших данных, и метод map() - хлеб с маслом для этого. Давайте перенесем наши данные из XML в словарь Python, используя xml.etree.ElementTree стандартной библиотеки:

>>> posts = posts.map(lambda row: ElementTree.fromstring(row).attrib) 
>>> posts.take(1) 
({‘Id’: ‘5’, 
  ‘PostTypeId’: ‘1’, 
  ‘CreationDate’: ‘2014–05–13T23:58:30.457’, 
  ‘Score’: ‘9’, 
  ‘ViewCount’: ‘671’, 
  ‘Body’: ‘<p>I\’ve always been interested in machine learning, but I can\’t figure out one thing about starting out with a simple “Hello World” example — how can I avoid hard-coding behavior?</p>\n\n<p>For example, if I wanted to “teach” a bot how to avoid randomly placed obstacles, I couldn\’t just use relative motion, because the obstacles move around, but I don\’t want to hard code, say, distance, because that ruins the whole point of machine learning.</p>\n\n<p>Obviously, randomly generating code would be impractical, so how could I do this?</p>\n’, 
  ‘OwnerUserId’: ‘5’, 
  ‘LastActivityDate’: ‘2014–05–14T00:36:31.077’, 
  ‘Title’: ‘How can I do simple machine learning without hard-coding behavior?’, 
  ‘Tags’: ‘<machine-learning>’, 
  ‘AnswerCount’: ‘1’, 
  ‘CommentCount’: ‘1’, 
  ‘FavoriteCount’: ‘1’, 
  ‘ClosedDate’: ‘2014–05–14T14:40:25.950’},)

Напомним, что Даск здесь просто лениво строит вычислительный граф. Каждый раз, когда мы повторно связываем переменную posts, мы просто перемещаем эту ссылку в начало графика. Мы действительно можем увидеть, как выглядит график, с помощью встроенного визуализатора (используя graphviz под капотом):

>>> posts.visualize(rankdir="LR")

Как и map, отсюда мы также можем фильтровать элементы пакета - давайте оставим только сообщения верхнего уровня, а не ответы, обозначенные PostTypeId из 1:

>>> posts = posts.filter(lambda row: row["PostTypeId"] == "1")

Пакеты Dask также поддерживают агрегирование с помощью своих методов fold и foldby, которые поддерживают парадигму, подобную MapReduce. Однако я склонен вместо этого выполнять такие операции в фреймах данных. К счастью, создавать фреймы данных, если у нас есть некоторое подобие структуры наших данных. Теперь, когда мы знаем, что интересующие нас поля присутствуют, давайте создадим одно из них:

>>> metadata = { 
... “Id”: int, 
... “CreationDate”: “datetime64[ns]”, 
... “Body”: str, 
... “Tags”: str 
... } 
>>> posts = posts.to_dataframe(meta=metadata)

Dask может попытаться определить типы данных, но это может привести к потенциально дорогостоящим вычислениям или ошибкам (особенно в отношении обработки нулей). Я обычно считаю полезным идти вперед и четко указывать типы, особенно потому, что это дает нам немедленную возможность отфильтровать выбранные столбцы и применить начальное преобразование типов данных. Здесь нам просто нужно предоставить типы Python для Dask для создания схемы. (Мы также можем использовать более эзотерические типы - например, datetime64 - это numpy тип данных.)

операции с фреймами данных

После того, как мы разумно структурировали наши данные и преобразовали их в табличный dask.dataframe объект из нашего dask.bag, мы оказываемся на знакомой территории для инструментов анализа данных:

>>> posts.head()

Это выглядит как обычный pandas фрейм данных - фактически, каждый раздел фрейма данных Dask сам по себе является pandas фреймом данных! Обычно поддерживаются наши типичные операции фрейма данных, такие как переименование или доступ к строкам / столбцам по индексу:

>>> snakecase_regex = re.compile(r”(?<!^)(?=[A-Z])”) 
>>> posts.columns = [
...     re.sub(snakecase_regex, “_”, c).lower() 
...     for c in posts.columns
... ]

Мы также можем использовать наши типичные столбцовые операции из pandas. Числовые манипуляции работают должным образом (поскольку лежащие в основе dask.array зеркальные numpy операции), или мы можем использовать подмодули pandas.str и pandas.dt для работы со строками и данными даты. (Кстати, недостаточное использование этих менее известных инструментов - распространенная ошибка, которую я встречал с новыми специалистами по данным: часто они apply используют строковую функцию для столбца, а не используют pandas встроенные функции, которые часто значительно быстрее). Например, мы можем разделить XML-строку наших тегов на массив и выполнить простую очистку текста в теле:

>>> tag_regex = re.compile(r”(?<=<)\S*?(?=>)”) 
>>> posts.tags = posts.tags.str.findall(tag_regex) 
>>> posts.body = posts.body.map(
...     lambda body: BeautifulSoup(body).get_text(), 
...     meta=(“body”, str)
... ) 
>>> posts.head()

Фильтрация данных (как и следовало ожидать) также выглядит как типичная pandas операция. Например, чтобы оставить только сообщения с тегами python,

>>> python_posts = posts[posts.tags.map(
...     lambda tags: "python" in tags
... )]

Обратите внимание, что при фильтрации столбцов, которые, вероятно, будут упорядочены (например, отметка даты), мы потенциально можем удалить большую часть или все некоторые разделы и оставить другие нетронутыми. После подобных операций, возможно, стоит переразбить dask.dataframe, чтобы лучше распределять данные между его работниками. Dask, как правило, делает это разумно (как можно лучше разбивая по индексу), поэтому нам просто нужно иметь представление о том, сколько разделов нам нужно после фильтрации (альтернативно, сколько наших данных мы ожидаем удалить).

аналитика и машинное обучение

Теперь, когда у нас есть данные в чистом виде, мы можем извлечь из них полезные знания. Например, давайте посмотрим на наиболее распространенные теги, которые встречаются вместе с тегом python:

>>> # replicates rows, one per value in `tags`
>>> python_posts = python_posts.explode("tags")
>>> python_posts = python_posts[python_posts.tags != 'python']
>>> tag_counts = python_posts.tags.value_counts()

Проверяя это, мы видим, что все еще находимся в распределенной области: tag_counts привязан к серии Dask (аналогично объекту pandas) со всеми лежащими в ее основе вычислениями. Однако сейчас мы находимся в точке, где агрегированный результат реально умещается в памяти, поэтому мы можем материализовать его, просто запустив вычисление:

>>> tag_counts = tag_counts.compute() # returns a pandas series 
>>> tag_counts.head(5) 
machine-learning    1190 
scikit-learn         640 
pandas               542 
keras                510 
tensorflow           352 
Name: tags, dtype: int64

который будет запускать задачи настолько далеко назад, насколько это необходимо, в своей цепочке зависимостей, чтобы вернуть объект в памяти или на диске, если это необходимо. На самом деле, если с набором данных можно работать на одной машине, это, как правило, будет намного быстрее, чем хранить его в распределенной среде. К счастью, объекты Dask могут легко переноситься в аналогичные локальные объекты Python (например, здесь pandas.Series) при вычислении.

Помимо возможностей аналитики в базовом Dask, мы можем получить доступ ко многим функциям машинного обучения в соответствующем пакете dask-ml. Давайте попробуем отличить сообщения о Python и R из нашего набора данных:

>>> posts.tags = posts.tags.map(
...     lambda tags: list(set(tags).intersection({"python", "r"}))
... )
>>> posts = posts[posts.tags.map(lambda tags: len(tags) == 1)]  
>>> # exactly one tag, not both
>>> posts.tags = posts.tags.map(
...     lambda tags: tags[0]
... ).astype("category")

Поэтому мы сократили наш набор данных до сообщений, помеченных только python или r (но не обоими одновременно), и отбросили другие теги. Пакет dask-ml представляет собой распределенные эквиваленты ряда scikit-learn инструментов конвейера:

>>> from dask_ml.model_selection import train_test_split
>>> from dask_ml.preprocessing import LabelEncoder
>>> from dask_ml.feature_extraction.text import HashingVectorizer
>>> train, test = train_test_split(
...     posts, 
...     test_size=0.25, 
...     random_state=42
... )
>>> label_encoder = LabelEncoder().fit(train["tags"])
>>> vectorizer = HashingVectorizer(
...     stop_words="english"
... ).fit(train["body"])

Они будут распределять свои действия по кластеру Dask (хотя обратите внимание, что train_test_split будет перетасовывать данные между разделами, что может быть потенциально дорогостоящим шагом), при этом шаги предварительной обработки возвращают массивы Dask, подходящие для алгоритмов машинного обучения. LabelEncoder здесь может извлечь выгоду из типа данных category, который мы установили выше для наших меток, что позволяет кластеру избежать потенциально дорогостоящего повторного сканирования всего набора данных для изучения доступных меток. Аналогичным образом HashingVectorizer разработан для эффективной работы с распределенным набором данных. В отличие от чего-то вроде CountVectorizer или TfidfVectorizer в scikit-learn, которые должны были бы сопоставлять информацию между разделами, HashingVectorizer не имеет состояния и может работать параллельно по всему набору данных (поскольку это зависит только от хэша каждого входного токена) для эффективного создания разреженного матричное представление нашего текста.

Когда у нас есть готовые данные, Dask также предоставляет нам несколько вариантов масштабирования алгоритмов машинного обучения. Например, мы можем использовать его для распараллеливания поиска по сетке и оптимизации гиперпараметров для небольших наборов данных, для обработки распараллеливаемых задач в рамках таких алгоритмов, как XGBoost, или для управления пакетным обучением в многораздельном наборе данных. Давайте попробуем последний вариант и обучим простой классификатор на наших данных - поскольку Dask интегрируется с типичным стеком данных Python, мы можем работать из коробки с scikit-learn инструментами, для хорошей игры нужен только алгоритм, поддерживающий partial_fit парадигму пакетного обучения .

>>> from dask_ml.wrappers import Incremental
>>> from sklearn.linear_model import SGDClassifier
>>> model = Incremental(
...     SGDClassifier(penalty="l1"), 
...     scoring="accuracy", 
...     assume_equal_chunks=True
... )

Здесь мы оборачиваем наш простой линейный классификатор (со строгой нормой L1, чтобы он научился сосредотачиваться на наиболее информативных токенах в нашем разреженном представлении) в оболочку Incremental, которая позволит нам обучать наши данные для каждого раздела. Чтобы тренироваться, мы просто бегаем

>>> X = vectorizer.transform(train["body"])
>>> y = label_encoder.transform(train["tags"])
>>> model.fit(X, y, classes=[0, 1])

который очень похож на scikit-learn с некоторыми дополнительными причудами - например, модели необходимо знать все доступные классы на случай, если один из них не представлен в данном пакете, а оболочке Incremental нужна наша гарантия, что куски от X и y будут одинакового размера (что мы можем смело предположить для выхода наших ступеней трансформатора). Аналогично для подсчета очков:

>>> X_test = vectorizer.transform(test[“body”]) 
>>> y_test = label_encoder.transform(test[“tags”]) 
>>> print(f”{model.score(X_test, y_test):.3f}”) 
0.896

Неплохо, учитывая, что мы его вообще не настраивали!

переход в удаленный кластер

Все, что мы сделали до сих пор, отлично работает на нашей локальной машине с использованием LocalCluster варианта кластера Dask. Тем не менее, одним из достоинств Dask является простота перевода этого кода для работы в удаленном кластере.

Еще в начале нашего конвейера мы объявили dask.distributed.Client() без каких-либо аргументов. Это автоматически запускает локальную версию кластера для вычислений. (Мы также могли бы явно создать dask.distributed.LocalCluster, если, скажем, хотим ограничить его ресурсы.) Чтобы использовать удаленный кластер, мы просто создаем клиента, например

>>> client = dask.distributed.Client("tcp://<address:port>")

с адресом планировщика удаленного кластера, тогда остальная часть нашей работы будет в основном «просто работать»! Dask даже предоставляет нам удобную панель мониторинга для кластера, чтобы мы могли видеть ход выполнения задачи, использование ресурсов и т. Д. (Мы действительно можем видеть это и с нашим локальным кластером на localhost:8787.)

Поскольку рабочие Dask больше не могут ссылаться на файлы на нашей локальной машине, нам действительно нужен способ передачи данных в кластер и из него. К счастью, разработчики Dask также поддерживают несколько удобных инструментов для взаимодействия с облачными хранилищами данных, такими как S3 и GCS. Например, для данных в S3 мы можем использовать пакет s3fs, который дает нам объект, подобный файловой системе, для работы с данными в S3. В Dask мы можем просто передать путь S3 к нашему файловому вводу-выводу, как если бы он был локальным, например

>>> posts = dask.bag.read_text("s3://<S3 bucket path to data>")

и s3fs позаботится обо всем, что находится под капотом. Мы также можем получить доступ к этим файлам на нашем локальном компьютере, например

>>> fs = s3fs.S3FileSystem() 
>>> with fs.open(“s3://<S3 bucket path>”, “rb”) as rf: 
...     data = rf.read() # or whatever file operation we want!

Последнее, о чем нам нужно думать при удаленных вычислениях (в общем, не только с Dask), - это управление средой. В нашем локальном кластере воркеры выполняются в одной среде Python, поэтому любые пакеты и функции, доступные в нашей собственной среде, доступны воркерам. В удаленных кластерах рабочие, естественно, запускают свои собственные изолированные среды Python, поэтому нам нужно подумать о том, какой код у нас есть для рабочих.

Вызовы функций, которые мы запускаем в кластер Dask, сериализуются с cloudpickle, но более сложный локальный код или ссылки на другие пакеты могут быть трудными. Как правило, я стараюсь как можно больше использовать встроенные функции dask (особенно в фреймах данных) и ссылаться на пакеты в противном случае - поскольку Dask работает в стандартной среде Python, установка пакетов на рабочие процессы проста, а ресурсы Kubernetes, управляющие нашим кластером Dask, заставляют их подталкивать легко устанавливается на весь кластер.

что насчет Spark?

В качестве системы распределенных вычислений и обработки данных Dask предлагает естественное сравнение со Spark. Со своей стороны, я использовал Spark и Dask и обнаружил, что намного проще начать работать с Dask, имея опыт работы в области науки о данных. Тот факт, что он намеренно отражает общие инструменты анализа данных, такие как numpy и pandas, значительно снижает входной барьер для пользователя Python, как для изучения инструмента, так и для масштабирования существующих проектов. Более того, поскольку Dask - это собственный инструмент Python, установка и отладка намного проще: Dask и связанные с ним инструменты могут быть просто установлены в обычной среде Python с pip или conda, а отладка так же проста, как чтение обычной трассировки стека Python или вывода REPL. . По сравнению с нетривиальными задачами настройки локального Spark для разработки и расшифровки выходных данных JVM, чередующихся в коде Python (поскольку реально специалист по данным или инженер по машинному обучению будет взаимодействовать с кластером через pyspark), это значительно упрощает процесс разработки. Хотя pyspark может интегрировать собственный код Python, его развертывание в кластере нетривиально (в отличие от простой _96 _ / _ 97_ установки дополнительных пакетов в кластере для Dask, что просто даже в развернутых кластерах в такой инфраструктуре, как Kubernetes).

Spark был разработан специально для обычных задач обработки данных, и я обнаружил, что его инструменты машинного обучения несколько не подходят для моей работы по сравнению с Dask. С учетом сказанного, есть несколько соображений, по которым Dask не лучший вариант - например, Dask в настоящее время не имеет хорошего способа работы с потоковыми данными, тогда как Spark может интегрироваться с парадигмой потоковой передачи искр или говорить более легко. к более новым инструментам, таким как Apache Beam. Точно так же, хотя реализация Dask на Python может быть благом для практиков DS / ML, это означает, что в ней отсутствует межъязыковая поддержка Spark. Короче говоря, для групп разработки данных, которым необходимо интегрироваться с другими инструментами Scala / Java в экосистеме Hadoop (особенно с инструментами потоковой передачи), Spark может быть лучшим выбором, но для групп, ориентированных на Python, с интенсивным анализом данных и потребностями в машинном обучении, Dask может быть действительно мощным инструментом.

завершение

Для специалиста по данным или инженера по машинному обучению переход от одномашинного кода в традиционном стеке данных Python (numpy, pandas, scikit-learn и т. Д.) К распределенной вычислительной среде может быть сложной задачей. Набор инструментов Dask делает эту миграцию более чистой, с распределенными аналогами для большей части общего стека данных Python. Это позволяет использовать в рабочем процессе масштабируемые кластеры или просто управлять параллелизмом и кэшированием дисков на одной машине.