Запуск двух асинхронных сопрограмм, каждая в своем собственном процессе Python

Если у нас есть 2 сопрограммы asyncio, можно ли использовать Python multiproessing, чтобы позволить каждой из них работать в своем собственном процессе, и разрешить остановку сопрограмм в обоих процессах (путем вызова их метода stop), когда пользователь нажимает Ctrl+C?

Это будет похоже на приведенный ниже код, за исключением того, что сопрограммы foo.start() и bar.start() должны иметь свой собственный процесс.

from builtins import KeyboardInterrupt
import asyncio
import multiprocessing
import signal

class App:
    def __init__(self, text):
        self.text = text

    async def start(self):
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])
        
    async def stop(self):
        self.loop_task.cancel()
        
    async def hello(self):
        while True:
            print(self.text)
            await asyncio.sleep(2)

if __name__ == '__main__':
    foo = App('foo')
    bar = App('bar')
    
    # Running in a single process works fine
    try:
        asyncio.run(asyncio.wait([foo.start(), bar.start()]))
    except KeyboardInterrupt:
        asyncio.run(asyncio.wait([foo.stop(), bar.stop()]))

Пробовал использовать multiprocessing и signals, но я также не уверен, как вызывать foo.stop() и bar.stop() до завершения двух процессов.

if __name__ == '__main__':
    
    def init_worker():
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        
    def start_foo():
        asyncio.run(foo.start())
        
    def start_bar():
        asyncio.run(bar.start())
        
    foo = App('foo')
    bar = App('bar')    
    pool = multiprocessing.Pool(10, init_worker)
        
    try:
        print('Starting 2 jobs')
        pool.apply_async(start_foo)
        pool.apply_async(start_bar)

        while True:        
            time.sleep(1)  # is sleeping like this a bad thing?
                
    except KeyboardInterrupt:
        print('Caught KeyboardInterrupt, terminating workers')
        pool.terminate()
        pool.join()
    
    print('Shut down complete')

# Based on https://stackoverflow.com/a/11312948/741099

Использование Python 3.9.5 в Ubuntu 20.04


Основываясь на решении @Will Da Silva, я внес небольшие изменения, чтобы проверить, вызывается ли asyncio.run(app.stop()) при нажатии Ctrl+C

class App:
    def __init__(self, text):
        self.text = text
    async def start(self):
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])
        
    async def stop(self):
        self.loop_task.cancel()
        print(f'Stopping {self.text}')
        
    async def hello(self):
        while True:
            print(self.text)
            await asyncio.sleep(2)

def f(app):
    try:
        asyncio.run(app.start())
    except KeyboardInterrupt:
        asyncio.run(app.stop())
        
if __name__ == '__main__':  
    
    jobs = (App('foo'), App('bar'))
    with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:        
        try:
            print(f'Starting {len(jobs)} jobs')
            pool.map(f, jobs)
                
        except KeyboardInterrupt:
            print('Caught KeyboardInterrupt, terminating workers')
                
    print('Shut down complete')

Однако кажется, что если я повторяю запуск и остановку скрипта Python несколько раз, print(f'Stopping {self.text}') внутри app.stop() не выводит на стандартный вывод половину времени.

Вывод:

$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Shut down complete

$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Shut down complete

$ python test.py
Starting 2 jobs
foo
bar
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Stopping foo
Shut down complete

person Nyxynyx    schedule 16.06.2021    source источник


Ответы (1)


Вот способ сделать это без возни с сигналами и без изменения класса App:

import asyncio
import multiprocessing
import os


class App:
    def __init__(self, text):
        self.text = text

    async def start(self):
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])
        
    async def stop(self):
        self.loop_task.cancel()
        
    async def hello(self):
        while True:
            print(self.text)
            await asyncio.sleep(2)


def f(text):
    app = App(text)
    try:
        asyncio.run(app.start())
    except KeyboardInterrupt:
        asyncio.run(app.stop())


if __name__ == '__main__':
    jobs = ('foo', 'bar')
    with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
        try:
            pool.map(f, jobs)
        except KeyboardInterrupt:
            pool.close()
            pool.join()

Важно, чтобы мы ограничили количество процессов в пуле до min(len(jobs), os.cpu_count()), так как любые неназначенные рабочие процессы не будут находиться в блоке try-except, который перехватывает KeyboardInterrupt при нажатии ctrl-c, и поэтому они вызовут исключение.

Чтобы полностью избежать этой проблемы, вы можете предоставить пулу инициализатор, который игнорирует SIGINT, но это не позволит нам перехватить его и с KeyboardInterrupt. Я не уверен, как можно игнорировать это только в неинициализированных рабочих процессах.

Вы также можете создать экземпляры App в родительском процессе, если их можно замариновать для передачи через границу процесса в дочерние процессы.

def f(app):
    try:
        asyncio.run(app.start())
    except KeyboardInterrupt:
        asyncio.run(app.stop())


if __name__ == '__main__':
    jobs = (App('foo'), App('bar'))
    with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
        try:
            pool.map(f, jobs)
        except KeyboardInterrupt:
            pool.close()
            pool.join()
person Will Da Silva    schedule 16.06.2021
comment
Спасибо! Я попробовал добавить несколько операторов print и обнаружил, что app.stop() не всегда вызывается. Исходный вопрос обновлен с более подробной информацией. - person Nyxynyx; 16.06.2021
comment
Это странно @Nyxynyx. Я протестировал его с оператором печати в stop и обнаружил, что он вызывается. Я сделаю больше тестов и посмотрю, что я могу найти. - person Will Da Silva; 16.06.2021
comment
@Nyxynyx Думаю, мне просто не повезло, когда я запускал его во время моих тестов ранее. Я смог воспроизвести проблему и исправил ее. Решение состоит в том, чтобы вызвать close, а затем join в пуле. - person Will Da Silva; 16.06.2021
comment
Круто теперь работает!!! Спасибо! - person Nyxynyx; 16.06.2021