Логика в коде состоит в том, чтобы извлекать данные через (асинхронный) HTTP-запрос, а затем создавать большой список словарей, в которых случайным образом генерируется одно из значений:
import asyncio
import random
import string
import time
from concurrent.futures import ProcessPoolExecutor
from itertools import cycle
from httpx import AsyncClient
URL = 'http://localhost:8080'
COUNT = 1_000_000
def rand_str(length=10):
return ''.join(random.choice(string.ascii_uppercase) for i in range(length))
def parser(data, count):
items = []
for _, item in zip(range(count), cycle(data)):
item['instance'] = rand_str()
items.append(item)
return items
async def parser_coro(data, count):
items = []
for _, item in zip(range(count), cycle(data)):
item['instance'] = rand_str()
items.append(item)
return items
async def run_in_executor(func, pool, *args, **kwargs):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(pool, func, *args, **kwargs)
async def main():
async with AsyncClient(base_url=URL) as client:
r = await client.get('/api/alerts/')
data = r.json()
# Case 1
t1 = time.perf_counter()
parser(data, COUNT)
t2 = time.perf_counter()
print(f'Case 1 - sync: {t2 - t1:.3f}s')
# Case 2
t1 = time.perf_counter()
await parser_coro(data, COUNT)
t2 = time.perf_counter()
print(f'Case 2 - coro (no await): {t2 - t1:.3f}s')
# Case 3
t1 = time.perf_counter()
await run_in_executor(parser, None, data, COUNT)
t2 = time.perf_counter()
print(f'Case 3 - thread executor: {t2 - t1:.3f}s')
# Case 4
t1 = time.perf_counter()
with ProcessPoolExecutor() as executor:
await run_in_executor(parser, executor, data, COUNT)
t2 = time.perf_counter()
print(f'Case 4 - process executor: {t2 - t1:.3f}s')
if __name__ == '__main__':
asyncio.run(main(), debug=True)
Тест:
$ python test.py
Case 1 - sync: 6.593s
Case 2 - coro (no await): 6.565s
Executing <Task pending name='Task-1' coro=<main() running at test.py:63> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/futures.py:360, <TaskWakeupMethWrapper object at 0x7efff962a1f0>()] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:422> cb=[_run_until_complete_cb() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:184] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:591> took 13.176 seconds
Case 3 - thread executor: 6.675s
Case 4 - process executor: 6.726s
Вопрос:
Следует ли мне запускать функцию parser
в исполнителе, чтобы она не блокировала основной поток во время создания списка, или в этом случае это не поможет? Действительно ли в данном случае это рабочая нагрузка, связанная с процессором или вводом-выводом? Я предполагаю, что ввода-вывода нет, но составляет список задач с интенсивным использованием ЦП, поэтому рабочая нагрузка связана с ЦП?
parser()
. Теперь, когда у вас работает только одна сопрограмма, не имеет значения, но это будет иметь значение, когда вы действительно попытаетесь запустить что-то параллельно. Если у вас есть возможность использоватьProcessPoolExecutor
, это даже лучше, потому что это позволит вам использовать несколько ядер. - person user4815162342   schedule 26.08.2020ThreadPoolExecutor
, чтобы он не блокировался, или это должно бытьProcessPoolExecutor
, поскольку это функция, связанная с процессором? - person HTF   schedule 26.08.2020ThreadPoolExecutor
. Просто если у вас есть несколько таких, работающих параллельно, все они будут использовать одно и то же ядро ЦП. (Но они не будут блокировать другие сопрограммы, потому что они будут запускаться из потока цикла событий.) - person user4815162342   schedule 26.08.2020