Есть ли у multiprocessing.pool.imap вариант (например, звездная карта), который позволяет использовать несколько аргументов?

Я делаю некоторые вычисления для больших коллекций байтов. Процесс работает с кусками байтов. Я пытаюсь использовать параллельную обработку с использованием многопроцессорности для повышения производительности. Сначала я пытался использовать pool.map, но он допускал только один аргумент, затем я узнал о pool.starmap. Но pool.starmap дает результаты только тогда, когда все процессы завершены. Мне нужны результаты по мере их поступления (вроде). Я пытаюсь использовать pool.imap, который предоставляет результаты по завершении процессов, но не допускает нескольких аргументов (моя функция требует 2 аргумента). Кроме того, важна последовательность результатов.

Пример кода ниже:

pool = mp.Pool(processes=4)
y = []
for x in pool.starmap(f, zip(da, repeat(db))):
    y.append(x)

Приведенный выше код работает, но дает результаты только после завершения всех процессов. Я не вижу никакого прогресса. Вот почему я попытался использовать pool.imap, работает хорошо, но только с одним аргументом:

pool = mp.Pool(processes=4)
y = []
for x in pool.imap(f, da)):
    y.append(x)

При нескольких аргументах возникает следующее исключение:

TypeError: f() missing 1 required positional argument: 'd'

Ищете простой способ выполнить все 3 требования:

  1. параллельная обработка с использованием нескольких параметров/аргументов
  2. удается видеть прогресс во время работы процессов
  3. упорядоченные результаты.

Спасибо!


person Abdul Qadir    schedule 11.09.2015    source источник


Ответы (3)


На первые два вопроса я могу ответить довольно быстро. Я думаю, вы сможете ответить на третий вопрос после понимания первых двух.

<сильный>1. Параллельная обработка с несколькими аргументами

Я не уверен насчет полного эквивалента «звездной карты», но вот альтернатива. Что я делал в прошлом, так это объединял свои аргументы в один объект данных, такой как список. Например, если вы хотите передать три аргумента вашему map_function, вы можете добавить эти аргументы в список, а затем использовать этот список с функцией .map() или .imap().

def map_function(combo):
    a = combo[0]
    b = combo[1]
    c = combo[2]
    return a + b + c

if '__name__' == '__main__':
    combo = []
    combo[0] = arg_1
    combo[1] = arg_2
    combo[2] = arg_3

    pool = Pool(processes=4)
    pool.map(map_function, combo)

<сильный>2. Отслеживание прогресса

Хороший способ сделать это — использовать общее значение multiprocessing. На самом деле я задал этот (почти) тот же точный вопрос около месяца назад. Это позволяет вам манипулировать одной и той же переменной из разных процессов, созданных вашей функцией map. Ради обучения я дам вам возможность самостоятельно прочитать и разобраться в решении с общим состоянием. Если у вас все еще возникают проблемы после нескольких попыток, я буду более чем счастлив помочь вам, но я считаю, что научиться понимать что-то гораздо ценнее, чем я дам вам ответ.

Надеюсь это поможет!!

person Austin A    schedule 11.09.2015
comment
Пока я прохожу по вашей ссылке, я просто хотел упомянуть, что с помощью pool.starmap я получаю элементы 1 и 3, но не 2, а с помощью pool.imap я получаю все 3, кроме части с несколькими аргументами. Мой второй аргумент - это трехмерный список (который остается постоянным для всех процессов). Вы бы все же рекомендовали объединить их в комбинацию. Спасибо! - person Abdul Qadir; 11.09.2015
comment
решение по ссылке работает, и теперь я могу достичь всех 3. Большое спасибо за это. я хотел увидеть обновление с помощью индикатора выполнения. я пытался заставить его работать, но запуск индикатора выполнения в основном и обновление в add_print, похоже, не работает. я даже пытался пометить индикатор выполнения как глобальный или передать его как параметр. но не повезло. любые мысли, пожалуйста. - person Abdul Qadir; 11.09.2015

Вы можете имитировать starmap с помощью imap с помощью функции functools.partial():

import functools
import multiprocessing as mp

def my_function(constant, my_list, optional_param=None):
    print(locals())

with mp.Pool() as pool:
    list(pool.imap(functools.partial(my_function, 
                                2,
                                optional_param=3),
              [1,2,3,4,5]))

Выходы:

$ python3 foo.py
{'optional_param': 3, 'my_list': 1, 'constant': 2}
{'optional_param': 3, 'my_list': 3, 'constant': 2}
{'optional_param': 3, 'my_list': 2, 'constant': 2}
{'optional_param': 3, 'my_list': 4, 'constant': 2}
{'optional_param': 3, 'my_list': 5, 'constant': 2}
person confused00    schedule 28.11.2018

Я думаю, что это решение точно соответствует вашим трем требованиям: https://stackoverflow.com/a/28382913/2379433

Короче говоря, p = Pool(); p.imap позволит вам видеть прогресс и поддерживать порядок. Если вам нужны функции map с несколькими аргументами, вы можете использовать ответвление multiprocessing, которое обеспечивает лучшую сериализацию и несколько аргументов. См. ссылку для примера.

person Mike McKerns    schedule 11.09.2015