распараллелить цикл над iter

У меня проблемы с производительностью моего кода. шаг # IIII занимает часы времени. Раньше я материализовал itertools.prodct, но благодаря пользователю я больше не делаю pro_data = product(array_b,array_a). Это помогло мне с проблемами памяти, но по-прежнему требует много времени. Я хотел бы распараллелить его с помощью многопоточности или многопроцессорности, что бы вы ни предложили, я благодарен.

Объяснение. У меня есть два массива, которые содержат значения x и y частиц. Для каждой частицы (определяемой двумя координатами) я хочу вычислить функцию с другой. Для комбинаций я использую метод itertools.product и перебираю каждую частицу. Всего я запускаю более 50000 частиц, поэтому мне нужно рассчитать N * N/2 комбинаций.

заранее спасибо

import numpy as np
import matplotlib.pyplot as plt
from itertools import product,combinations_with_replacement

def func(ar1,ar2,ar3,ar4): #example func that takes four arguments
  return (ar1*ar2**22+np.sin(ar3)+ar4)

def newdist(a):
  return func(a[0][0],a[0][1],a[1][0],a[1][1])    

x_edges = np.logspace(-3,1, num=25) #prepare x-axis for histogram 

x_mean = 10**((np.log10(x_edges[:-1])+np.log10(x_edges[1:]))/2)
x_width=x_edges[1:]-x_edges[:-1]

hist_data=np.zeros([len(x_edges)-1])

array1=np.random.uniform(0.,10.,100)
array2=np.random.uniform(0.,10.,100)

array_a = np.dstack((array1,array1))[0]
array_b = np.dstack((array2,array2))[0]
# IIII
for i in product(array_a,array_b):
  (result,bins) = np.histogram(newdist(i),bins=x_edges)
  hist_data+=result

hist_data = np.array(map(float, hist_data))
plt.bar(x_mean,hist_data,width=x_width,color='r')
plt.show()

----- РЕДАКТИРОВАТЬ ----- Сейчас я использовал этот код:

def mp_dist(array_a,array_b, d, bins): #d chunks AND processes
  def worker(array_ab, out_q):
      """ push result in queue """
      outdict = {}
      outdict = vec_chunk(array_ab, bins)
      out_q.put(outdict)
  out_q = mp.Queue()
  a = np.swapaxes(array_a, 0 ,1)
  b = np.swapaxes(array_b, 0 ,1)
  array_size_a=len(array_a)-(len(array_a)%d)
  array_size_b=len(array_b)-(len(array_b)%d)
  a_chunk = array_size_a / d
  b_chunk = array_size_b / d
  procs = []
  #prepare arrays for mp
  array_ab = np.empty((4, a_chunk, b_chunk))
  for j in xrange(d):
    for k in xrange(d):
      array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
      array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
      p = mp.Process(target=worker, args=(array_ab, out_q))
      procs.append(p)
      p.start()
  resultarray = np.empty(len(bins)-1)
  for i in range(d):
      resultarray+=out_q.get() 
  # Wait for all worker processes to finish
  for pro in procs:
      pro.join()
  print resultarray
  return resultarray

Проблема здесь в том, что я не могу контролировать количество процессов. Как я могу использовать mp.Pool() вместо этого? чем


person madzone    schedule 07.03.2013    source источник
comment
Не могли бы вы обновить свой пост, чтобы я мог запустить его как есть и чтобы синтаксис/отступ были правильными? Добавьте пару строк, чтобы сгенерировать примеры array1 и подобных переменных. В противном случае люди гораздо реже будут тратить время на ответ на ваш вопрос...   -  person YXD    schedule 07.03.2013
comment
Вместо этого я предлагаю вам изучить Cython для ускорения на несколько порядков. Параллельная обработка с помощью Python и Numpy нетривиальна.   -  person Fred Foo    schedule 07.03.2013
comment
1) если ваш newdist1 симметричен, вы можете вдвое сократить время, взяв каждую пару только один раз; 2) вы можете использовать multiprocessing (не multithreading), чтобы распределить работу по нескольким процессам. Возможно, самым простым способом было бы иметь pool процессов, где каждый получает значение i и ведет подсчет. В сети есть много примеров, так что просто попробуйте. 3) если этого недостаточно, то есть cython.   -  person ev-br    schedule 07.03.2013
comment
@all, спасибо @Mr E, код теперь работает @larsmans Я не знаком с Cython, позвольте мне внимательно посмотреть. Это не CPython, верно? @ Женя, да, симметрично, я тоже могу использовать combination_with_replacements   -  person madzone    schedule 07.03.2013
comment
Я не думаю, что вам действительно нужно делать что-то большее, чем векторизация цикла for и устранение некоторых других неэффективностей. Если ваша память не может вместить массив 50 000 x 50 000, то ответ JF Sebastian может быть подходящим: разбить его на 4 из 25 000 x 25 000, или 16 из 12 500 x 125 500, или... Наконец, ваш образец данные реалистичны? Вы передаете newdist 4 значения, из которых только 2 являются уникальными, там тоже может быть некоторая производительность для очистки.   -  person Jaime    schedule 07.03.2013


