AsyncIO - относительно новая структура для достижения параллелизма в Python. В этой статье я сравню его с традиционными методами, такими как многопоточность и многопроцессорность. Прежде чем перейти к примерам, я добавлю несколько обновлений о параллелизме в python.

  • CPython применяет GIL (глобальную блокировку интерпретатора), которая не позволяет в полной мере использовать многопоточность. Каждый поток должен получить эту взаимоисключающую блокировку перед запуском любого байт-кода.
  • Многопоточность обычно предпочтительна для сетевого ввода-вывода или дискового ввода-вывода, поскольку потокам не нужно жестко конкурировать между собой за получение GIL.
  • Многопроцессорность обычно предпочтительна для задач с интенсивным использованием ЦП. Для многопроцессорной обработки не требуется GIL, поскольку у каждого процесса есть свое состояние, однако создание и уничтожение процессов нетривиально.
  • Многопоточность с модулем потоковой передачи является вытесняющей, что влечет за собой добровольную и непроизвольную замену потоков.
  • AsyncIO - это однопоточная кооперативная многозадачность одного процесса. Задача asyncio имеет исключительное использование ЦП до тех пор, пока она не захочет передать ее координатору или циклу обработки событий. (Терминология будет рассмотрена позже)

Практические примеры

Сообщения о задержке

Программа задерживает печать сообщения. Пока MainThread спит, ЦП простаивает, что является плохим использованием ресурсов.

12:39:00:MainThread:Main started
12:39:00:MainThread:TWO received
12:39:02:MainThread:Printing TWO
12:39:02:MainThread:THREE received
12:39:05:MainThread:Printing THREE
12:39:05:MainThread:Main Ended

Параллелизм с потоковой передачей

Использование модуля потоковой передачи Python для выполнения нескольких вызовов delay_message в отдельных потоках, не являющихся демонами . Неудивительно, что программа выполняется быстрее, чем синхронная версия выше на две секунды. ОС меняет местами потоки, когда поток простаивает (спит). Вы можете связать сон с выполнением системного вызова для связи с внешней средой.

12:39:05:MainThread:Main started
12:39:05:Thread-4:TWO received
12:39:05:Thread-5:THREE received
12:39:07:Thread-4:Printing TWO
12:39:08:Thread-5:Printing THREE
12:39:08:MainThread:Main Ended

Пул потоков

Хотя потоки легковесны, создание и уничтожение большого количества потоков обходится дорого. concurrent.futures построен на основе модуля потоковой передачи и предоставляет удобный интерфейс для создания пула потоков и пула процессов. Вместо создания нового потока для вызова функции он повторно использует существующий поток в пуле.

10:42:36:ThreadPoolExecutor-0_0:TWO received
10:42:36:ThreadPoolExecutor-0_1:THREE received
10:42:38:ThreadPoolExecutor-0_0:Printing TWO
10:42:38:MainThread:TWO Done
10:42:39:ThreadPoolExecutor-0_1:Printing THREE
10:42:39:MainThread:THREE Done

Параллелизм с AsyncIO

  1. Сопрограмма: в отличие от обычной функции с единственной точкой выхода, сопрограмма может приостанавливать и возобновлять свое выполнение. Создать сопрограмму так же просто, как использовать ключевое слово async перед объявлением функции.
  2. Цикл событий или координатор: сопрограмма, которая управляет другими сопрограммами. Вы можете думать об этом как о планировщике или мастере.
  3. Ожидаемый объект Coroutine, Tasks и Future - это ожидаемые объекты. Сопрограмма может ожидать на ожидающих объектах. Пока сопрограмма ожидает ожидаемого объекта, ее выполнение временно приостанавливается и возобновляется после завершения Future.
07:35:32:MainThread:Main started
07:35:32:MainThread:Current registered tasks: 1
07:35:32:MainThread:Creating tasks
07:35:32:MainThread:Current registered tasks: 3
07:35:32:MainThread:TWO received
07:35:32:MainThread:THREE received
07:35:34:MainThread:Printing TWO
07:35:35:MainThread:Printing THREE
07:35:35:MainThread:Main Ended

Хотя программа выполняется в одном потоке, она может достичь того же уровня производительности, что и многопоточный код, за счет совместной многозадачности.

Лучший способ создания задач AsyncIO

Использование asyncio.gather для создания нескольких задач за один раз.

08:09:20:MainThread:Main started
08:09:20:MainThread:ONE received
08:09:20:MainThread:TWO received
08:09:20:MainThread:THREE received
08:09:20:MainThread:FOUR received
08:09:20:MainThread:FIVE received
08:09:21:MainThread:Printing ONE
08:09:22:MainThread:Printing TWO
08:09:23:MainThread:Printing THREE
08:09:24:MainThread:Printing FOUR
08:09:25:MainThread:Printing FIVE
08:09:25:MainThread:Main Ended

Предупреждение о блокировке вызовов в задачах AsyncIO

Как я уже говорил ранее, задача asyncio имеет исключительное право на использование ЦП, пока она не откажется от нее добровольно. Если по ошибке в вашу задачу проникнет блокирующий вызов, он остановит выполнение программы.

