Многопроцессорность Python. Пул игнорирует метод класса

Недавно я написал программу с классом для своего исследования и попытался распараллелить ее. Когда я использовал multiprocessing.Process Python 2.7 с JoinableQueue и управляемыми данными, моя программа в конечном итоге зависала с несуществующими процессами.

import multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data

    def _calc_parallel(self, index):
        self._calc_bond(index)

    def run(self):
        for ts, force in itertools.izip(self.coortrj, self.forcevec):
        try:
            consumers = [mp.Process(target=self._calc_parallel,
                         args=(force,)) for i in range(nprocs)]
            for w in consumers:
                w.start()

            # Enqueue jobs
            for i in range(self.totalsites):
                self.tasks.put(i)

            # Add a poison pill for each consumer
            for i in range(nprocs):
                self.tasks.put(None)

            self.tasks.close()
            self.tasks.join()

    #       for w in consumers:
    #           w.join()
        except:
            traceback.print_exc()

_calc_parallel вызывает некоторые другие методы класса.

Я даже пытался использовать multiprocessing.Pool для этой цели, используя параметр copy_reg, который можно найти в другом месте на http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods..

import multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data

    def _use_force(force):
        # Calculate data

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.Pool(processes=nprocs, maxtasksperchild=2)
            args = itertools.izip(range(self.totalsites), itertools.repeat(force))
            pool.map_async(self._calc_parallel, args)
            pool.close()
            pool.join()
        except:
            traceback.print_exc()

Однако pool.map_async, похоже, не вызывает self._calc_parallel. Я знаю, что в обоих случаях (процесс и пул) я что-то упускаю из виду, но я не совсем понимаю, что именно. Обычно я обрабатываю более 40 000 элементов.

Спасибо за помощь.

Обновить

Прочитав несколько других сообщений, я также попробовал pathos.multiprocessing.

import pathos.multiprocessing as mp
class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data

    def _use_force(force):
        # Calculate data

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.ProcessingPool(nprocs)
            args = itertools.izip(range(self.totalsites), itertools.repeat(force))
            pool.amap(lambda x: self._calc_parallel(*x), args)
        except:
            traceback.print_exc()

И, как и в случае с моими предыдущими попытками, это тоже быстро проходит без вызова метода.

Обновление 2

Я решил переработать код, чтобы разделить мой гигантский класс на более мелкие и более управляемые компоненты. Однако, если я использую pathos.multiprocessing, я сталкиваюсь с ситуацией, отличной от предыдущей (см. ссылка). В моем новом коде теперь есть объект, который можно использовать для расчета, а затем с помощью его методов он должен возвращать значение.

import itertools
import pandas as pd
import pathos.multiprocessing as mp

class ForceData(object):
    def __init__(self, *args, **kwargs):
        # Setup data
        self.value = pd.DataFrame()
    def calculateBondData(self, index):
        # Calculation
        return self.value
    def calculateNonBondedData(self, index):
        # Calculation
        return self.value
    def calculateAll(self, index):
        # Because self.value is a pandas.DataFrame, changed internally
        self.calculateBondData(index)
        self.calculateNonBondedData(index)
        return self.value

class ForceMatrix(object):
    def __init__(self, *args, **kwargs):
        # Initialize data
        self._matrix = pd.DataFrame()
    def map(self, data):
        for value in data.get():
            for i, j in itertools.product(value.index, repeat=2):
                self._matrix.loc[[i], [j]] += value.values

def calculate(self, *args, **kwargs):
    # Setup initial information.
    fd = ForceData()
    matrix = ForceMatrix()
    pool = mp.ProcessingPool()
    data = pool.amap(fd.calculateAll, range(x))
    matrix.map(data, force)
    return matrix

Думал, что отдельная функция func(dataobj, force), но и это вроде не помогает. При текущей скорости, по моим оценкам, полный расчет на одном процессоре займет более 1000 часов, что слишком много для чего-то, что должно быть быстрее.

Обновление 3 (30 апреля 2015 г.)

Благодаря полезной информации @MikeMcKerns я, возможно, остановился на возможном решении. На iMac (четырехъядерном) или 16-ядерном узле кластера я обнаружил, что для крупнозернистой (CG) системы без связей лучшим решением кажется двойной itertools.imap (1000 сайтов CG). составляет примерно 5,2 с на кадр траектории. Когда я перешел на систему, которая включает некоторые детали связи (3000 сайтов компьютерной графики, представляющих воду), я обнаружил, что на iMac (использующем 1 ядро) itertools.imap, за которым следует pathos.ThreadingPool.uimap (4 потока), работает примерно 85 с/кадр; если я попытаюсь использовать пул процессов (4 ядра x 2)/пул потоков (4 потока), как это было предложено в комментариях @MikeMcKerns, время вычислений увеличилось в 2,5 раза. На 16-ядерном кластере (32 pp/16 tp) эта CG-система тоже идет медленно (ок. 160 с/кадр). Система компьютерной графики с 42 778 сайтами и многочисленными соединениями на iMac (1 ядро/4 потока) может работать примерно за 58 мин/кадр. Мне еще предстоит протестировать эту большую систему на 16-ядерном узле кластера, но я не уверен, ускорит ли ее дальнейшее использование пула процессов/пула потоков.