Ответы (2)


Во-первых, давайте посмотрим на простую векторизацию вашей проблемы. У меня такое ощущение, что вы хотите, чтобы ваши array_a и array_b были точно такими же, то есть координаты частиц, но я держу их здесь отдельно.

Я превратил ваш код в функцию, чтобы упростить синхронизацию:

def IIII(array_a, array_b, bins) :
    hist_data=np.zeros([len(bins)-1])
    for i in product(array_a,array_b):
        (result,bins) = np.histogram(newdist(i), bins=bins)
        hist_data+=result
    hist_data = np.array(map(float, hist_data))
    return hist_data

Кстати, вы можете сгенерировать свои образцы данных менее запутанным способом следующим образом:

n = 100
array_a = np.random.uniform(0, 10, size=(n, 2))
array_b = np.random.uniform(0, 10, size=(n, 2))

Итак, сначала нам нужно векторизовать ваш func. Я сделал так, что он может принимать любую array формы (4, ...). Для экономии памяти он делает расчет на месте и возвращает первую плоскость, т.е. array[0].

def func_vectorized(a) :
    a[1] **= 22
    np.sin(a[2], out=a[2])
    a[0] *= a[1]
    a[0] += a[2]
    a[0] += a[3]
    return a[0]

Имея эту функцию, мы можем написать векторизованную версию IIII:

def IIII_vec(array_a, array_b, bins) :
    array_ab = np.empty((4, len(array_a), len(array_b)))
    a = np.swapaxes(array_a, 0 ,1)
    b = np.swapaxes(array_b, 0 ,1)
    array_ab[[0, 1]] = a[:, :, None]
    array_ab[[2, 3]] = b[:, None, :]
    newdist = func_vectorized(array_ab)
    hist, _ = np.histogram(newdist, bins=bins)
    return hist

С n = 100 точками они оба возвращают одно и то же:

In [2]: h1 = IIII(array_a, array_b, x_edges)

In [3]: h2 = IIII_bis(array_a, array_b, x_edges)

In [4]: np.testing.assert_almost_equal(h1, h2)

А вот временные различия уже очень актуальны:

In [5]: %timeit IIII(array_a, array_b, x_edges)
1 loops, best of 3: 654 ms per loop

In [6]: %timeit IIII_vec(array_a, array_b, x_edges)
100 loops, best of 3: 2.08 ms per loop

Ускорение в 300 раз!. Если вы попробуете еще раз с более длинными выборочными данными, n = 1000, вы увидите, что они оба одинаково плохо масштабируются, как n**2, поэтому 300x остается там:

In [10]: %timeit IIII(array_a, array_b, x_edges)
1 loops, best of 3: 68.2 s per loop

In [11]: %timeit IIII_bis(array_a, array_b, x_edges)
1 loops, best of 3: 229 ms per loop

Итак, вы все еще смотрите на хорошие 10 мин. обработки, что на самом деле не так много по сравнению с более чем 2 днями, которые потребуются вашему текущему решению.

Конечно, чтобы все было так хорошо, вам нужно поместить в память массив (4, 50000, 50000) чисел с плавающей запятой, что моя система не может обработать. Но вы по-прежнему можете делать вещи относительно быстро, обрабатывая их по частям. Следующая версия IIII_vec делит каждый массив на d фрагментов. Как написано, длина массива должна быть кратна d. Было бы несложно преодолеть это ограничение, но оно запутало бы истинную цель:

def IIII_vec_bis(array_a, array_b, bins, d=1) :
    a = np.swapaxes(array_a, 0 ,1)
    b = np.swapaxes(array_b, 0 ,1)
    a_chunk = len(array_a) // d
    b_chunk = len(array_b) // d
    array_ab = np.empty((4, a_chunk, b_chunk))
    hist_data = np.zeros((len(bins) - 1,))
    for j in xrange(d) :
        for k in xrange(d) :
            array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
            array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
            newdist = func_vectorized(array_ab)
            hist, _ = np.histogram(newdist, bins=bins)
            hist_data += hist
    return hist_data

Во-первых, давайте проверим, что это действительно работает:

In [4]: h1 = IIII_vec(array_a, array_b, x_edges)

In [5]: h2 = IIII_vec_bis(array_a, array_b, x_edges, d=10)

In [6]: np.testing.assert_almost_equal(h1, h2)

А теперь немного таймингов. С n = 100:

In [7]: %timeit IIII_vec(array_a, array_b, x_edges)
100 loops, best of 3: 2.02 ms per loop

In [8]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
100 loops, best of 3: 12 ms per loop

Но по мере того, как вы начинаете иметь все больший и больший массив в памяти, выполнение его по частям начинает окупаться. С n = 1000:

In [12]: %timeit IIII_vec(array_a, array_b, x_edges)
1 loops, best of 3: 223 ms per loop

