Хорошо, может быть, не «в режиме реального времени», а мини-обнаружение дрейфа данных на основе пакетных данных в режиме онлайн.

В последней статье мы указали, как создать компонент дрейфа данных, который затем может стать частью пайплайна Kubeflow. Но что, если ваша модель развернута как API, и вы хотите выполнить обнаружение дрейфа данных вместе с выводом? Это возможно и при использовании Очевидно!

По сути, для этого варианта использования нам все равно понадобится эталонный набор данных (данные, на которых обучалась модель). Но для набора данных для вывода мы будем хранить образцы данных, которые мы видим во время вывода.

Например, предположим, что мы хотим, чтобы выборка размером 100 сравнивалась с эталонным набором данных. Когда приложение запустится, мы сохраним 100 образцов, которые могут соответствовать 100 вызовам API к конечной точке прогнозирования. Мы сравним его с эталонным набором данных и проверим дрейф данных только после того, как у нас будет 100 выборок. Оттуда мы возьмем самые последние 100 образцов, отбрасывая самый последний образец из нашего набора для каждого последующего вызова API.

Дрейф данных вычисляется с использованием этой последней выборки для каждого вызова API. Метрики дрейфа данных отображаются в нужной конечной точке, которую затем Prometheus может очистить и отобразить через Grafana.

Пример должен облегчить понимание. Аналогичный код можно найти в этом репозитории.

У нас есть простое приложение Fast API, которое выполняет прогнозирование. Конечная точка для такого приложения может выглядеть так, как показано ниже 👇🏽

@app.post("/api/v1/predict")
async def predict_v1(data: PersonData):
    features = pd.DataFrame(data.dict(), index=[0])
    response = model.predict(features)
    return response

PersonData содержит функции, которые затем передаются модели для прогнозирования, и возвращается ответ модели. Чтобы определить, отклонились ли эти данные от данных, которые мы видели при обучении, мы должны настроить мониторинг дрейфа.

Для этого сначала напишем простой конфигурационный файл, в основном для инициализации службы мониторинга дрейфа:

column_mapping: 
  categorical_features:
  - job_title
  - industry
  - occupation
  numerical_features:
  - age
data_format:
  header: true
  separator: ','
service:
  calculation_period_sec: 10
  monitors:
  - data_drift
  moving_reference: false
  reference_path: reference_df.csv
  use_reference: true
  window_size: 30

В этом файле конфигурации есть сопоставления столбцов, которые говорят нам, какие функции являются числовыми, а какие — категориальными. Мы устанавливаем путь к reference_df, это путь к набору обучающих данных. Мы также устанавливаем минимальный размер окна и calculation_period_sec.

Минимальный размер окна дает нам размер набора данных логического вывода, который будет использоваться для вычисления обнаружения дрейфа по сравнению с эталонными данными, тогда как calculation_period_sec — это повторяющийся временной интервал для вычисления дрейфа данных (т. е. вычисление обнаружения дрейфа каждые 10 секунд).

После того, как мы настроили этот файл конфигурации, нам нужно инициализировать мониторинг смещения данных в приложении Fast API, добавив следующий фрагмент кода:

from realtime_data_drift.realtime_data_drift.data_drift import (getDriftMonitoringService,MonitoringService)
from starlette_exporter import PrometheusMiddleware, handle_metrics
app.add_middleware(
PrometheusMiddleware,
app_name="sample_fast_api",
prefix="sample_fast_api",
)
app.add_route("/metrics", handle_metrics)
SERVICE: Optional[MonitoringService] = None
@app.on_event("startup")
async def startup_event():
    global SERVICE
    config_file_name = "data-drift-config.yaml"
    if not os.path.exists(config_file_name):
        exit("Cannot find config file for the metrics service. Try to check README.md for setup instructions.")
    with open(config_file_name, "rb") as config_file:
        config = yaml.safe_load(config_file)
    SERVICE = getDriftMonitoringService(config)

Мы собираемся настроить промежуточное ПО Prometheus, чтобы мы могли предоставлять наши метрики в конечной точке /metrics. Этого можно добиться, используя starlette_exporter. Получив это, мы инициализируем службу мониторинга при запуске приложения. Хороший способ повторно использовать мониторинг дрейфа данных во всех развертываниях API — упаковать его и сохранить в частном PyPI. Затем этот пакет можно импортировать во все модели, развернутые через Fast API.

Последний шаг — использовать эту службу в конечной точке прогнозирования. Таким образом, конечная точка, которую мы определили ранее, будет выглядеть примерно так:

@app.post("/api/v1/predict")
async def predict_v1(data: PersonData, background_tasks: BackgroundTasks):
    features = pd.DataFrame(data.dict(), index=[0])
    if SERVICE is None:
        logging.error("data drift service not found!")
    else:
    # drift will be computed when there is 30 rows of data
        background_tasks.add_task(SERVICE.iterate,   features.drop("person_id", axis=1))
   response = model.predict(features)
   return response

Мы используем фоновую задачу Fast API для вычисления обнаружения дрейфа данных, чтобы это не влияло на время отклика прогноза модели.

Как только мы запустим это приложение, мы сможем увидеть показатели дрейфа данных, как только у нас будет достаточно данных (размер окна 30), т.е. 30 запросов к конечной точке /api/v1/predict. Метрики будут доступны в конечной точке /metrics.

Затем эта конечная точка метрик должна быть очищена заданием очистки Prometheus. После того, как метрики были получены, мы можем приступить к визуализации этого в Grafana.

Панель инструментов Grafana доступна в репозитории и может быть импортирована.

Дайте мне знать, что вы думаете о пакете обнаружения дрейфа Evidently!

Если вы хотите поэкспериментировать с новыми способами обнаружения смещения данных, вы можете заглянуть на нашу доску вакансий!