Примеры:

# For a CG system with no bond details
for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = itertools.imap(func2, data1)
    for values in data2:
        func3(values)

# For a system with bond details
import pathos.multiprocessing as mp

tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)

# Seems to be the slowest in the bunch on iMac and possibly on 16-cores of a node.
ppool = mp.ProcessingPool(mp.cpu_count() * 2)
tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
    data1 = ppool.uimap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)

Я подозреваю, что чем больше система, тем больше пользы я могу получить от многопроцессорности. Я знаю, что большая система CG (42 778 сайтов) занимает примерно 0,08 с/сайт по сравнению с 0,02 с/сайт (3000 сайтов CG) или 0,05 с/сайт (1000 сайтов без связей).

В своем стремлении сократить время вычислений я обнаружил области, в которых я мог бы сократить некоторые вычисления (например, global переменных и изменений алгоритма), но если бы я мог сократить это еще больше с помощью полномасштабного мультипроцессора, это было бы здорово. .


person Tim    schedule 23.04.2015    source источник
comment
Было бы проще помочь вам, если бы вы могли сделать свой примерный код минимальным (например, удалить большинство тел методов, которые не имеют ничего общего с неожиданным поведением).   -  person tobyodavies    schedule 23.04.2015
comment
@tobyodavies, я могу это сделать. Сначала я его урезал, но подумал, что кто-то может захотеть узнать об остальном. Я определенно могу удалить некоторые из них. Спасибо.   -  person Tim    schedule 23.04.2015
comment
Похоже, вы ожидаете, что дочерние процессы будут иметь побочный эффект в родительском процессе. Это правильно? То есть дочерний процесс каким-то образом модифицирует себя, а не возвращает свои вычисленные данные. Вы не можете сделать это с многопроцессорной обработкой, и вам нужно будет переосмыслить свое решение. Если это так, то я покажу вам минимальный рабочий пример того, как это сделать.   -  person Dunes    schedule 23.04.2015
comment
@Dunes, так вы говорите, что вместо использования управляемых переменных (mp.Manager.dict и т. д.) я должен возвращать значения из каждого метода, а затем выполнять окончательную обработку? В моих методах одним из моих переданных аргументов является pandas.DataFrame, но у меня также есть место, где возвращается тот же DataFrame, если я хочу его собрать, но один из моих методов обрабатывал данные и сохранял их в управляемой структуре данных. Однако я предполагаю, что это неправильное мышление. Я ценю любое понимание, которое вы можете предложить.   -  person Tim    schedule 23.04.2015
comment
Привет Тим, я pathos автор. Кажется, вы вызываете pool.amap, который должен вернуть объект результата result, который вы нигде не сохраняете. Затем вам нужно вызвать result.get(), чтобы получить результат. Есть также imap, который возвращает итератор, и старый добрый map, который напрямую возвращает список вычисленных значений. amap является асинхронным, поэтому он не должен блокироваться при вызове карты — он блокируется при вызове get. Если вы хотите заблокировать карту, используйте map.   -  person Mike McKerns    schedule 24.04.2015
comment
@MikeMcKerns, спасибо. После долгих поисков я понял, что иду по этому пути неправильно. Я изначально не осознавал, что пафос и многопроцессорность Python создают копии объекта. Следовательно, они не делились данными, а это означало, что мои расчеты все равно были ошибочными. Я решил переписать код, разбив свой класс-бегемот на более мелкие фрагменты. Это позволило мне создать нужные копии с помощью amap или map_async и выполнить вычисления. Я ожидал слишком многого от одного метода. На одном процессоре это работает, но не на многопроцессорной.   -  person Tim    schedule 24.04.2015
comment
@MikeMcKerns, я выделил это, но потом столкнулся с проблемой пафосного удушья, найденной в ссылка. Я пытался использовать внешнюю функцию для вызова объекта и переменной безрезультатно (например, def func(obj, index)). есть идеи?   -  person Tim    schedule 24.04.2015
comment
@Tim: ответ на вашу ссылку также не решил вашу проблему, если она не работает из-за травления? Это должно получиться, если вы сделаете тонкую обертку вокруг импорта. Кроме того, если вам нужно обмениваться данными, вы можете заглянуть в multiprocessing.ctypes - с его помощью вы можете создавать объекты/массивы общей памяти. Это немного сложно, и вам нужно беспокоиться о блокировке потоков, но это может сработать, если вам нужно избежать копирования, которое multiprocessing делает в случае по умолчанию.   -  person Mike McKerns    schedule 24.04.2015
comment
Сколько времени fd.CalculateAll занимает один звонок? Ага… Я вижу проблему, вы не хотите использовать Pool().amap и data.get()… это блокирует get. Лучшим выбором является Pool().imap, который создает итератор... и затем вы можете повторять цикл for по мере поступления результатов. Или, что лучше, переписать и создать пул потоков и пул обработки, где вы get получаете результаты только в конец вложенных функций карты.   -  person Mike McKerns    schedule 24.04.2015
comment
Например, см. здесь: stackoverflow.com/a/28770041/2379433   -  person Mike McKerns    schedule 24.04.2015
comment
... и stackoverflow.com/a/28382913/2379433   -  person Mike McKerns    schedule 24.04.2015
comment
@MikeMcKerns, спасибо за ответы. Не могли бы вы опубликовать пример того, как я могу добиться использования пула потоков/пула обработки с помощью функции вложенной карты? К вашему сведению, один расчет занимает в среднем 300 мс, но с большим набором данных (40 000–60 000), повторяющимся 1000 раз, это суммируется (обработка траектории моделирования N атомов). Я искренне благодарен за то, что разделил класс на более мелкие компоненты, но я понимаю, что многопроцессорность может быть головной болью, хотя в конце концов она того стоит.   -  person Tim    schedule 25.04.2015
comment
@MikeMcKerns, я пытался использовать функцию imap, но все еще получаю исключения, сводящиеся к NotImplemented из-за моего класса, даже с функцией-оболочкой или частичной. Я собираюсь изучить свой класс и, возможно, уменьшить его в качестве теста. Я использую pandas и MDAnalysis в своем классе, что заставляет меня задаться вопросом, можно ли их обработать, но изначально они настраиваются в конструкторе. Я так определенно хочу, чтобы это сработало, но я все еще сбит с толку. Еще раз спасибо за ваши комментарии.   -  person Tim    schedule 25.04.2015
comment
Простой способ проверить, можно ли что-то сериализовать, — это попробовать что-то вроде dill.detect.errors, dill.pickles или dill.copy.   -  person Mike McKerns    schedule 25.04.2015
comment
Вот несколько примеров вложенных карт: stackoverflow.com/a/29818165/2379433 и stackoverflow.com/questions/28203774/   -  person Mike McKerns    schedule 25.04.2015
comment
@MikeMcKerns, что бы вы порекомендовали: использовать пул потоков для тяжелых вычислений и пул обработки для сбора, как в вашем последнем комментарии, или какой-либо другой метод? В настоящее время я настроил пул обработки (uimap) для тяжелой работы и пул потоков (amap) для окончательного сбора данных. Мне интересно, могу ли я использовать uimap для обоих. В очередной раз благодарим за помощь.   -  person Tim    schedule 29.04.2015
comment
Выбор итеративной карты в сравнении с асинхронной картой следует делать в зависимости от того, нужна ли вам блокировка. amap блокирует get для всех результатов, а uimap блокирует next только для следующего результата. Что касается потоков или обработки, я обнаружил, что лучше иметь один пул потоков и один пул обработки. Я работаю над добавлением гибкости, которая могла бы это изменить, но пока это так. По сути, вы должны думать о том, насколько дорого обходится оценка функции… и сколько процессов нужно запускать одновременно в вашей системе — вы можете использовать гораздо больше потоков для легкой работы.   -  person Mike McKerns    schedule 29.04.2015
comment
@MikeMcKerns, спасибо за информацию. Мне интересно, что небольшая система (1000 сайтов) работает быстрее с itertools.imap. Система с 3000 сайтов быстрее всего работает на моем iMac (4 ядра) с itertools.imap и ThreadingPool.uimap (4 потока). Если я использую комбо, предложенное выше, они оба замедляются, особенно с 16-ядерной машиной. Однако я не уверен, что мой большой (>40 000 сайтов) будет справедливым. Есть мысли по этому поводу?   -  person Tim    schedule 30.04.2015
comment
Причина в том, что itertools.imap дешевле… он использует параллелизм C, в то время как многопроцессорная версия порождает процессы. Как только вы запускаете порождающие процессы, вы берете на себя много накладных расходов. Для потоков вы можете в конечном итоге столкнуться с накладными расходами в памяти.   -  person Mike McKerns    schedule 30.04.2015
comment
Затем ваш комментарий вызывает вопрос: когда лучше всего использовать распараллеливание? Я понимаю, что эти небольшие системы (вероятно, менее 10 000 сайтов) могут не нуждаться в распараллеливании, но мне интересно, выиграют ли от этого мои более крупные системы, предполагая, что вычисления на сайт выполняются менее чем за 1 секунду.   -  person Tim    schedule 01.05.2015
comment
Это всегда компромисс/оптимизация между накладными расходами и экономией от параллелизма. Причина, по которой я создал pathos в первую очередь, заключалась в том, что, поняв, что вы должны иметь возможность немного поэкспериментировать, чтобы найти правильный уровень и средства параллелизма… Я обнаружил, что экспериментировать должно быть очень легко.   -  person Mike McKerns    schedule 19.05.2015


