Обнаружение неудачных задач в concurrent.futures

Я использую concurrent.futures, так как он имеет простой интерфейс и позволяет пользователю легко контролировать максимальное количество потоков/процессов. Однако похоже, что concurrent.futures скрывает неудачные задачи и продолжает основной поток после завершения/неудачной работы всех задач.

import concurrent.futures

def f(i):
    return (i + 's')

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    fs = [executor.submit(f, i ) for i in range(10)]
    concurrent.futures.wait(fs)

Вызов f для любого целого числа приводит к TypeError. Однако весь скрипт работает нормально и завершается с кодом 0. Есть ли способ заставить его выдавать исключение/ошибку при сбое любого потока?

Или есть лучший способ ограничить количество потоков/процессов без использования concurrent.futures?


comment
связанные: stackoverflow.com/questions/33448329/   -  person Ciro Santilli 新疆再教育营六四事件ۍ    schedule 20.03.2019


Ответы (2)


concurrent.futures.wait гарантирует выполнение всех задач, но не проверяет успех (что-то return-ed) по сравнению с ошибкой (исключение возникло и не перехвачено в рабочей функции). Для этого вам нужно вызвать .result() для каждого Future (что приведет либо к повторному raise исключению из задачи, либо к созданию значения return-ed). Есть и другие методы проверки без фактического рейза в основном потоке (например, .exception()), но .result() — самый простой метод.

Если вы хотите сделать это повторно raise, самый простой подход — просто заменить вызов wait() на:

for fut in concurrent.futures.as_completed(fs):
    fut.result()

который будет обрабатывать результаты как завершенные Futureс, и сразу же raise и Exception, если они произошли. В качестве альтернативы вы продолжаете использовать wait, поэтому все задачи завершаются до того, как вы проверите исключения для любой из них, затем перебираете fs напрямую и вызываете .result() для каждой.

person ShadowRanger    schedule 01.03.2016

Есть еще один способ сделать то же самое с multiprocessing.Pool (для процессов) или multiprocessing.pool.ThreadPool (для потоков). Насколько я знаю, он повторно выдает любые пойманные исключения.

person Anton    schedule 01.03.2016
comment
Этот вопрос касается concurrent.futures AFAIK, это более современная реализация модуля многопроцессорности. Для меня нет смысла предлагать использовать старую библиотеку. - person guettli; 13.06.2016
comment
@guettli, эта старая библиотека до сих пор активно используется многими параллельными библиотеками, например. Джоблиб и Даск. И фьючерсы просто функционально не эквивалентны пулу с его функцией map, которая потенциально более эффективна, чем ожидание нескольких будущих объектов. - person Anton; 13.06.2016
comment
Да, многопроцессорность активно используется многими библиотеками. Но заголовок этого вопроса: Обнаружение неудачных задач в concurrent.futures - person guettli; 14.06.2016
comment
Да. Но значит ли это, что мы не можем предложить никаких альтернатив тому, как думает ОП? Иногда нет другого способа ответить на вопрос, кроме как перенаправить на что-то другое. - person Anton; 14.06.2016
comment
AFAIK concurrent.futures более современен. Я знаю, что многопроцессорность не устарела, но я бы не советовал возвращаться. Конечно, перенаправление на что-то другое иногда очень полезно. - person guettli; 15.06.2016