Многопроцессорность Python: как ограничить количество ожидающих процессов?

При запуске большого количества задач (с большими параметрами) с помощью Pool.apply_async процессы выделяются и переходят в состояние ожидания, при этом ограничения на количество ожидающих процессов нет. Это может привести к съедению всей памяти, как в примере ниже:

import multiprocessing
import numpy as np

def f(a,b):
    return np.linalg.solve(a,b)

def test():

    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000)))
    p.close()
    p.join()

if __name__ == '__main__':
    test()

Я ищу способ ограничить очередь ожидания таким образом, чтобы количество ожидающих процессов было ограничено, а Pool.apply_async блокировался, пока очередь ожидания заполнена.


person André Panisson    schedule 15.06.2012    source источник


Ответы (4)


multiprocessing.Pool имеет член _taskqueue типа multiprocessing.Queue, который принимает необязательный параметр maxsize; к сожалению, он строит его без набора параметров maxsize.

Я бы рекомендовал создать подкласс multiprocessing.Pool с копией-вставкой multiprocessing.Pool.__init__, которая передает maxsize конструктору _taskqueue.

Обезьянье исправление объекта (будь то пул или очередь) также сработает, но вам придется обезьяньим исправлением pool._taskqueue._maxsize и pool._taskqueue._sem так что это будет довольно хрупким:

pool._taskqueue._maxsize = maxsize
pool._taskqueue._sem = BoundedSemaphore(maxsize)
person ecatmur    schedule 15.06.2012
comment
Я использую Python 2.7.3, а _taskqueue имеет тип Queue.Queue. Это означает, что это простая очередь, а не многопроцессорная очередь. Создание подкласса multiprocessing.Pool и переопределение init работает нормально, но обезьянье исправление объекта не работает должным образом. Тем не менее, это именно тот хак, который я искал, спасибо. - person André Panisson; 16.06.2012

Подождите, если pool._taskqueue превышает желаемый размер:

import multiprocessing
import time

import numpy as np


def f(a,b):
    return np.linalg.solve(a,b)

def test(max_apply_size=100):
    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000)))

        while p._taskqueue.qsize() > max_apply_size:
            time.sleep(1)

    p.close()
    p.join()

if __name__ == '__main__':
    test()
person Roger Dahl    schedule 25.02.2017
comment
Просто хочу добавить, что я нашел это самым простым решением моих проблем с памятью при многопроцессорной обработке. Я использовал max_apply_size = 10, и это отлично подходит для моей проблемы, заключающейся в медленном преобразовании файлов. Использование семафора, как предлагает @ecatmur, кажется более надежным решением, но может быть излишним для простых сценариев. - person Nate; 02.10.2017
comment
TaylorMonacelli То, что ваши правки были отклонены, иллюстрирует проблемы с модами на SO. Ваше редактирование исправило ошибку. @greg-449 — это пошаговая модификация, одобряющая только 15% правок и приводящая бессмысленную причину отклонения. - person Roger Dahl; 17.01.2020

Вот альтернатива исправления обезьяны верхнему ответу:

import queue
from multiprocessing.pool import ThreadPool as Pool


class PatchedQueue():
  """
  Wrap stdlib queue and return a Queue(maxsize=...)
  when queue.SimpleQueue is accessed
  """

  def __init__(self, simple_queue_max_size=5000):
    self.simple_max = simple_queue_max_size  

  def __getattr__(self, attr):
    if attr == "SimpleQueue":
      return lambda: queue.Queue(maxsize=self.simple_max)
    return getattr(queue, attr)


class BoundedPool(Pool):
  # Override queue in this scope to use the patcher above
  queue = PatchedQueue()

pool = BoundedPool()
pool.apply_async(print, ("something",))

Это работает, как и ожидалось, для Python 3.8, где многопроцессорный пул использует queue.SimpleQueue для настройки очереди задач. Похоже, реализация multiprocessing.Pool могла измениться с версии 2.7.

person nijave    schedule 27.03.2021
comment
Я не тестировал ThreadPool, но если я изменяю его на from multiprocessing.pool import Pool, он не работает (лимит не меняется, и кажется, что SimpleQueue не меняется на Queue). Есть идеи, как это решить? - person Filipe; 07.06.2021

Вы можете добавить явную очередь с параметром maxsize и использовать queue.put() вместо pool.apply_async() в этом случае. Тогда рабочие процессы могут:

for a, b in iter(queue.get, sentinel):
    # process it

Если вы хотите ограничить количество созданных входных аргументов/результатов, которые находятся в памяти, приблизительно равным количеству активных рабочих процессов, вы можете использовать pool.imap*() методы:

#!/usr/bin/env python
import multiprocessing
import numpy as np

def f(a_b):
    return np.linalg.solve(*a_b)

def main():
    args = ((np.random.rand(1000,1000), np.random.rand(1000))
            for _ in range(1000))
    p = multiprocessing.Pool()
    for result in p.imap_unordered(f, args, chunksize=1):
        pass
    p.close()
    p.join()

if __name__ == '__main__':
    main()
person jfs    schedule 15.06.2012
comment
Использование imap не имеет значения. Очередь ввода по-прежнему не ограничена, и использование этого решения приведет к съедению всей памяти. - person Radim; 11.09.2015
comment
@Radim: код imap в ответе работает, даже если вы дадите ему бесконечный генератор. - person jfs; 11.09.2015
comment
К сожалению, не в Python 2 (не смотрел код в py3). Для некоторых обходных путей см. этот ответ SO. - person Radim; 11.09.2015