Использование concurrent.futures, высокоуровневого интерфейса для асинхронного выполнения вызываемых объектов.

Прочитав эту статью, вы узнаете, как использовать библиотеку concurrent.futures для асинхронного выполнения задач в Python. Это лучшая альтернатива классам threading и multiprocessing в Python, поскольку в нем реализованы и Thread, и Process с одним и тем же интерфейсом, который определяется абстрактным классом Executor. Официальная документация обнаруживает одну серьезную проблему с Thread:

«В CPython из-за Global Interpreter Lock только один поток может одновременно выполнять код Python (хотя некоторые ориентированные на производительность библиотеки могут преодолеть это ограничение)».

Кроме того, класс threading не позволяет вам возвращать значение из вызываемых функций, кроме null. Основная концепция модуля concurrent.futures связана с классом Executor. Это абстрактный класс, который предоставляет методы для асинхронного выполнения вызовов. Вместо того, чтобы использовать его напрямую, мы будем использовать наследуемые от него подклассы:

  • ThreadPoolExecutor
  • ProcessPoolExecutor

Перейдем к следующему разделу и начнем писать код Python.

1. ThreadPoolExecutor

Импортировать

Добавьте следующее объявление импорта в верхнюю часть файла Python:

from concurrent.futures import ThreadPoolExecutor
import time

Вызываемая функция (цель)

Давайте определим новую функцию, которая служит вызываемой функцией для асинхронного вызова. Я просто определю простую функцию, которая спит в течение двух секунд и после этого возвращает умножение обоих входных параметров:

def wait_function(x, y):
    print('Task(', x,'multiply', y, ') started')
    time.sleep(2)
    print('Task(', x,'multiply', y, ') completed')
    return x * y

Отдельная задача

Следующим шагом будет создание объекта ThreadPoolExecutor. Настоятельно рекомендуется заключить его в with диспетчер контекста, поскольку он сам вызовет функцию shutdown и освободит ресурсы после завершения выполнения. Он принимает следующие входные параметры.

  • max_workers - количество рабочих для этого экземпляра. Начиная с версии 3.5, по умолчанию используется количество процессоров на машине, умноженное на пять. Начиная с версии 3.8, значение по умолчанию изменяется на min(32, os.cpu_count() + 4).
  • thread_name_prefix - позволяет пользователям управлять threading.Thread именами рабочих потоков, созданных пулом, для упрощения отладки.
  • initializer - необязательный вызываемый объект, который вызывается в начале каждого рабочего процесса.
  • initargs - набор аргументов, переданных initializer.

В этом руководстве я буду использовать только параметр max_workers. Давайте создадим ThreadPoolExecutor и вызовем submit функцию с wait_function в качестве вызываемой входной функции. Помните, что wait_function принимает два входных параметра. Я собираюсь передать их как отдельные параметры вместо кортежа:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(wait_function, 3, 4)

Функция submit вернет объект Future, который инкапсулирует асинхронное выполнение вызываемого объекта. Наиболее часто используемые функции для объекта Future:

  • cancel - Попытки отменить выполнение. Возвращает логическое значение, указывающее, был ли вызов успешно отменен.
  • running - Проверяет, выполняется ли вызов. Возвращает логическое значение.
  • done - Проверяет, был ли вызов отменен или завершен. Возвращает логическое значение.
  • result - возвращает значение, возвращаемое вызовом. Если вызов еще не завершен, этот метод будет ждать n секунд, заданных параметром input timeout. Настоятельно рекомендуется проверить использование функции done перед вызовом результата, поскольку timeout заблокирует текущее выполнение.
  • add_done_callback - присоединяет вызываемую функцию к объекту Future. Эта функция будет вызываться с Future в качестве единственного аргумента, когда Future отменяется или завершается.

Добавьте следующий код прямо под функцией submit. Это простой цикл, который печатает строку во время работы потока. Когда он будет завершен, он распечатает результат:

Ознакомьтесь с полным кодом на GitHub:

При запуске файла Python вы должны увидеть следующий результат:

Несколько задач

Затем мы собираемся добавить к нему еще одну задачу, чтобы обе они работали параллельно. Измените код в вашем файле Python на следующий:

А пока сначала установите max_workers на единицу. Запустите его, и вы должны заметить, что задачи не выполняются параллельно. Он запустит первую задачу, а затем вторую задачу. Это в основном потому, что у вас в пуле только один воркер. Давайте увеличим max_workers до двух, и вы увидите, что обе задачи выполняются параллельно.

Функция обратного вызова

Вы можете прикрепить функцию обратного вызова к объекту Future. Он вызовет присоединенную функцию после отмены или завершения выполнения. Это чрезвычайно полезно, если вы намереваетесь продолжить обновление пользовательского интерфейса после успешного подключения к базе данных или завершения URL-запросов. А пока давайте создадим простую функцию обратного вызова:

def callback_function(future):
    print('Callback with the following result', future.result())

Добавьте следующий код прямо под функцией submit:

future.add_done_callback(callback_function)

Ознакомьтесь с полным кодом на GitHub:

При запуске файла Python в консоли будет показан следующий результат:

2. ProcessPoolExecutor

Класс ProcessPoolExecutor работает точно так же, как ThreadPoolExecutor, но с некоторыми небольшими отличиями. Он использует модуль multiprocessing, который позволяет ему обойти Global Interpreter Lock. Однако это также означает, что только выбираемые объекты могут быть выполнены и возвращены.

Кроме того, он не работает в интерактивном интерпретаторе и должен иметь __main__ функцию, которую могут импортировать рабочие подпроцессы. max_workers - количество процессов в машине. В операционной системе Windows max_workers должно быть равно 61 или меньше.

Вы должны импортировать ProcessPoolExecutor, чтобы использовать его:

from concurrent.futures import ProcessPoolExecutor

Вы можете повторно использовать предыдущий код и изменить его на ProcessPoolExecutor вместо ThreadPoolExecutor. Оберните код внутри функции и вызовите ее прямо из __main__. Ознакомьтесь с полным кодом на GitHub:

3. Прерывание клавиатуры

Если вы намереваетесь остановить выполнение с помощью Ctrl+C, пока процесс выполняется в потоке, компилятор, скорее всего, зависнет и застрянет на KeyboardInterupt исключении. В основном это связано с тем, что команда Ctrl+C генерирует SIGINT, который не останавливает или прерывает выполнение. Вам нужно сгенерировать SIGBREAK, чтобы завершить выполнение и вернуться в терминал. Используйте следующую команду для создания SIGBREAK в зависимости от операционной системы и модели компьютера:

4. Вывод

Подведем итоги тому, что мы узнали сегодня.

Мы начали с простого объяснения модуля concurrent.futures.

После этого мы подробно изучили базовый класс ThreadPoolExecutor и класс Future. Мы пробовали запускать несколько задач параллельно с другим количеством max_workers. Мы также протестировали настройку функции обратного вызова, которая будет выполняться после завершения задачи.

Мы перешли к ProcessPoolExecutor, который похож на ThreadPoolExecutor с небольшими отличиями.

Спасибо, что прочитали эту статью. Надеюсь увидеть вас снова в следующей статье!

использованная литература

  1. Concurrent.futures документация
  2. Потоковая документация
  3. Ярлык для сигнала BREAK