In [13]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
1 loops, best of 3: 208 ms per loop

С n = 10000 я больше не могу вызывать IIII_vec без ошибки array is too big, но краткая версия все еще работает:

In [18]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
1 loops, best of 3: 21.8 s per loop

И просто чтобы показать, что это можно сделать, я запустил его один раз с n = 50000:

In [23]: %timeit -n1 -r1 IIII_vec_bis(array_a, array_b, x_edges, d=50)
1 loops, best of 1: 543 s per loop

Итак, хорошие 9 минут обработки чисел, что не так уж и плохо, учитывая, что было вычислено 2,5 миллиарда взаимодействий.

person Jaime    schedule 07.03.2013

Используйте векторизованные операции numpy. Замените цикл for поверх product() одним вызовом newdist(), создав аргументы с помощью meshgrid().

Чтобы парализовать задачу, вычислите newdist() на срезах array_a, array_b, которые соответствуют подблокам meshgrid(). Вот пример использования срезов и многопроцессорной обработки.

Вот еще один пример для демонстрации шагов: цикл python -> векторизованная версия numpy -> parallel:

#!/usr/bin/env python
from __future__ import division
import math
import multiprocessing as mp
import numpy as np

try:
    from itertools import izip as zip
except ImportError:
    zip = zip # Python 3

def pi_loop(x, y, npoints):
    """Compute pi using Monte-Carlo method."""
    #  note: the method converges to pi very slowly.
    return 4 * sum(1 for xx, yy in zip(x, y) if (xx**2 + yy**2) < 1) / npoints

def pi_vectorized(x, y, npoints):
    return 4 * ((x**2 + y**2) < 1).sum() / npoints # or just .mean()

def mp_init(x_shared, y_shared):
    global mp_x, mp_y
    mp_x, mp_y = map(np.frombuffer, [x_shared, y_shared]) # no copy

def mp_pi(args):
    # perform computations on slices of mp_x, mp_y
    start, end = args
    x = mp_x[start:end] # no copy
    y = mp_y[start:end]
    return ((x**2 + y**2) < 1).sum()

def pi_parallel(x, y, npoints):
    # compute pi using multiple processes
    pool = mp.Pool(initializer=mp_init, initargs=[x, y])
    step = 100000
    slices = ((start, start + step) for start in range(0, npoints, step))
    return 4 * sum(pool.imap_unordered(mp_pi, slices)) / npoints

def main():
    npoints = 1000000

    # create shared arrays
    x_sh, y_sh = [mp.RawArray('d', npoints) for _ in range(2)]

    # initialize arrays
    x, y = map(np.frombuffer, [x_sh, y_sh])
    x[:] = np.random.uniform(size=npoints)
    y[:] = np.random.uniform(size=npoints)

    for f, a, b in [(pi_loop, x, y), 
                    (pi_vectorized, x, y), 
                    (pi_parallel, x_sh, y_sh)]:
        pi = f(a, b, npoints)
        precision = int(math.floor(math.log10(npoints)) / 2 - 1 + 0.5)
        print("%.*f %.1e" % (precision + 1, pi, abs(pi - math.pi)))

if __name__=="__main__":
    main()

Показатели времени для npoints = 10_000_000:

pi_loop pi_vectorized pi_parallel 
   32.6         0.159       0.069 # seconds

Он показывает, что основное преимущество в производительности заключается в преобразовании цикла python в его векторизованный аналог numpy.

person jfs    schedule 07.03.2013
comment
@ Хайме и Дж. Ф., спасибо, я тоже думал рассчитать это на сетке, но мне нужно вычислить newdist для каждой комбинации массивов друг с другом, а не только для подвыборки. - person madzone; 07.03.2013
comment
@madzone: вы вычисляете разные подблоки в нескольких процессах. Всего это будут все возможные комбинации. - person jfs; 07.03.2013
comment
танк, к сожалению, расчет meshgrid перегружает мою память, поэтому я не материализовал product в своем примере какие-либо идеи, как я все еще могу использовать itertools и multiprocessing? - person madzone; 07.03.2013
comment
@madzone Мой ответ ниже показывает вам, как _chunkitize- ваша проблема. Вы могли бы ускорить его, если бы каждый из этих фрагментов обрабатывался одновременно с использованием указателей Дж. Ф. Себастьяна и его подробного ответа в другом связанном вопросе. - person Jaime; 07.03.2013
comment
@madzone: попробуйте аргумент sparse=True meshgrid. Или создайте meshgrid внутри отдельных процессов только по подблоку за раз. - person jfs; 07.03.2013
comment
@Jaime, это здорово, большое спасибо, J.F.Sebastian, мне потребовалось некоторое время, но я близок к тому, чтобы понять, что вы сделали, также большое вам спасибо. - person madzone; 08.03.2013
comment
@madzone: я добавил пример кода, демонстрирующий преобразование python loop -> vectorized numpy version -> parallel. - person jfs; 08.03.2013