Промежуточный результат многопроцессорного процесса

У меня есть функция, которая загружает данные и циклически повторяет время, например.

def calculate_profit(account):
    account_data = load(account) #very expensive operation
    for day in account_data.days:
        print(account_data.get(day).profit)

Поскольку загрузка данных стоит дорого, имеет смысл использовать joblib/multiprocessing, чтобы сделать что-то вроде этого:

arr = [account1, account2, account3, ...]
joblib.Parallel(n_jobs=-1)(delayed(calculate_profit)(arr))

Однако у меня есть еще одна дорогостоящая функция, которую я хотел бы применить к промежуточным результатам функции calculate_profit. Например, предположим, что суммировать всю прибыль и обрабатывать ее/размещать на веб-сайте/и т. д. – затратная операция. Также мне нужна прибыль предыдущего дня, чтобы рассчитать изменение прибыли в этой функции.

def expensive_sum(prev_day_profits, *account_profits):
    total_profit_today = sum(account_profits)
    profit_difference = total_profit_today - prev_day_profits

    #some other expensive operation
    #more expensive operations

Так что я хотел бы

  1. Запускайте многопроцессорные процессы параллельно (чтобы уменьшить нагрузку на загрузку всех дорогостоящих данных учетной записи)
  2. Как только каждый многопроцессорный процесс достигает предопределенной точки (например, завершает одну итерацию цикла), возвращайте эти промежуточные значения другой функции (expensive_sum) для обработки — предположим, что каждый отдельный многопроцессорный процесс не может продолжаться до expensive_sum возвращается
  3. ОДНАКО, я хочу поддерживать многопроцессорные процессы, чтобы мне не приходилось их повторно инициализировать (уменьшая эти накладные расходы)

Есть какой-либо способ сделать это?


person Michael    schedule 19.07.2017    source источник
comment
мешает ли это отдельным процессам продолжаться до тех пор, пока очередь не вернется?   -  person Michael    schedule 19.07.2017
comment
О каком объеме данных мы говорим как для load(), так и для expensive_sum()? Потому что, если вы собираетесь передавать большие объемы данных, вы можете потерять любое преимущество, которое вы получите от многопроцессорной обработки, из-за балета травления / распаковки, который Python делает при обмене данными между процессами.   -  person zwer    schedule 19.07.2017
comment
отдельные процессы просто put значения.... очередь не возвращает   -  person vks    schedule 19.07.2017
comment
@zwer это просто игрушечный пример   -  person Michael    schedule 19.07.2017


Ответы (1)


from multiprocessing import Manager
queue = manager.Queue()

Как только каждый многопроцессорный процесс достигает предопределенной точки, выполните

queue.put(item)

Между тем другая дорогостоящая функция делает

queue.get(item)  ==>  blocking call for get

Дорогостоящая функция ожидает get и продолжает работу, когда получает значение, обрабатывает его и снова ждет get

person vks    schedule 19.07.2017
comment
спасибо - как бы вы реализовали блокирующий вызов для get? - person Michael; 19.07.2017
comment
@Michael блокирует по умолчанию ... если вы хотите не блокировать, вы должны использовать get_nowait() - person vks; 19.07.2017
comment
Хорошо, спасибо! извините, еще один вопрос - вы вызываете queue.get из другого процесса? Как бы вы его инициализировали? Например, если у меня есть 10 процессов, выполняющих queue.put(item[[0]), как я могу убедиться, что они не продолжают очередь.put(item[1]) до процесса(item[0], .. ., пункт[1]) заканчивается? - person Michael; 19.07.2017
comment
@Michael, для этого вы можете использовать queue.qsize() ... после завершения обработки размер очереди будет равен 0 - person vks; 19.07.2017
comment
@Michael, вы можете создать глобальный queue и использовать его внутри процессов..... или в __init__ класса, в котором находятся ваши функции. - person vks; 19.07.2017