Оркестрация потоков данных является важным аспектом современных организаций, управляемых данными. Это помогает обеспечить плавный и эффективный поток данных в организации, от сбора до анализа и принятия решений.
Одним из мощных инструментов для оркестровки потоков данных является Prefect, система управления рабочими процессами на основе Python. С Prefect вы можете легко создавать, развертывать и отслеживать сложные рабочие процессы данных, используя простой и интуитивно понятный API Python.
Начало работы с префектом
Чтобы начать работу с Prefect, вам сначала необходимо установить его. Вы можете сделать это с помощью pip:
pip install prefect
После того, как вы установили Prefect, вы можете начать создавать свои рабочие процессы с данными.
Создание простого рабочего процесса с данными с помощью Prefect
В приведенном ниже коде показан пример создания простого рабочего процесса данных с помощью Prefect.
В этом примере мы используем библиотеку requests
для получения шуток из API шуток, фильтруем шутки, чтобы включить только те, которые меньше определенной длины, и выводим полученные шутки на консоль.
# my_flow.py import requests from prefect import task, flow @task def get_jokes(term): url = f'<https://icanhazdadjoke.com/search?term={term}>' headers = {'Accept': 'application/json'} response = requests.get(url, headers=headers) jokes = response.json()['results'] return jokes @task def filter_jokes(jokes, max_length=100): filtered_jokes = [joke for joke in jokes if len(joke['joke'])<=max_length] return filtered_jokes @task def print_jokes(jokes): print(jokes) @flow def **dad_joke_flow**(term): jokes = get_jokes(term) filtered_jokes = filter_jokes(jokes) print_jokes(filtered_jokes) dad_joke_flow("funny")
Вот три задачи:
get_jokes
filter_jokes
print_jokes
И один поток:
dad_joke_flow
Чтобы запустить рабочий процесс, показанный в примере, вы можете использовать команду python my_flow.py
. Она передаст аргумент «смешно» параметру термина и выполнит рабочий процесс.
Вот скриншот журнала выполнения.
Запуск сервера Prefect
Чтобы в полной мере воспользоваться механизмом оркестровки Prefect и сервером API, вам необходимо запустить сервер Prefect.
Вы можете сделать это, выполнив следующую команду:
$ prefect server start Starting... ___ ___ ___ ___ ___ ___ _____ | _ \\ _ \\ __| __| __/ __|_ _| | _/ / _|| _|| _| (__ | | |_| |_|_\\___|_| |___\\___| |_| Configure Prefect to communicate with the server with: prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api Check out the dashboard at <http://127.0.0.1:4200>
При работающем сервере Prefect легко создавать, развертывать и отслеживать сложные рабочие процессы данных.
Теперь давайте посмотрим.
Расписание вашего рабочего процесса
Благодаря возможностям планирования Prefect вы можете легко автоматизировать выполнение своих рабочих процессов с данными, гарантируя, что они будут запускаться в нужное время.
Во-первых, вы должны создать файл развертывания YAML, используя заданные параметры, следующим образом:
prefect deployment build my_flow.py:dad_joke_flow --name etl --cron "*/5 * * * *" --params='{"term":"funny"}'
Он создаст файл с именем dad_joke_flow-deployment.yaml
в вашей локальной папке.
Затем вы можете развернуть файл YAML на сервере.
prefect deployment apply dad_joke_flow-deployment.yaml
Развертывание скоро появится в веб-интерфейсе.
Выполнить развертывание
Чтобы выполнить запуск потока из этого развертывания, запустите агент, который извлекает работу из рабочей очереди «по умолчанию».
prefect agent start -q 'default'
Вы можете просмотреть отчет о выполнении в веб-интерфейсе.
Заключение
Prefect — это мощный инструмент для оркестровки потоков данных, предлагающий простой и интуитивно понятный Python API, а также мощные инструменты мониторинга и отладки.
Независимо от того, создаете ли вы простые рабочие процессы данных или сложные конвейеры данных, Prefect — отличный выбор для управления вашими рабочими процессами данных.