multiprocessing.Pool: В чем разница между map_async и imap?

Я пытаюсь научиться использовать пакет Python multiprocessing, но не понимаю разницы между map_async и imap. Я заметил, что и map_async, и imap выполняются асинхронно. Итак, когда я должен использовать один над другим? И как мне получить результат, возвращаемый map_async?

Должен ли я использовать что-то вроде этого?

def test():
    result = pool.map_async()
    pool.close()
    pool.join()
    return result.get()

result=test()
for i in result:
    print i

person spacegoing    schedule 23.10.2014    source источник


Ответы (2)


Между imap/imap_unordered и map/map_async есть два ключевых различия:

  1. То, как они потребляют итерацию, которую вы им передаете.
  2. То, как они возвращают вам результат.

map потребляет ваш итерируемый объект, преобразуя его в список (при условии, что это уже не список), разбивая его на фрагменты и отправляя эти фрагменты рабочим процессам в Pool. Разбиение итерации на куски работает лучше, чем передача каждого элемента в итерации между процессами по одному элементу за раз, особенно если итерация большая. Однако превращение итерируемого объекта в список для его фрагментации может потребовать очень больших затрат памяти, поскольку весь список необходимо будет хранить в памяти.

imap не превращает итерацию, которую вы ему даете, в список и не разбивает его на куски (по умолчанию). Он будет выполнять итерацию по одному элементу за раз и отправлять их каждому рабочему процессу. Это означает, что вы не тратите память на преобразование всей итерации в список, но это также означает, что производительность для больших итераций ниже из-за отсутствия фрагментации. Однако это можно смягчить, передав аргумент chunksize больше, чем значение по умолчанию, равное 1.

Другое существенное различие между imap/imap_unordered и map/map_async заключается в том, что с imap/imap_unordered вы можете начать получать результаты от воркеров, как только они будут готовы, вместо того, чтобы ждать, пока все они будут завершены. С map_async сразу же возвращается AsyncResult, но вы не можете фактически получить результаты из этого объекта, пока все они не будут обработаны, после чего он возвращает тот же список, что и map (map на самом деле реализован внутренне как map_async(...).get()). Невозможно получить частичные результаты; у вас либо есть весь результат, либо ничего.

imap и imap_unordered сразу же возвращают итерации. С imap результаты будут получены из итерируемого объекта, как только они будут готовы, сохраняя при этом порядок входного итерируемого объекта. С imap_unordered результаты будут получены, как только они будут готовы, независимо от порядка входных итераций. Итак, скажем, у вас есть это:

import multiprocessing
import time

def func(x):
    time.sleep(x)
    return x + 2

if __name__ == "__main__":    
    p = multiprocessing.Pool()
    start = time.time()
    for x in p.imap(func, [1,5,3]):
        print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))

Это выведет:

