Легкая настойчивость в контексте ThreadPoolExecutor в Python

У меня есть код Python, который выполняет дорогостоящие задания с помощью ThreadPoolExecutor, и я хотел бы отслеживать, какие из них были завершены, чтобы, если мне придется перезапустить эту систему, мне не пришлось переделывать то, что уже было выполнено. законченный. В однопоточном контексте я мог бы просто отметить то, что я сделал, на полке. Вот наивный перенос этой идеи в многопоточную среду:

from concurrent.futures import ThreadPoolExecutor
import subprocess
import shelve


def do_thing(done, x):
    # Don't let the command run in the background; we want to be able to tell when it's done
    _ = subprocess.check_output(["some_expensive_command", x])
    done[x] = True


futs = []
with shelve.open("done") as done:
    with ThreadPoolExecutor(max_workers=18) as executor:
        for x in things_to_do:
            if done.get(x, False):
                continue
            futs.append(executor.submit(do_thing, done, x))
            # Can't run `done[x] = True` here--have to wait until do_thing finishes
        for future in futs:
            future.result()

    # Don't want to wait until here to mark stuff done, as the whole system might be killed at some point
    # before we get through all of things_to_do

Могу ли я уйти с этим? документация для полки не содержит никаких гарантий безопасности потоков, поэтому я думаю нет.

Итак, каков простой способ справиться с этим? Я думал, что, возможно, это поможет вставить done[x] = True в future.add_done_callback, но same-thread/26021772#26021772">который часто будет выполняться в том же потоке, что и будущее. Возможно, есть механизм блокировки, который прекрасно работает с ThreadPoolExecutor? Мне это кажется чище, чем написание цикла, который спит, а затем проверяет завершенные фьючерсы.


person kuzzooroo    schedule 22.08.2016    source источник


Ответы (1)


Пока вы все еще находитесь в самом внешнем диспетчере контекста with, полка done — это просто обычный объект Python — он записывается на диск только тогда, когда менеджер контекста закрывается и запускает свой метод __exit__. Поэтому он так же безопасен для потоков, как и любой другой объект Python, благодаря GIL (пока вы используете CPython).

В частности, переназначение done[x] = True является потокобезопасным / будет выполняться атомарно.

Важно отметить, что хотя метод полки __exit__ запустится после нажатия Ctrl-C, этого не произойдет, если процесс python завершится внезапно, и полка не будет сохранена на диск.

Чтобы защититься от такого рода сбоев, я бы предложил использовать облегченную файловую базу данных, безопасную для потоков, например sqllite3.

person Julien    schedule 27.09.2016
comment
__exit__ диспетчера контекста вызывается даже в случае, например, исключения KeyboardInterrupt, поэтому мое состояние, похоже, сохраняется. Что касается безопасности потоков, вы говорите, что все объекты Python являются потокобезопасными из-за GIL? - person kuzzooroo; 01.10.2016
comment
Не все объекты python в целом потокобезопасны, но (по крайней мере, в CPython) вы используете базовые атомарные операции (назначение/переназначение), которые не включают ввод-вывод (запись происходит после __exit__), поэтому вы будете Безопасно. - person Julien; 01.10.2016
comment
Я хочу добавить, что ваш код немного неверен - вы не должны вызывать do_thing, а должны передавать его в качестве первого аргумента. Кроме того, вы должны сохранить возвращаемое значение executor.submit в список (обычно называемый futs). Затем в контексте ThreadPoolExecutor выполните цикл по списку, вызывая метод result() каждого объекта. Это заблокирует интерпретатор от продолжения, пока все задачи не будут выполнены. - person Julien; 01.10.2016
comment
Спасибо. Я сделал исправление передачи do_thing в качестве аргумента executor.submit вместо его вызова. Нужно ли мне вызывать метод result() каждого будущего, если я использую диспетчер контекста? В документации для Executor.shutdown() говорится: Вы можете избежать явного вызова этого метода, если используете оператор with, который отключит Executor (ожидание, как если бы Executor.shutdown() был вызван с wait установленным на True). - person kuzzooroo; 02.10.2016
comment
Вы должны вызывать метод result() каждого будущего, несмотря ни на что, потому что в противном случае он проглотит все исключения. Если при запуске отправленной функции возникло исключение, оно возникнет, когда вы получите результат от соответствующего будущего объекта. В противном случае он тихо выйдет из строя, и вы никогда не узнаете, что что-то пошло не так. - person Julien; 03.10.2016
comment
ОК, отредактировал код, теперь я вызываю result в каждом будущем. - person kuzzooroo; 03.10.2016
comment
Я думаю, что ключевой момент, который есть в комментариях, но еще не в ответе, заключается в том, что done[x] = True специально является потокобезопасным. Ответ прямо сейчас просто говорит, что это безопасно, потому что это объект Python, но я думаю, что это слишком широко, например, вы не можете перебирать dict в одном потоке и изменять его ключи в другом. - person kuzzooroo; 03.10.2016