Высокое использование памяти при запуске многопроцессорной обработки python в Windows

Приведенный ниже код представляет собой надуманный пример, имитирующий реальную проблему, с которой я столкнулся, в которой для ускорения кода используется многопроцессорность. Код запускается на Windows 10 64-bit OS, python 3.7.5 и ipython 7.9.0


функции преобразования (эти функции будут использоваться для преобразования массивов в main())

from itertools import product
from functools import partial

from numba import njit, prange
import multiprocessing as mp
import numpy as np

@njit(parallel= True)
def transform_array_c(data, n):

    ar_len= len(data)

    sec_max1= np.empty(ar_len, dtype = data.dtype)
    sec_max2= np.empty(ar_len, dtype = data.dtype)

    for i in prange(n-1):
        sec_max1[i]= np.nan

    for sec in prange(ar_len//n):
        s2_max= data[n*sec+ n-1]
        s1_max= data[n*sec+ n]

        for i in range(n-1,-1,-1):
            if data[n*sec+i] > s2_max:
                s2_max= data[n*sec+i]
            sec_max2[n*sec+i]= s2_max

        sec_max1[n*sec+ n-1]= sec_max2[n*sec]

        for i in range(n-1):
            if n*sec+n+i < ar_len:
                if data[n*sec+n+i] > s1_max:
                    s1_max= data[n*sec+n+i]
                sec_max1[n*sec+n+i]= max(s1_max, sec_max2[n*sec+i+1])

            else:
                break

    return sec_max1  

@njit(error_model= 'numpy', cache= True)
def rt_mean_sq_dev(array1, array2, n):
    msd_temp = np.empty(array1.shape[0])

    K = array2[n-1]

    rs_x= array1[0] - K
    rs_xsq = rs_x *rs_x

    msd_temp[0] = np.nan

    for i in range(1,n):
        rs_x += array1[i] - K
        rs_xsq += np.square(array1[i] - K)
        msd_temp[i] = np.nan

    y_i = array2[n-1] - K
    msd_temp[n-1] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))

    for i in range(n, array1.shape[0]):
        rs_x = array1[i] - array1[i-n]+ rs_x
        rs_xsq = np.square(array1[i] - K) - np.square(array1[i-n] - K) + rs_xsq
        y_i = array2[i] - K

        msd_temp[i] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))

    return msd_temp 

@njit(cache= True)
def transform_array_a(data, n):
    result = np.empty(data.shape[0], dtype= data.dtype)
    alpharev = 1. - 2 / (n + 1)
    alpharev_exp = alpharev

    e = data[0]
    w = 1.

    if n == 2: result[0] = e
    else:result[0] = np.nan

    for i in range(1, data.shape[0]):
        w += alpharev_exp
        e = e*alpharev + data[i]

        if i > n -3:result[i] = e / w
        else:result[i] = np.nan

        if alpharev_exp > 3e-307:alpharev_exp*= alpharev
        else:alpharev_exp=0.

    return result

Многопроцессорная часть

def func(tup, data):    #<-------------the function to be run among all 
    a_temp= a[tup[2][0]]

    idx1 = a_temp > a[tup[2][1]]
    idx2= a_temp < b[(tup[2][1], tup[1][1])]

    c_final = c[tup[0][1]][idx1 | idx2]
    data_final= data[idx1 | idx2]

    return (tup[0][0], tup[1][0], *tup[2]), c_final[-1] - data_final[-1]

def setup(a_dict, b_dict, c_dict):    #initialize the shared dictionaries
    global a,b,c
    a,b,c = a_dict, b_dict, c_dict