Ответы (1)


Ваши возможности довольно ограничены, если вы используете python 2.7.

Вот краткий пример вызова метода объекта с аргументами в пуле.

Первая проблема заключается в том, что мариновать можно только функцию, определенную на верхнем уровне модуля. В системах на основе Unix есть способ обойти это ограничение, но на него не следует полагаться. Таким образом, вы должны определить функцию, которая принимает нужный объект и аргументы, необходимые для вызова соответствующего метода.

Например:

def do_square(args):
    squarer, x = args # unpack args
    return squarer.square(x)

Класс Squarer может выглядеть так:

class Squarer(object):
    def square(self, x):
        return x * x

Теперь, чтобы применить квадратную функцию параллельно.

if __name__ == "__main__":
    # all pool work must be done inside the if statement otherwise a recursive 
    # cycle of Pool spawning will be created.

    pool = Pool()
    sq = Squarer()
    # create args as a sequence of tuples
    args = [(sq, i) for i in range(10)]
    print pool.map(do_square, args)

    pool.close()
    pool.join()

Обратите внимание, что квадрат не имеет состояния. Он получает данные, обрабатывает их и возвращает результат. Это связано с тем, что состояние ребенка отличается от состояния родителя. Изменения в одном не будут отражаться в другом, если только не используются Queues, Pipes или другие классы общего состояния, доступные благодаря многопроцессорности. Как правило, лучше возвращать вычисленные данные из дочернего процесса, а затем что-то делать с этими данными в родительском процессе, а не пытаться сохранить данные в некоторой общей переменной, к которой у дочернего процесса есть доступ.

