Оркестрация потоков данных является важным аспектом современных организаций, управляемых данными. Это помогает обеспечить плавный и эффективный поток данных в организации, от сбора до анализа и принятия решений.

Одним из мощных инструментов для оркестровки потоков данных является Prefect, система управления рабочими процессами на основе Python. С Prefect вы можете легко создавать, развертывать и отслеживать сложные рабочие процессы данных, используя простой и интуитивно понятный API Python.

Начало работы с префектом

Чтобы начать работу с Prefect, вам сначала необходимо установить его. Вы можете сделать это с помощью pip:

pip install prefect

После того, как вы установили Prefect, вы можете начать создавать свои рабочие процессы с данными.

Создание простого рабочего процесса с данными с помощью Prefect

В приведенном ниже коде показан пример создания простого рабочего процесса данных с помощью Prefect.

В этом примере мы используем библиотеку requests для получения шуток из API шуток, фильтруем шутки, чтобы включить только те, которые меньше определенной длины, и выводим полученные шутки на консоль.

# my_flow.py
import requests
from prefect import task, flow

@task
def get_jokes(term):
    url = f'<https://icanhazdadjoke.com/search?term={term}>'
    headers = {'Accept': 'application/json'}
    response = requests.get(url, headers=headers)
    jokes = response.json()['results']
    return jokes

@task
def filter_jokes(jokes, max_length=100):
    filtered_jokes = [joke for joke in jokes if len(joke['joke'])<=max_length]
    return filtered_jokes
@task
def print_jokes(jokes):
    print(jokes)
@flow
def **dad_joke_flow**(term):
    jokes = get_jokes(term)
    filtered_jokes = filter_jokes(jokes)
    print_jokes(filtered_jokes)


dad_joke_flow("funny")

Вот три задачи:

  • get_jokes
  • filter_jokes
  • print_jokes

И один поток:

  • dad_joke_flow

Чтобы запустить рабочий процесс, показанный в примере, вы можете использовать команду python my_flow.py. Она передаст аргумент «смешно» параметру термина и выполнит рабочий процесс.

Вот скриншот журнала выполнения.

Запуск сервера Prefect

Чтобы в полной мере воспользоваться механизмом оркестровки Prefect и сервером API, вам необходимо запустить сервер Prefect.

Вы можете сделать это, выполнив следующую команду:

$ prefect server start
Starting... ___ ___ ___ ___ ___ ___ _____ 
| _ \\ _ \\ __| __| __/ __|_   _|
|  _/   / _|| _|| _| (__  | |
|_| |_|_\\___|_| |___\\___| |_|

Configure Prefect to communicate with the server with:
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

Check out the dashboard at <http://127.0.0.1:4200>

При работающем сервере Prefect легко создавать, развертывать и отслеживать сложные рабочие процессы данных.

Теперь давайте посмотрим.

Расписание вашего рабочего процесса

Благодаря возможностям планирования Prefect вы можете легко автоматизировать выполнение своих рабочих процессов с данными, гарантируя, что они будут запускаться в нужное время.

Во-первых, вы должны создать файл развертывания YAML, используя заданные параметры, следующим образом:

prefect deployment build my_flow.py:dad_joke_flow --name etl --cron "*/5 * * * *"  --params='{"term":"funny"}'

Он создаст файл с именем dad_joke_flow-deployment.yaml в вашей локальной папке.

Затем вы можете развернуть файл YAML на сервере.

prefect deployment apply dad_joke_flow-deployment.yaml

Развертывание скоро появится в веб-интерфейсе.

Выполнить развертывание

Чтобы выполнить запуск потока из этого развертывания, запустите агент, который извлекает работу из рабочей очереди «по умолчанию».

prefect agent start -q 'default'

Вы можете просмотреть отчет о выполнении в веб-интерфейсе.

Заключение

Prefect — это мощный инструмент для оркестровки потоков данных, предлагающий простой и интуитивно понятный Python API, а также мощные инструменты мониторинга и отладки.

Независимо от того, создаете ли вы простые рабочие процессы данных или сложные конвейеры данных, Prefect — отличный выбор для управления вашими рабочими процессами данных.