def main(a_arr, b_arr, c_arr, common_len):
    np.random.seed(0)
    data_array= np.random.normal(loc= 24004, scale=500, size= common_len)

    a_size = a_arr[-1] + 1
    b_size = len(b_arr)
    c_size = len(c_arr)

    loop_combo = product(enumerate(c_arr),
                         enumerate(b_arr),
                         (n_tup for n_tup in product(np.arange(1,a_arr[-1]), a_arr) if n_tup[1] > n_tup[0])
                         )
    result = np.zeros((c_size, b_size, a_size -1 ,a_size), dtype = np.float32) 

    ###################################################
    #This part simulates the heavy-computation in the actual problem

    a= {}
    b= {}
    c= {}

    for i in range(1, a_arr[-1]+1):

        a[i]= transform_array_a(data_array, i)
        if i in a_arr:
            for j in b_arr:
                b[(i,j)]= rt_mean_sq_dev(data_array, a[i], i)/data_array *j


    for i in c_arr:
        c[i]= transform_array_c(data_array, i)

    ###################################################    
    with mp.Pool(processes= mp.cpu_count() - 1,
                 initializer= setup,
                 initargs= [a,b,c]
                 ) as pool:
        mp_res= pool.imap_unordered(partial(func, data= data_array),
                                    loop_combo
                                    )

        for item in mp_res:
            result[item[0]] =item[1]


    return result


if __name__ == '__main__':
    mp.freeze_support()

    a_arr= np.arange(2,44,2)
    b_arr= np.arange(0.4,0.8, 0.20)
    c_arr= np.arange(2,42,10)
    common_len= 440000

    final_res= main(a_arr, b_arr, c_arr, common_len)

Из соображений производительности все процессы используют несколько общих словарей «только для чтения», чтобы уменьшить избыточные вычисления (в реальной задаче общее время вычислений сокращается на 40% после использования общих словарей среди всех процессов). Однако использование оперативной памяти становится абсурдно выше после использования общих словарей в моей реальной проблеме; использование памяти на моем компьютере с Windows 6C/12T увеличивается с (8,2 ГБ в пиковом режиме, 5,0 ГБ в режиме ожидания) до (23,9 ГБ в пиковом режиме, 5,0 ГБ в режиме ожидания), что слишком дорого для увеличения скорости на 40%.

Является ли высокое использование оперативной памяти неизбежным, когда использование нескольких общих данных между процессами является обязательным? Что можно сделать с моим кодом, чтобы сделать его как можно быстрее, используя как можно меньше памяти?

заранее спасибо


Примечание. Я пытался использовать imap_unordered() вместо map, потому что я слышал, что это должно уменьшить использование памяти, когда входная итерация велика, но я, честно говоря, не вижу улучшения в использовании оперативной памяти. Может быть, я сделал что-то не так здесь?


РЕДАКТИРОВАТЬ: Из-за отзывов в ответах я уже изменил тяжелую вычислительную часть кода, чтобы она выглядела менее фиктивной и напоминала вычисление в реальной задаче.