11:07:31:MainThread:Main started
11:07:31:MainThread:Creating multiple tasks with asyncio.gather
11:07:31:MainThread:ONE received
11:07:31:MainThread:TWO received
11:07:31:MainThread:THREE received
11:07:34:MainThread:Printing THREE
11:07:34:MainThread:FOUR received
11:07:34:MainThread:FIVE received
11:07:34:MainThread:Printing ONE
11:07:34:MainThread:Printing TWO
11:07:38:MainThread:Printing FOUR
11:07:39:MainThread:Printing FIVE
11:07:39:MainThread:Main Ended

Когда delay_message получает сообщение THREE, он выполняет блокирующий вызов и не передает управление циклу обработки событий до тех пор, пока не завершит задачу, тем самым замедляя прогресс выполнения. Следовательно, это занимает три секунды больше, чем предыдущий запуск. Хотя этот пример кажется адаптированным, это может случиться, если вы не будете осторожны. С другой стороны, потоки являются вытесняющими, при этом ОС автоматически переключает поток, если он ожидает блокирующего вызова.

Условия гонки

Многопоточный код может быстро развалиться, если он не учитывает условия гонки. Это особенно усложняется при использовании внешних библиотек, поскольку нам нужно проверить, поддерживают ли они многопоточный код. Например, модуль популярных запросов session объект не является потокобезопасным. Следовательно, попытка распараллелить сетевые запросы с помощью объекта session может привести к непредвиденным результатам.

20:28:15:ThreadPoolExecutor-0_0:Update Started
20:28:15:ThreadPoolExecutor-0_0:Sleeping
20:28:15:ThreadPoolExecutor-0_1:Update Started
20:28:15:ThreadPoolExecutor-0_1:Sleeping
20:28:17:ThreadPoolExecutor-0_0:Reading Value From Db
20:28:17:ThreadPoolExecutor-0_1:Reading Value From Db
20:28:17:ThreadPoolExecutor-0_0:Updating Value
20:28:17:ThreadPoolExecutor-0_1:Updating Value
20:28:17:ThreadPoolExecutor-0_1:Update Finished
20:28:17:ThreadPoolExecutor-0_0:Update Finished
20:28:17:MainThread:Final value is 1

Конечное значение в идеале должно быть 2. Однако из-за вытесняющей перестановки потоков thread-0 менялись местами перед обновлением значения, поэтому updates ошибочно выдавали конечное значение как 1. Мы должны использовать блокировки, чтобы этого не произошло.

21:02:45:ThreadPoolExecutor-0_0:Update Started
21:02:45:ThreadPoolExecutor-0_0:Sleeping
21:02:45:ThreadPoolExecutor-0_1:Update Started
21:02:45:ThreadPoolExecutor-0_1:Sleeping
21:02:47:ThreadPoolExecutor-0_0:Reading Value From Db
21:02:47:ThreadPoolExecutor-0_0:Updating Value
21:02:47:ThreadPoolExecutor-0_0:Update Finished
21:02:47:ThreadPoolExecutor-0_1:Reading Value From Db
21:02:47:ThreadPoolExecutor-0_1:Updating Value
21:02:47:ThreadPoolExecutor-0_1:Update Finished
21:02:47:MainThread:Final value is 2

Условия гонки редки с AsyncIO

Поскольку задача имеет полный контроль над тем, когда приостанавливать выполнение, условия гонки с asyncio возникают редко.

20:35:49:MainThread:Update Started
20:35:49:MainThread:Sleeping
20:35:49:MainThread:Update Started
20:35:49:MainThread:Sleeping
20:35:50:MainThread:Reading Value From Db
20:35:50:MainThread:Updating Value
20:35:50:MainThread:Update Finished
20:35:50:MainThread:Reading Value From Db
20:35:50:MainThread:Updating Value
20:35:50:MainThread:Update Finished
20:35:50:MainThread:Final value is 2

Как видите, как только задача возобновилась после sleeping, она не теряла контроль до тех пор, пока не завершила выполнение сопрограммы. При потоковой передаче замена потоков не очень очевидна, но с помощью asyncio мы можем контролировать, когда именно выполнение сопрограммы должно быть приостановлено. Тем не менее, когда две сопрограммы заходят в тупик, может произойти сбой.

Многопроцессорность

Как упоминалось выше, многопроцессорность очень удобна при реализации программ, интенсивно использующих ЦП. Код ниже выполняет сортировку слиянием для 1000lists с 30000 элементами. Потерпите меня, если приведенная ниже реализация сортировки слиянием немного неуклюжа.

Синхронная версия

21:24:07:MainThread:Starting Sorting
21:26:10:MainThread:Sorting Completed

Асинхронная версия

21:29:33:MainThread:Starting Sorting
21:30:03:MainThread:Sorting Completed

По умолчанию количество процессов равно количеству процессоров на машине. Вы можете наблюдать значительное улучшение времени выполнения между двумя версиями.

Я буду писать PART 2 этой статьи об асинхронных сетевых запросах и чтениях файлов. Если вы обнаружите какие-либо ошибки в коде, пожалуйста, оставьте комментарий или свяжитесь со мной в LinkedIn.