По какой оси функция mpi4py Scatterv разбивает массив numpy?

У меня есть следующий MWE, использующий comm.Scatterv и comm.Gatherv для распределения массива 4D по заданному количеству ядер (size)

import numpy as np
from mpi4py import MPI
import matplotlib.pyplot as plt

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    test = np.random.rand(411,48,52,40) #Create array of random numbers
    outputData = np.zeros(np.shape(test))
    split = np.array_split(test,size,axis = 0) #Split input array by the number of available cores

    split_sizes = []

    for i in range(0,len(split),1):
        split_sizes = np.append(split_sizes, len(split[i]))

    displacements = np.insert(np.cumsum(split_sizes),0,0)[0:-1]

    plt.imshow(test[0,0,:,:])
    plt.show()

else:
#Create variables on other cores
    split_sizes = None
    displacements = None
    split = None
    test = None
    outputData = None

#Broadcast variables to other cores
test = comm.bcast(test, root = 0)
split = comm.bcast(split, root=0) 
split_sizes = comm.bcast(split_sizes, root = 0)
displacements = comm.bcast(displacements, root = 0)

output_chunk = np.zeros(np.shape(split[rank])) #Create array to receive subset of data on each core, where rank specifies the core
print("Rank %d with output_chunk shape %s" %(rank,output_chunk.shape))

comm.Scatterv([test,split_sizes, displacements,MPI.DOUBLE],output_chunk,root=0) #Scatter data from test across cores and receive in output_chunk

output = output_chunk

plt.imshow(output_chunk[0,0,:,:])
plt.show()

print("Output shape %s for rank %d" %(output.shape,rank))

comm.Barrier()

comm.Gatherv(output,[outputData,split_sizes,displacements,MPI.DOUBLE], root=0) #Gather output data together

if rank == 0:
    print("Final data shape %s" %(outputData.shape,))
    plt.imshow(outputData[0,0,:,:])
    plt.show()

Это создает четырехмерный массив случайных чисел и в принципе должен разделить его на size ядер перед рекомбинацией. Я ожидал, что Scatterv будет делиться по оси 0 (длина 411) в соответствии с начальными целыми числами и смещениями в векторах split_sizes и displacements. Однако я получаю ошибку при рекомбинации с Gatherv (mpi4py.MPI.Exception: MPI_ERR_TRUNCATE: message truncated) и график output_chunk на каждом ядре показывает, что большая часть входных данных была потеряна, поэтому кажется, что разделение не произошло по первой оси.

Мои вопросы: почему разделение не происходит по первой оси, как узнать, по какой оси происходит разделение, и можно ли изменить/указать, по какой оси это происходит?


person 218    schedule 16.03.2016    source источник
comment
comm.Scatterv вероятно ничего не знает о numpy массиве, форме, размерах или шагах. В лучшем случае он может рассматривать test как блок памяти. На самом деле это может быть только указатель на объект массива, а не на его буфер данных. Работает ли этот код с массивом 1d? Или test.flatten()?   -  person hpaulj    schedule 16.03.2016


Ответы (1)


comm.Scatterv и comm.Gatherv ничего не знают о размерах массива numpy. Они просто видят sendbuf как блок памяти. Поэтому необходимо учитывать это при указании sendcounts и displacements (см. http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html для получения подробной информации). Предполагается также, что данные размещаются в памяти в стиле C (мажорные строки).

Пример для двумерной матрицы приведен ниже. Ключевыми частями этого кода являются правильная установка split_sizes_input/split_sizes_output и displacements_input/displacements_output. Код учитывает размер второго измерения, чтобы указать правильные деления в блоке памяти:

split_sizes_input = split_sizes*512

Для более высоких измерений эта строка будет изменена на:

split_sizes_input = split_sizes*indirect_dimension_sizes

куда

indirect_dimension_sizes = npts2*npts3*npts4*....*nptsN

и аналогично для split_sizes_output.

Код создает двумерный массив с числами от 1 до 512, увеличивающимися по одному измерению. По графикам легко увидеть, правильно ли были разделены и рекомбинированы данные.

import numpy as np
from mpi4py import MPI
import matplotlib.pyplot as plt

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    test = np.arange(0,512,dtype='float64')
    test = np.tile(test,[256,1]) #Create 2D input array. Numbers 1 to 512 increment across dimension 2.
    outputData = np.zeros([256,512]) #Create output array of same size
    split = np.array_split(test,size,axis = 0) #Split input array by the number of available cores

    split_sizes = []

    for i in range(0,len(split),1):
        split_sizes = np.append(split_sizes, len(split[i]))

    split_sizes_input = split_sizes*512
    displacements_input = np.insert(np.cumsum(split_sizes_input),0,0)[0:-1]

    split_sizes_output = split_sizes*512
    displacements_output = np.insert(np.cumsum(split_sizes_output),0,0)[0:-1]


    print("Input data split into vectors of sizes %s" %split_sizes_input)
    print("Input data split with displacements of %s" %displacements_input)

    plt.imshow(test)
    plt.colorbar()
    plt.title('Input data')
    plt.show()

else:
#Create variables on other cores
    split_sizes_input = None
    displacements_input = None
    split_sizes_output = None
    displacements_output = None
    split = None
    test = None
    outputData = None

split = comm.bcast(split, root=0) #Broadcast split array to other cores
split_sizes = comm.bcast(split_sizes_input, root = 0)
displacements = comm.bcast(displacements_input, root = 0)
split_sizes_output = comm.bcast(split_sizes_output, root = 0)
displacements_output = comm.bcast(displacements_output, root = 0)

output_chunk = np.zeros(np.shape(split[rank])) #Create array to receive subset of data on each core, where rank specifies the core
print("Rank %d with output_chunk shape %s" %(rank,output_chunk.shape))
comm.Scatterv([test,split_sizes_input, displacements_input,MPI.DOUBLE],output_chunk,root=0)

output = np.zeros([len(output_chunk),512]) #Create output array on each core

for i in range(0,np.shape(output_chunk)[0],1):
    output[i,0:512] = output_chunk[i]

plt.imshow(output)
plt.title("Output shape %s for rank %d" %(output.shape,rank))
plt.colorbar()
plt.show()

print("Output shape %s for rank %d" %(output.shape,rank))

comm.Barrier()

comm.Gatherv(output,[outputData,split_sizes_output,displacements_output,MPI.DOUBLE], root=0) #Gather output data together



if rank == 0:
    outputData = outputData[0:len(test),:]
    print("Final data shape %s" %(outputData.shape,))
    plt.imshow(outputData)
    plt.colorbar()
    plt.show()
    print(outputData)
person 218    schedule 18.03.2016