Как сделать многопроцессорность в FastAPI

При обслуживании запроса FastAPI у меня есть задача, связанная с процессором, для каждого элемента списка. Я бы хотел выполнить эту обработку на нескольких ядрах ЦП.

Как правильно сделать это в FastAPI? Могу ли я использовать стандартный multiprocessing модуль? Все руководства / вопросы, которые я нашел до сих пор, касаются только задач, связанных с вводом-выводом, таких как веб-запросы.


person CryingSofa    schedule 30.07.2020    source источник


Ответы (1)


async def конечная точка

Вы можете использовать loop.run_in_executor с ProcessPoolExecutor, чтобы запустить функцию в отдельном процессе.

@app.post("/async-endpoint")
async def test_endpoint():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_func)  # wait result

def конечная точка

Поскольку def конечные точки выполняются неявно в отдельном потоке, вы можете использовать всю мощь модулей multiprocessing и concurrent.futures. Обратите внимание, что внутри функции def нельзя использовать await. Образцы:

@app.post("/def-endpoint")
def test_endpoint():
    ...
    with multiprocessing.Pool(3) as p:
        result = p.map(f, [1, 2, 3])
@app.post("/def-endpoint/")
def test_endpoint():
    ...
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
      results = executor.map(f, [1, 2, 3])

Примечание. Следует помнить, что создание пула процессов в конечной точке, а также создание большого количества потоков может привести к замедлению реакции по мере увеличения количества запросов.


Выполнение на лету

Самый простой и самый естественный способ выполнить функцию в отдельном процессе и немедленно дождаться результатов - использовать loop.run_in_executor с ProcessPoolExecutor.

Пул, как в приведенном ниже примере, можно создать при запуске приложения и не забудьте выключить его при выходе из приложения. Количество процессов, используемых в пуле, можно установить с помощью max_workers ProcessPoolExecutor параметр конструктора. Если max_workers равно None или не задано, по умолчанию будет использоваться количество процессоров на машине.

Недостатком этого подхода является то, что обработчик запроса (операция пути) ожидает завершения вычисления в отдельном процессе, в то время как клиентское соединение остается открытым. А если по какой-то причине связь пропадет, то результату будет некуда возвращаться.

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from fastapi import FastAPI

from calc import cpu_bound_func

app = FastAPI()


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


@app.get("/{param}")
async def handler(param: int):
    res = await run_in_process(cpu_bound_func, param)
    return {"result": res}


@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

Перейти на задний план

Обычно задачи, связанные с ЦП, выполняются в фоновом режиме. FastAPI предлагает возможность запускать фоновые задачи для запуска после возврат ответа, внутри которого вы можете запустить и асинхронно дождаться результата вашей задачи, связанной с процессором.

В этом случае, например, вы можете немедленно вернуть ответ "Accepted" (код HTTP 202) и уникальную задачу ID, продолжить вычисления в фоновом режиме, а затем клиент может запросить статус задачи, используя этот ID.

BackgroundTasks предоставляют некоторые функции, в частности, вы можете запускать некоторые из них (в том числе в зависимостях). И в них вы можете использовать ресурсы, полученные в зависимостях, которые будут очищены только тогда, когда все задачи будут выполнены, а в случае возникновения исключений можно будет их корректно обработать. Это можно более четко увидеть в этом диаграмма.

Ниже приведен пример, в котором выполняется минимальное отслеживание задач. Предполагается, что запущен один экземпляр приложения.

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from http import HTTPStatus

from fastapi import BackgroundTasks
from typing import Dict
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field

from calc import cpu_bound_func


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


async def start_cpu_bound_task(uid: UUID, param: int) -> None:
    jobs[uid].result = await run_in_process(cpu_bound_func, param)
    jobs[uid].status = "complete"


@app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
    return new_task


@app.get("/status/{uid}")
async def status_handler(uid: UUID):
    return jobs[uid]


@app.on_event("startup")
async def startup_event():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

Более мощные решения

Все приведенные выше примеры были довольно простыми, но если вам нужна более мощная система для тяжелых распределенных вычислений, вы можете не обращать внимания на брокеров сообщений RabbitMQ, Kafka, NATS и т. Д. И библиотеки, использующие их, такие как Celery.

person alex_noname    schedule 30.07.2020
comment
Но таким образом у меня нет доступа к результату возврата cpu_bound_func, верно? - person CryingSofa; 04.08.2020
comment
В случае фонового выполнения да, но я изменил ответ для возврата примера. - person alex_noname; 04.08.2020