Пример квадрата с состоянием, не работающего с многопроцессорностью:

class StatefulSquarer(object):

    def __init__(self):
        self.results = []

    def square(self, x):
        self.results.append(x * x)

if __name__ == "__main__":

    print("without pool")
    sq = StatefulSquarer()
    map(do_square, [(sq, i) for i in range(10)])
    print(sq.results)

    print("with pool")
    pool = Pool()
    sq = StatefulSquarer()
    pool.map(do_square, [(sq, i) for i in range(10)])
    print(sq.results)

    pool.close()
    pool.join()

Если вы хотите сделать эту работу, лучшим решением будет что-то вроде:

for result in pool.map(do_square, [(sq, i) for i in range(10)]):
    sq.results.append(result)

Что делать, если ваш класс очень большой, но неизменный. Каждый раз, когда вы запускаете новую задачу на карте, этот огромный объект должен быть скопирован в дочерний процесс. Однако вы можете сэкономить время, скопировав его в дочерний процесс только один раз.

from multiprocessing import Pool

def child_init(sq_):
    global sq
    sq = sq_

def do_square(x):
    return sq.square(x)

class Squarer(object):
    def square(self, x):
        return x * x

if __name__ == "__main__":
    sq = Squarer()
    pool = Pool(initializer=child_init, initargs=(sq,))

    print(pool.map(do_square, range(10)))

    pool.close()
    pool.join()
person Dunes    schedule 23.04.2015
comment
Как это будет работать при вызове пула внутри класса, как в моем примере кода? Я вызываю Pool из одного метода, чтобы вызвать другой метод. Я читал, что пафос может предложить решение, но, как было отмечено в моем обновлении, это, похоже, не сработало. Будет ли использование multiprocessing.Process с JoinableQueue лучше? Если да, то как мне избежать проблемы с несуществующим процессом? - person Tim; 23.04.2015
comment
Если вы следите за моим обсуждением с Майком и читаете мое последнее обновление, вы заметите, что я, возможно, нашел решение. Тем не менее, я хочу сообщить вам, что ваше решение было полезным. Эта игра дает мне некоторые идеи для другого проекта, над которым я работаю, и хотя ваш ответ, возможно, не был решением моей текущей проблемы, он может быть для другого проекта. Еще раз спасибо за ваш ответ; Я ценю его. - person Tim; 30.04.2015