Недавно я написал программу с классом для своего исследования и попытался распараллелить ее. Когда я использовал multiprocessing.Process Python 2.7 с JoinableQueue и управляемыми данными, моя программа в конечном итоге зависала с несуществующими процессами.
import multiprocessing as mp
import traceback
class Paramfit(object):
def __init__(self):
pass
def _calc_bond(self, index):
# Calculate data
def _calc_parallel(self, index):
self._calc_bond(index)
def run(self):
for ts, force in itertools.izip(self.coortrj, self.forcevec):
try:
consumers = [mp.Process(target=self._calc_parallel,
args=(force,)) for i in range(nprocs)]
for w in consumers:
w.start()
# Enqueue jobs
for i in range(self.totalsites):
self.tasks.put(i)
# Add a poison pill for each consumer
for i in range(nprocs):
self.tasks.put(None)
self.tasks.close()
self.tasks.join()
# for w in consumers:
# w.join()
except:
traceback.print_exc()
_calc_parallel вызывает некоторые другие методы класса.
Я даже пытался использовать multiprocessing.Pool для этой цели, используя параметр copy_reg, который можно найти в другом месте на http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods..
import multiprocessing as mp
import traceback
class Paramfit(object):
def __init__(self):
pass
def _calc_bond(self, index):
# Calculate data
def _use_force(force):
# Calculate data
def _calc_parallel(self, index, force):
self._calc_bond(index)
self._use_force(force)
def run(self):
try:
pool = mp.Pool(processes=nprocs, maxtasksperchild=2)
args = itertools.izip(range(self.totalsites), itertools.repeat(force))
pool.map_async(self._calc_parallel, args)
pool.close()
pool.join()
except:
traceback.print_exc()
Однако pool.map_async, похоже, не вызывает self._calc_parallel. Я знаю, что в обоих случаях (процесс и пул) я что-то упускаю из виду, но я не совсем понимаю, что именно. Обычно я обрабатываю более 40 000 элементов.
Спасибо за помощь.
Обновить
Прочитав несколько других сообщений, я также попробовал pathos.multiprocessing.
import pathos.multiprocessing as mp
class Paramfit(object):
def __init__(self):
pass
def _calc_bond(self, index):
# Calculate data
def _use_force(force):
# Calculate data
def _calc_parallel(self, index, force):
self._calc_bond(index)
self._use_force(force)
def run(self):
try:
pool = mp.ProcessingPool(nprocs)
args = itertools.izip(range(self.totalsites), itertools.repeat(force))
pool.amap(lambda x: self._calc_parallel(*x), args)
except:
traceback.print_exc()
И, как и в случае с моими предыдущими попытками, это тоже быстро проходит без вызова метода.
Обновление 2
Я решил переработать код, чтобы разделить мой гигантский класс на более мелкие и более управляемые компоненты. Однако, если я использую pathos.multiprocessing, я сталкиваюсь с ситуацией, отличной от предыдущей (см. ссылка). В моем новом коде теперь есть объект, который можно использовать для расчета, а затем с помощью его методов он должен возвращать значение.
import itertools
import pandas as pd
import pathos.multiprocessing as mp
class ForceData(object):
def __init__(self, *args, **kwargs):
# Setup data
self.value = pd.DataFrame()
def calculateBondData(self, index):
# Calculation
return self.value
def calculateNonBondedData(self, index):
# Calculation
return self.value
def calculateAll(self, index):
# Because self.value is a pandas.DataFrame, changed internally
self.calculateBondData(index)
self.calculateNonBondedData(index)
return self.value
class ForceMatrix(object):
def __init__(self, *args, **kwargs):
# Initialize data
self._matrix = pd.DataFrame()
def map(self, data):
for value in data.get():
for i, j in itertools.product(value.index, repeat=2):
self._matrix.loc[[i], [j]] += value.values
def calculate(self, *args, **kwargs):
# Setup initial information.
fd = ForceData()
matrix = ForceMatrix()
pool = mp.ProcessingPool()
data = pool.amap(fd.calculateAll, range(x))
matrix.map(data, force)
return matrix
Думал, что отдельная функция func(dataobj, force)
, но и это вроде не помогает. При текущей скорости, по моим оценкам, полный расчет на одном процессоре займет более 1000 часов, что слишком много для чего-то, что должно быть быстрее.
Обновление 3 (30 апреля 2015 г.)
Благодаря полезной информации @MikeMcKerns я, возможно, остановился на возможном решении. На iMac (четырехъядерном) или 16-ядерном узле кластера я обнаружил, что для крупнозернистой (CG) системы без связей лучшим решением кажется двойной itertools.imap
(1000 сайтов CG). составляет примерно 5,2 с на кадр траектории. Когда я перешел на систему, которая включает некоторые детали связи (3000 сайтов компьютерной графики, представляющих воду), я обнаружил, что на iMac (использующем 1 ядро) itertools.imap
, за которым следует pathos.ThreadingPool.uimap
(4 потока), работает примерно 85 с/кадр; если я попытаюсь использовать пул процессов (4 ядра x 2)/пул потоков (4 потока), как это было предложено в комментариях @MikeMcKerns, время вычислений увеличилось в 2,5 раза. На 16-ядерном кластере (32 pp/16 tp) эта CG-система тоже идет медленно (ок. 160 с/кадр). Система компьютерной графики с 42 778 сайтами и многочисленными соединениями на iMac (1 ядро/4 потока) может работать примерно за 58 мин/кадр. Мне еще предстоит протестировать эту большую систему на 16-ядерном узле кластера, но я не уверен, ускорит ли ее дальнейшее использование пула процессов/пула потоков.
Примеры:
# For a CG system with no bond details
for i in range(nframes):
data1 = itertools.imap(func1, range(nsites))
data2 = itertools.imap(func2, data1)
for values in data2:
func3(values)
# For a system with bond details
import pathos.multiprocessing as mp
tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
data1 = itertools.imap(func1, range(nsites))
data2 = tpool.uimap(func2, data1)
for values in data2:
func3(values)
# Seems to be the slowest in the bunch on iMac and possibly on 16-cores of a node.
ppool = mp.ProcessingPool(mp.cpu_count() * 2)
tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
data1 = ppool.uimap(func1, range(nsites))
data2 = tpool.uimap(func2, data1)
for values in data2:
func3(values)
Я подозреваю, что чем больше система, тем больше пользы я могу получить от многопроцессорности. Я знаю, что большая система CG (42 778 сайтов) занимает примерно 0,08 с/сайт по сравнению с 0,02 с/сайт (3000 сайтов CG) или 0,05 с/сайт (1000 сайтов без связей).
В своем стремлении сократить время вычислений я обнаружил области, в которых я мог бы сократить некоторые вычисления (например, global
переменных и изменений алгоритма), но если бы я мог сократить это еще больше с помощью полномасштабного мультипроцессора, это было бы здорово. .
pathos
автор. Кажется, вы вызываетеpool.amap
, который должен вернуть объект результатаresult
, который вы нигде не сохраняете. Затем вам нужно вызватьresult.get()
, чтобы получить результат. Есть такжеimap
, который возвращает итератор, и старый добрыйmap
, который напрямую возвращает список вычисленных значений.amap
является асинхронным, поэтому он не должен блокироваться при вызове карты — он блокируется при вызовеget
. Если вы хотите заблокировать карту, используйтеmap
. - person Mike McKerns   schedule 24.04.2015def func(obj, index)
). есть идеи? - person Tim   schedule 24.04.2015multiprocessing.ctypes
- с его помощью вы можете создавать объекты/массивы общей памяти. Это немного сложно, и вам нужно беспокоиться о блокировке потоков, но это может сработать, если вам нужно избежать копирования, котороеmultiprocessing
делает в случае по умолчанию. - person Mike McKerns   schedule 24.04.2015fd.CalculateAll
занимает один звонок? Ага… Я вижу проблему, вы не хотите использоватьPool().amap
иdata.get()
… это блокируетget
. Лучшим выбором являетсяPool().imap
, который создает итератор... и затем вы можете повторять цикл for по мере поступления результатов. Или, что лучше, переписать и создать пул потоков и пул обработки, где выget
получаете результаты только в конец вложенных функций карты. - person Mike McKerns   schedule 24.04.2015dill.detect.errors
,dill.pickles
илиdill.copy
. - person Mike McKerns   schedule 25.04.2015amap
блокируетget
для всех результатов, аuimap
блокируетnext
только для следующего результата. Что касается потоков или обработки, я обнаружил, что лучше иметь один пул потоков и один пул обработки. Я работаю над добавлением гибкости, которая могла бы это изменить, но пока это так. По сути, вы должны думать о том, насколько дорого обходится оценка функции… и сколько процессов нужно запускать одновременно в вашей системе — вы можете использовать гораздо больше потоков для легкой работы. - person Mike McKerns   schedule 29.04.2015pathos
в первую очередь, заключалась в том, что, поняв, что вы должны иметь возможность немного поэкспериментировать, чтобы найти правильный уровень и средства параллелизма… Я обнаружил, что экспериментировать должно быть очень легко. - person Mike McKerns   schedule 19.05.2015