Python3.7 asyncio запускает веб-сервер (FastAPI) и потребитель aio_pika

В моем проекте я пытаюсь запустить REST API (построенный с помощью FastAPI и работающий с Hypercorn), дополнительно я хочу, чтобы при запуске также запускался потребитель RabbitMQ (с aio_pika):

Aio Pika предлагает надежное соединение, которое автоматически восстанавливается в случае сбоя. Если я запускаю приведенный ниже код с hypercorn app:app, потребитель и остальной интерфейс запускаются правильно, но переподключение от aio_pika больше не работает. Как я могу заархивировать рабочий стабильный RabbitMQ Consumer и RestAPI в двух разных процессах (или потоках?). Моя версия Python — 3.7. Обратите внимание, что на самом деле я разработчик Java и Go, если мой подход не соответствует пути Python :-)

@app.on_event("startup")
def startup():
   loop = asyncio.new_event_loop()
    asyncio.ensure_future(main(loop))


@app.get("/")
def read_root():
   return {"Hello": "World"}


async def main(loop):
connection = await aio_pika.connect_robust(
    "amqp://guest:[email protected]/", loop=loop
)

async with connection:
    queue_name = "test_queue"

    # Creating channel
    channel = await connection.channel()  # type: aio_pika.Channel

    # Declaring queue
    queue = await channel.declare_queue(
        queue_name,
        auto_delete=True
    )  # type: aio_pika.Queue

    async with queue.iterator() as queue_iter:
        # Cancel consuming after __aexit__
        async for message in queue_iter:
            async with message.process():
                print(message.body)

                if queue.name in message.body.decode():
                    break

person ghovat    schedule 05.12.2019    source источник
comment
Я не уверен, почему вы создаете новый цикл событий в функции запуска, что, я думаю, может быть связано. Скажите, зачем это нужно?   -  person pgjones    schedule 06.12.2019
comment
Это не требуется, я думал, что это способ сделать это. Похоже на: идти   -  person ghovat    schedule 06.12.2019
comment
А, а без этого работает? Я полагаю, что разные циклы событий могут вызвать проблему.   -  person pgjones    schedule 07.12.2019
comment
вы имеете в виду с get_current_event_loop() вместо нового? Да, я пробовал, но у него та же проблема.   -  person ghovat    schedule 07.12.2019
comment
О, я надеялся, что это будет так. Есть ли что-нибудь зарегистрированное, что могло бы дать подсказку здесь? (Иначе я не вижу проблемы).   -  person pgjones    schedule 09.12.2019
comment
Вовсе нет, я попытался изменить с помощью current_event_loop, чтобы передать его в качестве аргумента для потребителя, и теперь это работает @app.on_event("startup") def startup(): print("jdklasjdlas") loop = asyncio.get_event_loop() asyncio.ensure_future(main(loop)) Спасибо за вашу помощь.   -  person ghovat    schedule 09.12.2019
comment
Кстати, спасибо за вашу работу над Hypercorn! Хорошая работа!   -  person ghovat    schedule 09.12.2019


Ответы (1)


С помощью @pgjones мне удалось изменить начало потребления на:

@app.on_event("startup")
def startup():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))

И запустите job с asyncio.ensure_future и передайте текущий цикл обработки событий в качестве аргумента, что решило проблему.

Было бы интересно, если бы у кого-то был другой/лучший подход. Спасибо!

person ghovat    schedule 09.12.2019