Использование concurrent.futures, высокоуровневого интерфейса для асинхронного выполнения вызываемых объектов.
Прочитав эту статью, вы узнаете, как использовать библиотеку concurrent.futures
для асинхронного выполнения задач в Python. Это лучшая альтернатива классам threading
и multiprocessing
в Python, поскольку в нем реализованы и Thread
, и Process
с одним и тем же интерфейсом, который определяется абстрактным классом Executor
. Официальная документация обнаруживает одну серьезную проблему с Thread
:
«В CPython из-за
Global Interpreter Lock
только один поток может одновременно выполнять код Python (хотя некоторые ориентированные на производительность библиотеки могут преодолеть это ограничение)».
Кроме того, класс threading
не позволяет вам возвращать значение из вызываемых функций, кроме null
. Основная концепция модуля concurrent.futures
связана с классом Executor
. Это абстрактный класс, который предоставляет методы для асинхронного выполнения вызовов. Вместо того, чтобы использовать его напрямую, мы будем использовать наследуемые от него подклассы:
ThreadPoolExecutor
ProcessPoolExecutor
Перейдем к следующему разделу и начнем писать код Python.
1. ThreadPoolExecutor
Импортировать
Добавьте следующее объявление импорта в верхнюю часть файла Python:
from concurrent.futures import ThreadPoolExecutor import time
Вызываемая функция (цель)
Давайте определим новую функцию, которая служит вызываемой функцией для асинхронного вызова. Я просто определю простую функцию, которая спит в течение двух секунд и после этого возвращает умножение обоих входных параметров:
def wait_function(x, y): print('Task(', x,'multiply', y, ') started') time.sleep(2) print('Task(', x,'multiply', y, ') completed') return x * y
Отдельная задача
Следующим шагом будет создание объекта ThreadPoolExecutor
. Настоятельно рекомендуется заключить его в with
диспетчер контекста, поскольку он сам вызовет функцию shutdown
и освободит ресурсы после завершения выполнения. Он принимает следующие входные параметры.
max_workers
- количество рабочих для этого экземпляра. Начиная с версии 3.5, по умолчанию используется количество процессоров на машине, умноженное на пять. Начиная с версии 3.8, значение по умолчанию изменяется наmin(32, os.cpu_count() + 4)
.thread_name_prefix
- позволяет пользователям управлятьthreading.Thread
именами рабочих потоков, созданных пулом, для упрощения отладки.initializer
- необязательный вызываемый объект, который вызывается в начале каждого рабочего процесса.initargs
- набор аргументов, переданныхinitializer
.
В этом руководстве я буду использовать только параметр max_workers
. Давайте создадим ThreadPoolExecutor
и вызовем submit
функцию с wait_function
в качестве вызываемой входной функции. Помните, что wait_function
принимает два входных параметра. Я собираюсь передать их как отдельные параметры вместо кортежа:
with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(wait_function, 3, 4)
Функция submit
вернет объект Future
, который инкапсулирует асинхронное выполнение вызываемого объекта. Наиболее часто используемые функции для объекта Future
:
cancel
- Попытки отменить выполнение. Возвращает логическое значение, указывающее, был ли вызов успешно отменен.running
- Проверяет, выполняется ли вызов. Возвращает логическое значение.done
- Проверяет, был ли вызов отменен или завершен. Возвращает логическое значение.result
- возвращает значение, возвращаемое вызовом. Если вызов еще не завершен, этот метод будет ждать n секунд, заданных параметром inputtimeout
. Настоятельно рекомендуется проверить использование функцииdone
перед вызовом результата, посколькуtimeout
заблокирует текущее выполнение.add_done_callback
- присоединяет вызываемую функцию к объектуFuture
. Эта функция будет вызываться сFuture
в качестве единственного аргумента, когдаFuture
отменяется или завершается.
Добавьте следующий код прямо под функцией submit
. Это простой цикл, который печатает строку во время работы потока. Когда он будет завершен, он распечатает результат:
Ознакомьтесь с полным кодом на GitHub:
При запуске файла Python вы должны увидеть следующий результат:
Несколько задач
Затем мы собираемся добавить к нему еще одну задачу, чтобы обе они работали параллельно. Измените код в вашем файле Python на следующий:
А пока сначала установите max_workers
на единицу. Запустите его, и вы должны заметить, что задачи не выполняются параллельно. Он запустит первую задачу, а затем вторую задачу. Это в основном потому, что у вас в пуле только один воркер. Давайте увеличим max_workers
до двух, и вы увидите, что обе задачи выполняются параллельно.
Функция обратного вызова
Вы можете прикрепить функцию обратного вызова к объекту Future
. Он вызовет присоединенную функцию после отмены или завершения выполнения. Это чрезвычайно полезно, если вы намереваетесь продолжить обновление пользовательского интерфейса после успешного подключения к базе данных или завершения URL-запросов. А пока давайте создадим простую функцию обратного вызова:
def callback_function(future): print('Callback with the following result', future.result())
Добавьте следующий код прямо под функцией submit
:
future.add_done_callback(callback_function)
Ознакомьтесь с полным кодом на GitHub:
При запуске файла Python в консоли будет показан следующий результат:
2. ProcessPoolExecutor
Класс ProcessPoolExecutor
работает точно так же, как ThreadPoolExecutor
, но с некоторыми небольшими отличиями. Он использует модуль multiprocessing
, который позволяет ему обойти Global Interpreter Lock
. Однако это также означает, что только выбираемые объекты могут быть выполнены и возвращены.
Кроме того, он не работает в интерактивном интерпретаторе и должен иметь __main__
функцию, которую могут импортировать рабочие подпроцессы. max_workers
- количество процессов в машине. В операционной системе Windows max_workers
должно быть равно 61 или меньше.
Вы должны импортировать ProcessPoolExecutor
, чтобы использовать его:
from concurrent.futures import ProcessPoolExecutor
Вы можете повторно использовать предыдущий код и изменить его на ProcessPoolExecutor
вместо ThreadPoolExecutor
. Оберните код внутри функции и вызовите ее прямо из __main__
. Ознакомьтесь с полным кодом на GitHub:
3. Прерывание клавиатуры
Если вы намереваетесь остановить выполнение с помощью Ctrl+C
, пока процесс выполняется в потоке, компилятор, скорее всего, зависнет и застрянет на KeyboardInterupt
исключении. В основном это связано с тем, что команда Ctrl+C
генерирует SIGINT
, который не останавливает или прерывает выполнение. Вам нужно сгенерировать SIGBREAK
, чтобы завершить выполнение и вернуться в терминал. Используйте следующую команду для создания SIGBREAK
в зависимости от операционной системы и модели компьютера:
4. Вывод
Подведем итоги тому, что мы узнали сегодня.
Мы начали с простого объяснения модуля concurrent.futures
.
После этого мы подробно изучили базовый класс ThreadPoolExecutor
и класс Future
. Мы пробовали запускать несколько задач параллельно с другим количеством max_workers
. Мы также протестировали настройку функции обратного вызова, которая будет выполняться после завершения задачи.
Мы перешли к ProcessPoolExecutor
, который похож на ThreadPoolExecutor
с небольшими отличиями.
Спасибо, что прочитали эту статью. Надеюсь увидеть вас снова в следующей статье!