person mathguy    schedule 04.11.2019    source источник
comment
Вы можете уменьшить потребление памяти, используя numpy.memmap массивы.   -  person martineau    schedule 05.11.2019
comment
@martineau, в какой части моего сценария, по вашему мнению, лучше всего использовать массивы numpy.memmap?   -  person mathguy    schedule 05.11.2019
comment
Замените ими «несколько общих словарей только для чтения».   -  person martineau    schedule 05.11.2019
comment
@martineau Я попробую, надеюсь, использование памяти можно уменьшить без потери слишком большого прироста производительности за счет компромисса между ускорением памяти.   -  person mathguy    schedule 05.11.2019
comment
Отображение памяти может быть довольно быстрым, потому что на самых низких уровнях используется поддержка ОС.   -  person martineau    schedule 05.11.2019
comment
@martineau Это потрясающе. Из того, что я видел в примере, кажется, использует какие-то файлы для обработки сохраненных массивов. Надеюсь, они смогут обрабатывать 3-4 общих массива по 50 МБ.   -  person mathguy    schedule 05.11.2019
comment
На самом деле это не так уж и много, особенно если у вас много оперативной памяти. Даже если у вас их не так много, отображаемые в память файлы обрабатываются собственными механизмами обработки ошибок страниц ОС, которые обычно очень оптимизированы. Вот некоторая дополнительная информация о них.   -  person martineau    schedule 05.11.2019
comment
Только что заметил, что в Python 3.8 был добавлен новый класс multiprocessing.shared_memory — так что это еще одна возможность. В документации есть даже пример, демонстрирующий практическое использование класса SharedMemory с массивами NumPy.   -  person martineau    schedule 05.11.2019
comment
@martineau приятно знать, что в python 3.8 есть хорошо задокументированное решение такой проблемы. Не могу дождаться обновления до 3.8 в следующем месяце, потому что на данный момент я не могу обновить его.   -  person mathguy    schedule 05.11.2019
comment
@martineau При всем уважении, O/P не использует активно ни одного шага, который оправдал бы затраты на совместное использование любой опубликованной структуры данных, описанной выше. Решение проблем высокопроизводительных вычислений и вычислений заключается не в том, чтобы найти простой способ выбрать самый дешевый результат (набрав удобный синтаксический конструктор, который кажется простым, допустимым, но при этом очень неэффективным). во время выполнения), а скорее в разработке кода, специально предназначенного для достижения прямо противоположного — достижения максимальной производительности во время выполнения. Это этап решения проблемы.   -  person user3666197    schedule 05.11.2019
comment
@ user3666197: Точно так же, при всем уважении, я просто продолжаю отвечать на главный вопрос, который является действительным, и, вероятно, именно поэтому большинство людей даже посмотрят на этот вопрос.   -  person martineau    schedule 05.11.2019
comment
@mathguy Я совсем забыл вернуться к этому, ты все еще открыт/ищешь ответы?   -  person AMC    schedule 12.11.2019
comment
@AlexanderCécile да, я все еще ищу ответы с примерами кода.   -  person mathguy    schedule 12.11.2019
comment
@mathguy Отлично, я вернусь и посмотрю завтра! Как вы думаете, может быть, стоит создать чат для этого вопроса, так как, вероятно, будет много дискуссий?   -  person AMC    schedule 12.11.2019
comment
@AlexanderCécile да, это определенно будет полезно.   -  person mathguy    schedule 12.11.2019
comment
@mathguy Хорошо, ты сейчас делаешь один или мне сделать это?   -  person AMC    schedule 12.11.2019
comment
@AlexanderCécile Я могу сделать один   -  person mathguy    schedule 12.11.2019
comment
Давайте продолжим обсуждение в чате.   -  person AMC    schedule 12.11.2019
comment
@mathguy Неважно, не беспокойтесь! SO, наконец, дал мне возможность сделать это автоматически.   -  person AMC    schedule 12.11.2019


Ответы (1)


Высокий уровень использования памяти при работе с общими словарями в python multiprocessing, работающем в Windows

