Запустить неасинхронную функцию, которая просто создает большой список в исполнителе?

Логика в коде состоит в том, чтобы извлекать данные через (асинхронный) HTTP-запрос, а затем создавать большой список словарей, в которых случайным образом генерируется одно из значений:

import asyncio
import random
import string
import time

from concurrent.futures import ProcessPoolExecutor
from itertools import cycle

from httpx import AsyncClient

URL = 'http://localhost:8080'
COUNT = 1_000_000


def rand_str(length=10):
    return ''.join(random.choice(string.ascii_uppercase) for i in range(length))


def parser(data, count):
    items = []

    for _, item in zip(range(count), cycle(data)):
        item['instance'] = rand_str()
        items.append(item)

    return items


async def parser_coro(data, count):
    items = []

    for _, item in zip(range(count), cycle(data)):
        item['instance'] = rand_str()
        items.append(item)

    return items


async def run_in_executor(func, pool, *args, **kwargs):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, func, *args, **kwargs)


async def main():
    async with AsyncClient(base_url=URL) as client:
        r = await client.get('/api/alerts/')
        data = r.json()

    # Case 1
    t1 = time.perf_counter()
    parser(data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 1 - sync: {t2 - t1:.3f}s')
    
    # Case 2
    t1 = time.perf_counter()
    await parser_coro(data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 2 - coro (no await): {t2 - t1:.3f}s')

    # Case 3
    t1 = time.perf_counter()
    await run_in_executor(parser, None, data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 3 - thread executor: {t2 - t1:.3f}s')

    # Case 4
    t1 = time.perf_counter()
    with ProcessPoolExecutor() as executor:
        await run_in_executor(parser, executor, data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 4 - process executor: {t2 - t1:.3f}s')


if __name__ == '__main__':
    asyncio.run(main(), debug=True)

Тест:

$ python test.py 
Case 1 - sync: 6.593s
Case 2 - coro (no await): 6.565s
Executing <Task pending name='Task-1' coro=<main() running at test.py:63> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/futures.py:360, <TaskWakeupMethWrapper object at 0x7efff962a1f0>()] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:422> cb=[_run_until_complete_cb() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:184] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:591> took 13.176 seconds
Case 3 - thread executor: 6.675s
Case 4 - process executor: 6.726s

Вопрос:

Следует ли мне запускать функцию parser в исполнителе, чтобы она не блокировала основной поток во время создания списка, или в этом случае это не поможет? Действительно ли в данном случае это рабочая нагрузка, связанная с процессором или вводом-выводом? Я предполагаю, что ввода-вывода нет, но составляет список задач с интенсивным использованием ЦП, поэтому рабочая нагрузка связана с ЦП?


person HTF    schedule 26.08.2020    source источник
comment
Ответ - да, вы должны запустить его в исполнителе, и да, это функция, связанная с процессором. Если вы не запустите ее в исполнителе, никакая другая сопрограмма не сможет работать, пока выполняется parser(). Теперь, когда у вас работает только одна сопрограмма, не имеет значения, но это будет иметь значение, когда вы действительно попытаетесь запустить что-то параллельно. Если у вас есть возможность использовать ProcessPoolExecutor, это даже лучше, потому что это позволит вам использовать несколько ядер.   -  person user4815162342    schedule 26.08.2020
comment
@ user4815162342 спасибо за комментарий. Могу ли я запустить это в ThreadPoolExecutor, чтобы он не блокировался, или это должно быть ProcessPoolExecutor, поскольку это функция, связанная с процессором?   -  person HTF    schedule 26.08.2020
comment
Конечно, вы можете запустить его в ThreadPoolExecutor. Просто если у вас есть несколько таких, работающих параллельно, все они будут использовать одно и то же ядро ​​ЦП. (Но они не будут блокировать другие сопрограммы, потому что они будут запускаться из потока цикла событий.)   -  person user4815162342    schedule 26.08.2020


Ответы (1)


Должен ли я запускать функцию parser в исполнителе, чтобы она не блокировала основной поток во время создания списка, или в этом случае это не поможет?

Да, ты должен. Несмотря на глобальную блокировку интерпретатора, использование отдельного потока поможет, потому что Python позволит исполнению переключиться с синтаксического анализа на поток asyncio без ведома parser. Таким образом, использование потока предотвратит блокировку цикла событий на 6 секунд или сколько времени потребуется для запуска функции.

Обратите внимание, что вариант parser_coro ничем не отличается от варианта parser без исполнителя, потому что он ничего не ожидает. await parser_coro(...) остановит цикл вентиляции так же, как вызов parser(...) без исполнителя.

Действительно ли в данном случае это рабочая нагрузка, связанная с процессором или вводом-выводом?

Я не могу комментировать остальную рабочую нагрузку, но написанная функция определенно связана с процессором.

Могу ли я запустить это в ThreadPoolExecutor, чтобы он не блокировался, или это должно быть ProcessPoolExecutor, поскольку это функция, связанная с процессором?

Конечно, вы можете запустить его в ThreadPoolExecutor. Просто если у вас есть несколько таких, работающих параллельно, все они будут использовать одно и то же ядро ​​ЦП. (Но они не будут блокировать другие сопрограммы, потому что они будут запускаться из потока цикла событий.)

person user4815162342    schedule 26.08.2020