async def
конечная точка
Вы можете использовать loop.run_in_executor с ProcessPoolExecutor, чтобы запустить функцию в отдельном процессе.
@app.post("/async-endpoint")
async def test_endpoint():
loop = asyncio.get_event_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound_func) # wait result
def
конечная точка
Поскольку def
конечные точки выполняются неявно в отдельном потоке, вы можете использовать всю мощь модулей multiprocessing и concurrent.futures. Обратите внимание, что внутри функции def
нельзя использовать await
. Образцы:
@app.post("/def-endpoint")
def test_endpoint():
...
with multiprocessing.Pool(3) as p:
result = p.map(f, [1, 2, 3])
@app.post("/def-endpoint/")
def test_endpoint():
...
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
results = executor.map(f, [1, 2, 3])
Примечание. Следует помнить, что создание пула процессов в конечной точке, а также создание большого количества потоков может привести к замедлению реакции по мере увеличения количества запросов.
Выполнение на лету
Самый простой и самый естественный способ выполнить функцию в отдельном процессе и немедленно дождаться результатов - использовать loop.run_in_executor с ProcessPoolExecutor.
Пул, как в приведенном ниже примере, можно создать при запуске приложения и не забудьте выключить его при выходе из приложения. Количество процессов, используемых в пуле, можно установить с помощью max_workers ProcessPoolExecutor
параметр конструктора. Если max_workers
равно None
или не задано, по умолчанию будет использоваться количество процессоров на машине.
Недостатком этого подхода является то, что обработчик запроса (операция пути) ожидает завершения вычисления в отдельном процессе, в то время как клиентское соединение остается открытым. А если по какой-то причине связь пропадет, то результату будет некуда возвращаться.
import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from fastapi import FastAPI
from calc import cpu_bound_func
app = FastAPI()
async def run_in_process(fn, *args):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(app.state.executor, fn, *args) # wait and return result
@app.get("/{param}")
async def handler(param: int):
res = await run_in_process(cpu_bound_func, param)
return {"result": res}
@app.on_event("startup")
async def on_startup():
app.state.executor = ProcessPoolExecutor()
@app.on_event("shutdown")
async def on_shutdown():
app.state.executor.shutdown()
Перейти на задний план
Обычно задачи, связанные с ЦП, выполняются в фоновом режиме. FastAPI предлагает возможность запускать фоновые задачи для запуска после strong > возврат ответа, внутри которого вы можете запустить и асинхронно дождаться результата вашей задачи, связанной с процессором.
В этом случае, например, вы можете немедленно вернуть ответ "Accepted"
(код HTTP 202) и уникальную задачу ID
, продолжить вычисления в фоновом режиме, а затем клиент может запросить статус задачи, используя этот ID
.
BackgroundTasks
предоставляют некоторые функции, в частности, вы можете запускать некоторые из них (в том числе в зависимостях). И в них вы можете использовать ресурсы, полученные в зависимостях, которые будут очищены только тогда, когда все задачи будут выполнены, а в случае возникновения исключений можно будет их корректно обработать. Это можно более четко увидеть в этом диаграмма.
Ниже приведен пример, в котором выполняется минимальное отслеживание задач. Предполагается, что запущен один экземпляр приложения.
import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field
from calc import cpu_bound_func
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
result: int = None
app = FastAPI()
jobs: Dict[UUID, Job] = {}
async def run_in_process(fn, *args):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(app.state.executor, fn, *args) # wait and return result
async def start_cpu_bound_task(uid: UUID, param: int) -> None:
jobs[uid].result = await run_in_process(cpu_bound_func, param)
jobs[uid].status = "complete"
@app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
return new_task
@app.get("/status/{uid}")
async def status_handler(uid: UUID):
return jobs[uid]
@app.on_event("startup")
async def startup_event():
app.state.executor = ProcessPoolExecutor()
@app.on_event("shutdown")
async def on_shutdown():
app.state.executor.shutdown()
Более мощные решения
Все приведенные выше примеры были довольно простыми, но если вам нужна более мощная система для тяжелых распределенных вычислений, вы можете не обращать внимания на брокеров сообщений RabbitMQ
, Kafka
, NATS
и т. Д. И библиотеки, использующие их, такие как Celery.
person
alex_noname
schedule
30.07.2020