3 (Time elapsed: 1s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

Если вы используете p.imap_unordered вместо p.imap, вы увидите:

3 (Time elapsed: 1s)
5 (Time elapsed: 3s)
7 (Time elapsed: 5s)

Если вы используете p.map или p.map_async().get(), вы увидите:

3 (Time elapsed: 5s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

Итак, основными причинами использования imap/imap_unordered вместо map_async являются:

  1. Ваша итерация достаточно велика, поэтому преобразование ее в список приведет к тому, что у вас закончится/используется слишком много памяти.
  2. Вы хотите иметь возможность начать обработку результатов до того, как все будут завершены.
person dano    schedule 23.10.2014
comment
как насчет применения и применения_асинхронности? - person Harsh Daftary; 17.06.2015
comment
@HarshDaftary apply отправляет одну задачу рабочему процессу, а затем блокирует ее до завершения. apply_async отправляет одну задачу рабочему процессу, а затем немедленно возвращает объект AsyncResult, который можно использовать для ожидания завершения задачи и получения результата. apply реализуется простым вызовом apply_async(...).get() - person dano; 17.06.2015
comment
Такое описание должно быть в официальной Pool документации, а не существующий унылый. - person mins; 21.12.2017
comment
@dano Я хочу запустить функцию в фоновом режиме, но у меня есть некоторые ограничения ресурсов, и я не могу запускать функцию столько раз, сколько хочу, и хочу поставить в очередь дополнительные выполнения функции. У тебя есть идеи, как мне это сделать? У меня есть вопрос здесь< /а>. Не могли бы вы взглянуть на мой вопрос и посмотреть, можете ли вы дать мне несколько советов (или, что еще лучше, ответ) о том, как мне это сделать? - person Amir; 03.03.2018
comment
Если используется imap (упорядоченный), будут ли рабочие, которые заканчивают не по порядку, ждать, пока закончатся предыдущие рабочие, прежде чем приступить к следующей части работы? Или они начинают следующую часть работы сразу же после завершения предыдущей и оставляют ее на усмотрение Pool, чтобы убедиться, что возвращенные результаты в порядке? - person BallpointBen; 28.08.2018
comment
@BallpointBen Он перейдет к следующему этапу работы, как только он будет выполнен. Заказ обрабатывается обратно в родительском процессе. - person dano; 28.08.2018
comment
Что произойдет, если вы вообще не заботитесь о возврате результата и, скажем, результаты процесса записываются на диск для последующего использования? - person Tanner; 24.05.2019

Принятый ответ гласит, что для imap_unordered результаты будут получены, как только они будут готовы, из чего можно сделать вывод, что результаты будут возвращены в порядке завершения. Но я просто хочу прояснить, что это вообще неверно. В документации указано, что результаты возвращаются в произвольном порядке. Рассмотрим следующую программу, которая использует размер пула 4, размер iterable 20 и значение chunksize 5. Рабочая функция находится в состоянии ожидания разное количество времени в зависимости от ее переданный аргумент, который также гарантирует, что ни один процесс в пуле не захватит все отправленные задачи. Таким образом, я ожидаю, что каждый процесс в пуле будет иметь 20 / 4 = 5 задачи для обработки:

from multiprocessing import Pool
import time

def worker(x):
    print(f'x = {x}', flush=True)
    time.sleep(.1 * (20 - x))
    # return approximate completion time with passed argument:
    return time.time(), x

if __name__ == '__main__':
    pool = Pool(4)
    results = pool.imap_unordered(worker, range(20), chunksize=5)
    for t, x in results:
        print('result:', t, x)

Отпечатки:

x = 0
x = 5
x = 10
x = 15
x = 16
x = 17
x = 11
x = 18
x = 19
x = 6
result: 1621512513.7737606 15
result: 1621512514.1747007 16
result: 1621512514.4758775 17
result: 1621512514.675989 18
result: 1621512514.7766125 19
x = 12
x = 1
x = 13
x = 7
x = 14
x = 2
result: 1621512514.2716103 10
result: 1621512515.1721854 11
result: 1621512515.9727488 12
result: 1621512516.6744206 13
result: 1621512517.276999 14
x = 8
x = 9
x = 3
result: 1621512514.7695887 5
result: 1621512516.170747 6
result: 1621512517.4713914 7
result: 1621512518.6734042 8
result: 1621512519.7743165 9
x = 4
result: 1621512515.268784 0
result: 1621512517.1698637 1
result: 1621512518.9698756 2
result: 1621512520.671273 3
result: 1621512522.2716706 4

Вы можете ясно видеть, что эти результаты не выдаются в порядке завершения. Например, мне было возвращено 1621512519.7743165 9, за которым следует 1621512515.268784 0, которое было возвращено рабочей функцией более чем на 4 секунды раньше, чем ранее возвращенный результат. Однако, если я изменю значение chunksize на 1, распечатка станет такой:

x = 0
x = 1
x = 2
x = 3
x = 4
result: 1621513028.888357 3
x = 5
result: 1621513028.9863524 2
x = 6
result: 1621513029.0838938 1
x = 7
result: 1621513029.1825204 0
x = 8
result: 1621513030.4842813 7
x = 9
result: 1621513030.4852195 6
x = 10
result: 1621513030.4872172 5
x = 11
result: 1621513030.4892178 4
x = 12
result: 1621513031.3908074 11
x = 13
result: 1621513031.4895358 10
x = 14
result: 1621513031.587289 9
x = 15
result: 1621513031.686152 8
x = 16
result: 1621513032.1877549 15
x = 17
result: 1621513032.1896958 14
x = 18
result: 1621513032.1923752 13
x = 19
result: 1621513032.1923752 12
result: 1621513032.2935638 19
result: 1621513032.3927407 18
result: 1621513032.4912949 17
result: 1621513032.5884912 16

Это находится в порядке завершения. Однако я не решаюсь утверждать, что imap_unordered всегда будет возвращать результаты по мере их появления, если указано значение chunksize, равное 1, хотя это кажется быть случай, основанный на этом эксперименте, поскольку документация не делает такого заявления.

Обсуждение

Когда указан chunksize, равный 5, 20 задач помещаются в одну входную очередь, чтобы 4 процесса в пуле обрабатывали их фрагментами размером 5. Таким образом, простаивающий процесс отключается. поставьте в очередь следующий блок из 5 задач и обработайте каждую из них по очереди, прежде чем снова станет бездействующим. Таким образом, первый процесс будет обрабатывать x аргументов от 0 до 4, второй процесс — x аргументов от 5 до 9 и т. д. Вот почему вы видите начальные x значений, напечатанных как 0, 5, 10 и 15.

Но хотя результат для x аргумента 0 завершается раньше, чем результат для x аргумента 9, может показаться, что результаты записываются вместе как фрагменты, и поэтому результат для x аргумента 0 не будет возвращен до тех пор, пока не будут получены результаты для x аргументов, поставленных в очередь. вверх в том же фрагменте (т. е. 1, 2, 3 и 4) также доступны.

person Booboo    schedule 20.05.2021
comment
Спасибо, это хороший момент. И я согласен с вашим наблюдением, что похоже, что значение результата giben становится доступным для родителя только тогда, когда весь фрагмент, частью которого он является, завершен. - person dano; 02.06.2021