Python: concurrent.futures Как сделать его отменяемым?

Python concurrent.futures и ProcessPoolExecutor предоставляют удобный интерфейс для планирования и мониторинга задач. Futures даже предоставляет метод .cancel () :

cancel (): попытка отменить вызов. Если вызов в настоящее время выполняется и не может быть отменен, тогда метод вернет False, в противном случае вызов будет отменен и метод вернет True.

К сожалению, в аналогичном вопросе (относительно asyncio) ответ утверждает, что запущенные задачи нельзя отменить с помощью этого фрагмента документации, но в документации этого не говорится, только если они работают И не отменяются.

Отправка multiprocessing.Events в процессы также нетривиально возможна (это делается с помощью параметров, как в multiprocess.Process возвращает RuntimeError)

Что я пытаюсь сделать? Я хочу разбить область поиска и запустить задачу для каждого раздела. Но достаточно ОДНОГО решения, и процесс сильно нагружает процессор. Итак, есть ли действительно удобный способ добиться этого, который не компенсирует выгоды от использования ProcessPool для начала?

Пример:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    for d in done:
        print(d.result())

    print("---")
    for d in not_done:
        # will return false for Cancel and Result for all futures
        print("Cancel: "+str(d.cancel()))
        print("Result: "+str(d.result()))

person Ketzu    schedule 14.03.2017    source источник
comment
Вы можете попробовать установить Event в глобальную переменную вместо того, чтобы передавать ее в качестве параметра, см. stackoverflow.com/questions/1675766/   -  person niemmi    schedule 14.03.2017
comment
@niemmi, спасибо за совет. Я, вероятно, попробую это как обходной путь, так как он не очень хорошо спроектирован с вызовами разных модулей.   -  person Ketzu    schedule 15.03.2017
comment
Возможно, все это связано с тем, что API POSIX немедленной отмены тоже не существует: stackoverflow.com/questions/2084830/   -  person Ciro Santilli 新疆再教育营六四事件ۍ    schedule 14.12.2018


Ответы (3)


Я не знаю, почему concurrent.futures.Future не имеет метода .kill(), но вы можете выполнить то, что хотите, выключив пул процессов с помощью pool.shutdown(wait=False) и вручную убив оставшиеся дочерние процессы.

Создайте функцию для уничтожения дочерних процессов:

import signal, psutil

def kill_child_processes(parent_pid, sig=signal.SIGTERM):
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess:
        return
    children = parent.children(recursive=True)
    for process in children:
        process.send_signal(sig)

Запустите свой код, пока не получите первый результат, затем убейте все оставшиеся дочерние процессы:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
    # run 4 tasks with a partition, but only *one* solution is needed
    partition = range(i*steps,(i+1)*steps)
    futures.append(pool.submit(m_run, partition))

done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)

# Shut down pool
pool.shutdown(wait=False)

# Kill remaining child processes
kill_child_processes(os.getpid())
person ostrokach    schedule 04.08.2017

К сожалению, запуск Futures нельзя отменить. Я считаю, что основная причина состоит в том, чтобы обеспечить одинаковый API в разных реализациях (невозможно прервать запущенные потоки или сопрограммы).

Библиотека Pebble была разработана для преодоления этого и других ограничений.

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task
    future.cancel()  
person noxdafox    schedule 08.08.2017
comment
Мне было полезно знать, что pebble фьючерсы наследуются от concurrent.futures фьючерсов. Следовательно, многие методы, предоставляемые concurrent.futures, также могут применяться к pebble фьючерсам, даже если pebble не реализует эти методы. Это применимо, например, для as_completed метода concurrent.futures. В результате переключиться на pebble может быть так же просто, как добавить импорт и изменить имена ProcessPoolExecuter и pool.submit. - person Samufi; 27.05.2020
comment
Это может быть очевидно, но я просто хотел отметить, что если вы используете ProcessPool, вы больше не используете несколько потоков, а скорее несколько процессов. Многих людей не волнует различие, но стоит хотя бы знать, что вы делаете. - person Stephen; 11.07.2020

Мне ваш вопрос показался интересным, и вот мой вывод.

Я обнаружил, что поведение метода .cancel() указано в документации Python. Что касается ваших запущенных параллельных функций, к сожалению, их нельзя было отменить даже после того, как им было сказано об этом. Если мой вывод верен, то я считаю, что Python действительно требует более эффективного метода .cancel ().

Запустите приведенный ниже код, чтобы проверить мои выводы.

from concurrent.futures import ProcessPoolExecutor, as_completed
from time import time 

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 3351355150:
            return elem
            break #Added to terminate loop once found
    return False

start = time()
futures = []
# used to create the partitions
steps = 1000000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    ### New Code: Start ### 
    for f in as_completed(futures):
        print(f.result())
        if f.result():
            print('break')
            break

    for f in futures:
        print(f, 'running?',f.running())
        if f.running():
            f.cancel()
            print('Cancelled? ',f.cancelled())

    print('New Instruction Ended at = ', time()-start )
print('Total Compute Time = ', time()-start )

Обновление: можно принудительно завершить параллельные процессы через bash, но в результате будет завершена и основная программа python. Если это не проблема, попробуйте следующий код.

Вы должны добавить приведенные ниже коды между двумя последними операторами печати, чтобы убедиться в этом сами. Примечание. Этот код работает только в том случае, если вы не запускаете какую-либо другую программу python3.

import subprocess, os, signal 
result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='],
                        stdout=subprocess.PIPE).stdout.decode('utf-8').split()
print ('result =', result)
for i in result:
    print('PID = ', i)
    if i != result[0]:
        os.kill(int(i), signal.SIGKILL)
        try: 
           os.kill(int(i), 0)
           raise Exception("""wasn't able to kill the process 
                              HINT:use signal.SIGKILL or signal.SIGABORT""")
        except OSError as ex:
           continue
person Sun Bear    schedule 15.03.2017