Справедливо немного демистифицировать проблему, прежде чем мы перейдем к деталям: в исходном коде отсутствуют общие словари, тем меньше их манипулируют (да, каждому из a,b,c была назначена ссылка на dict_a, dict_b, dict_c, но ни один из них не является общим, а просто реплицируется как multiprocessing в операционных системах класса Windows. Нет записи в dict-s (только неразрушающее чтение из любой из их реплик)

Точно так же np.memmap() можно поместить некоторую часть первоначально предложенных данных на дисковое пространство (за счет этого + с некоторой (замаскированной задержкой) задержкой случайного чтения ~ 10 [ms] вместо ~ 0.5 [ns], если интеллектуальное выравнивание векторизовано шаблоны памяти были разработаны в горячую точку производительности), но здесь не следует ожидать резкой смены парадигмы, поскольку внешний итератор почти избегает повторного использования кеша с интеллектуальным выравниванием.

В : Что можно сделать с моим кодом, чтобы сделать его максимально быстрым при минимальном использовании памяти?

Первый грех заключался в использовании 8B-int64 для хранения одного простого Bbit (здесь пока нет кубитов ~ Все приветствия командам исследований и разработок Burnaby Quantum)

for i in c_arr:                                    # ~~ np.arange( 2, 42, 10 )
     np.random.seed( i )                           # ~ yields a deterministic state
     c[i] = np.random.poisson( size = common_len ) # ~ 440.000 int64-s with {0|1}

Для этого потребовалось 6 (процессов) x 440000 x 8B ~ 0.021 GB контрабандным путем пронести все копии словаря c, в то время как каждая такое значение детерминировано известно и может быть сгенерировано ALAP внутри соответствующего целевого процесса, просто зная значение i (на самом деле нет необходимости предварительно генерировать и многократно копировать ~ 0.021 GB данных)

До сих пор операционная система класса Windows отсутствовала os.fork() и, следовательно, использовала python полная копия (да, ОЗУ..., да, ВРЕМЯ) столько реплицированных сеансов интерпретатора Python (плюс импорт основного модуля), сколько было запрошено, в multiprocessing для разделения на основе процессов (делая это, чтобы избежать GIL-lock заказал, pure-[SERIAL], выполнение кода )


Лучший следующий шаг:
рефакторинг кода
для повышения эффективности и производительности.

Лучший следующий шаг - рефакторинг кода, чтобы свести к минимуму поверхностное (и дорогое) использование 6-процессов, но управляемых извне центральным итератором (loop_combo диктатор с ~ 18522 элементами для повторения вызов удаленно управляемого func( tup, data ), чтобы получить простой DMA-tuple-( (x,y,z), value ) для сохранения одного value в центральном процессе result-float32-array ).

Попытайтесь увеличить вычислительную плотность, поэтому попытайтесь реорганизовать код по принципу «разделяй и властвуй» (то есть, чтобы каждый из mp.pool-процессов вычислял в одном гладком блоке некоторое выделенное подпространство замечательного размера для параметра- пространство покрыто (здесь итеративно из ouside) и может легко уменьшить возвращаемые блоки результатов.Производительность только улучшится при этом (лучше всего без какой-либо формы дорогостоящего совместного использования).

Этот рефакторинг позволит избежать затрат на параметры pickle/unpickle (дополнительные накладные расходы - как одноразовые (при передаче уникальных значений набора параметров), так и повторяющиеся (примерно 18522 раза выполняемые повторяющиеся выделения памяти, наращивание и pickle/unpickle-затраты на np.arange( 440000 ) из-за плохого дизайна / проектирования позывных)

Все эти шаги повысят эффективность обработки и сократят ненужное выделение оперативной памяти.

person user3666197    schedule 05.11.2019
comment
Первый грех был связан с тем, что я пытался сгенерировать кучу массивов, столь же затратных в вычислительном отношении, как фактическая функция array_transformation, которую я использую в реальной задаче, где я использую разные взвешенные скользящие средние в a, b, c. Значение i в этой надуманной задаче соответствует moving_window_size, потому что небольшая разница в i(moving_window_size) приведет к совершенно другому массиву. Возможно, мне следует переписать эту часть в своем посте, чтобы она действительно отражала то, что делает функция array_transformation. - person mathguy; 05.11.2019
comment
@mathguy Пожалуйста, обновите свой код, если можете, я хотел бы взглянуть на это позже :) - person AMC; 05.11.2019
comment
@AlexanderCécile очень ценю вашу помощь, ребята - person mathguy; 05.11.2019
comment
Довольно хороший ответ, хотя немного трудно переварить. Краткое резюме в конце было бы отличным дополнением. - person martineau; 05.11.2019
comment
Я все еще не понимаю (т. е. что каждый из mp.pool-процессов вычисляет в одном гладком блоке некоторое выделенное подпространство замечательного размера в покрытом пространстве параметров......) Из того, что я понимаю, кажется, что я необходимо, чтобы каждый рабочий процесс в mp.pool вычислял ровно один блок smooth подпространства параметров (всего 6 блоков smooth). Тогда как я могу заставить mp.pool сделать это? Каждый раз, когда я вижу рекомендации по многопроцессорности, мне просто говорят, что нужно бросить функцию и итерацию и положить этому конец. Я изо всех сил пытаюсь найти четкие примеры создания ручного сглаженного блока для добавления в mp.pool. - person mathguy; 05.11.2019
comment
@martineau Спасибо за комплимент и совет. Маленький мир снова здесь. Руководство по FORTRAN IV было моей второй Библией (первой была книга по HP-67 - очень познавательная книга, не только о программировании, она была полна историй о дизайн-мышлении - от симулятора Moon Lander (конечно, не такая сложная, как AGC -ferrite-ROM магия полетела к Морю Спокойствия внутри Орла). ОМГ, время так быстро. В любом случае, спасибо, что напомнил мне о тех прекрасных днях, Мартин. Позвольте мне пожелать вам всего наилучшего, Ман: о) - person user3666197; 05.11.2019