Использование многопроцессорного модуля для запуска параллельных процессов, где один питается (зависит) от другого для алгоритма Витерби.

Недавно я поэкспериментировал с многопроцессорным модулем Python, чтобы ускорить алгоритм прямого и обратного поиска для скрытых марковских моделей, поскольку фильтрация вперед и фильтрация назад могут выполняться независимо. Сокращение времени выполнения вдвое внушало благоговейный трепет.

Теперь я пытаюсь включить многопроцессорность в свой итеративный алгоритм Витерби. В этом алгоритме два процесса, которые я пытаюсь запустить, не являются независимыми. Часть val_max может работать независимо, но arg_max[t] зависит от val_max[t-1]. Поэтому я поиграл с идеей, что можно запускать val_max как отдельный процесс, а затем arg_max также как отдельный процесс, который может быть передан val_max.

Я признаю, что здесь я немного не в своей тарелке и мало что знаю о многопроцессорной обработке, кроме просмотра некоторых основных видео об этом, а также просмотра блогов. Я привожу свою попытку ниже, но она не работает.


import numpy as np
from time import time,sleep
import multiprocessing as mp

class Viterbi:


    def __init__(self,A,B,pi):
        self.M = A.shape[0] # number of hidden states
        self.A = A  # Transition Matrix
        self.B = B   # Observation Matrix
        self.pi = pi   # Initial distribution
        self.T = None   # time horizon
        self.val_max = None
        self.arg_max = None
        self.obs = None
        self.sleep_time = 1e-6
        self.output = mp.Queue()


    def get_path(self,x):
        # returns the most likely state sequence given observed sequence x
        # using the Viterbi algorithm
        self.T = len(x)
        self.val_max = np.zeros((self.T, self.M))
        self.arg_max = np.zeros((self.T, self.M))
        self.val_max[0] = self.pi*self.B[:,x[0]]
        for t in range(1, self.T):
            # Indepedent Process
            self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,obs[t]]) , axis = 0  ) 
            # Dependent Process
            self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)

        # BACKTRACK
        states = np.zeros(self.T, dtype=np.int32)
        states[self.T-1] = np.argmax(self.val_max[self.T-1])
        for t in range(self.T-2, -1, -1):
            states[t] = self.arg_max[t+1, states[t+1]]
        return states

    def get_val(self):
        '''Independent Process'''
        for t in range(1,self.T):
            self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,self.obs[t]]) , axis = 0  ) 
        self.output.put(self.val_max)

    def get_arg(self):
        '''Dependent Process'''
        for t in range(1,self.T):
            while 1:
                # Process info if available
                if self.val_max[t-1].any() != 0:
                    self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)
                    break
                # Else sleep and wait for info to arrive
                sleep(self.sleep_time)
        self.output.put(self.arg_max)

    def get_path_parallel(self,x):
        self.obs = x
        self.T = len(obs)
        self.val_max = np.zeros((self.T, self.M))
        self.arg_max = np.zeros((self.T, self.M))
        val_process = mp.Process(target=self.get_val)
        arg_process = mp.Process(target=self.get_arg)  
        # get first initial value for val_max which can feed arg_process
        self.val_max[0] = self.pi*self.B[:,obs[0]]
        arg_process.start()
        val_process.start()
        arg_process.join()
        val_process.join()

Примечание: get_path_parallel еще не имеет возврата.

Может показаться, что процессы val_process и arg_process на самом деле никогда не запускаются. На самом деле не уверен, почему это происходит. Вы можете запустить код на примере Википедии для алгоритма Витерби.

obs = np.array([0,1,2])  # normal then cold and finally dizzy  

pi = np.array([0.6,0.4])

A = np.array([[0.7,0.3],
             [0.4,0.6]])

B = np.array([[0.5,0.4,0.1],
             [0.1,0.3,0.6]]) 

viterbi = Viterbi(A,B,pi)
path = viterbi.get_path(obs)

Я также пытался использовать Ray. Однако я понятия не имел, что я там делал на самом деле. Не могли бы вы помочь порекомендовать мне, что делать, чтобы запустить параллельную версию. Должно быть, я делаю что-то очень неправильное, но я не знаю, что.

Ваша помощь будет высоко оценена.


person Dylan Solms    schedule 23.06.2019    source источник
comment
Добро пожаловать в СО. Попробуйте взглянуть на шаблон производитель-потребитель. который активно используется в многопроцессорной обработке.   -  person Sıddık Açıl    schedule 23.06.2019
comment
@SıddıkAçıl большое спасибо. Я не знал о такой четко определенной схеме. Это действительно очень полезно. Я прочитаю это и попытаюсь исправить свой код.   -  person Dylan Solms    schedule 23.06.2019
comment
@SıddıkAçıl Мне удалось заставить код работать благодаря вашей отличной ссылке. Я также очень ценю ваш ответ, поскольку он дал мне много полезного для обучения. Мой код намного медленнее, чем последовательная версия, скорее всего, из-за того, что процессы, которые я пытаюсь разделить, уже настолько быстры и малы, что накладные расходы на параллелизм того не стоят. Я включил рабочий код в качестве ответа на этот вопрос вместе с некоторыми своими мыслями.   -  person Dylan Solms    schedule 24.06.2019


Ответы (2)


Добро пожаловать в СО. Подумайте о том, чтобы взглянуть на шаблон производитель-потребитель, который широко используется в многопроцессорной обработке.

Имейте в виду, что многопроцессорность в Python повторно создает ваш код для каждого процесса, который вы создаете в Windows. Таким образом, ваши объекты Viterbi и, следовательно, их поля Queue не совпадают.

Наблюдайте за этим поведением через:

import os

def get_arg(self):
    '''Dependent Process'''
    print("Dependent ", self)
    print("Dependent ", self.output)
    print("Dependent ", os.getpid())

def get_val(self):
    '''Independent Process'''
    print("Independent ", self)
    print("Independent ", self.output)
    print("Independent ", os.getpid())

if __name__ == "__main__":
    print("Hello from main process", os.getpid())
    obs = np.array([0,1,2])  # normal then cold and finally dizzy  

    pi = np.array([0.6,0.4])

    A = np.array([[0.7,0.3],
             [0.4,0.6]])

    B = np.array([[0.5,0.4,0.1],
             [0.1,0.3,0.6]]) 

    viterbi = Viterbi(A,B,pi)
    print("Main viterbi object", viterbi)
    print("Main viterbi object queue", viterbi.output)
    path = viterbi.get_path_parallel(obs)

Есть три разных объекта Витерби, поскольку есть три разных процесса. Итак, что вам нужно с точки зрения параллелизма, так это не процессы. Вам следует изучить библиотеку threading, которую предлагает Python.

person Sıddık Açıl    schedule 23.06.2019
comment
Мне удалось заставить код работать благодаря вашей отличной ссылке. Я также очень ценю ваш ответ, поскольку он дал мне много полезного для обучения. Мой код намного медленнее, чем последовательная версия, скорее всего, из-за того, что процессы, которые я пытаюсь разделить, уже настолько быстры и малы, что накладные расходы на параллелизм того не стоят. Я включил рабочий код в качестве ответа на этот вопрос вместе с некоторыми своими мыслями. - person Dylan Solms; 24.06.2019

Мне удалось заставить мой код работать благодаря @SıddıkAçıl. Схема «производитель-потребитель» — вот что делает свое дело. Я также понял, что процессы могут работать успешно, но если не хранить окончательные результаты в своего рода «очереди результатов», они исчезают. Под этим я подразумеваю, что я заполнил значениями свои массивы numpy val_max и arg_max, позволив процессу start(), но когда я их вызвал, они все еще были массивами np.zero. Я проверил, что они заполнили правильные массивы, распечатав их как раз перед завершением процесса (наконец, self.T в итерации). Поэтому вместо того, чтобы печатать их, я просто добавил их в многопроцессорный объект Queue на последней итерации, чтобы захватить весь заполненный массив.

Я предоставляю свой обновленный рабочий код ниже. ПРИМЕЧАНИЕ: это работает, но занимает в два раза больше времени, чем серийная версия. Мои мысли о том, почему это может быть так, следующие:

  1. Я могу заставить его работать как два процесса, но на самом деле не знаю, как это сделать правильно. Опытные программисты могут знать, как это исправить с помощью параметра chunksize.
  2. Два процесса, которые я разделяю, представляют собой операции с матрицей numpy. Эти процессы уже выполняются так быстро, что накладные расходы на параллелизм (многопроцессорность) не стоят теоретического улучшения. Если бы эти два процесса были двумя исходными циклами for (используемыми в Википедии и большинстве реализаций), тогда многопроцессорность могла бы дать выигрыш (возможно, мне следует изучить это). Кроме того, поскольку у нас есть шаблон производитель-потребитель, а не два независимых процесса (шаблон производитель-производитель), мы можем ожидать, что шаблон производитель-потребитель будет работать только до тех пор, пока самый длинный из двух процессов (в этом случае производитель занимает в два раза больше времени). пока потребитель). Мы не можем ожидать, что время выполнения уменьшится вдвое, как в сценарии производитель-производитель (это произошло с моим параллельным алгоритмом фильтрации вперед-назад HMM).
  3. Мой компьютер имеет 4 ядра, и numpy уже выполняет встроенную оптимизацию многопроцессорной обработки процессора в своих операциях. Пытаясь использовать ядра для ускорения кода, я лишаю numpy ядер, которые он мог бы использовать более эффективно. Чтобы выяснить это, я собираюсь рассчитать время операций numpy и посмотреть, медленнее ли они в моей параллельной версии по сравнению с моей последовательной версией.

Я обновлю, если узнаю что-то новое. Если вы, возможно, знаете настоящую причину того, почему мой параллельный код работает намного медленнее, пожалуйста, дайте мне знать. Вот код:


import numpy as np
from time import time
import multiprocessing as mp

class Viterbi:


    def __init__(self,A,B,pi):
        self.M = A.shape[0] # number of hidden states
        self.A = A  # Transition Matrix
        self.B = B   # Observation Matrix
        self.pi = pi   # Initial distribution
        self.T = None   # time horizon
        self.val_max = None
        self.arg_max = None
        self.obs = None
        self.intermediate = mp.Queue()
        self.result = mp.Queue()



    def get_path(self,x):
        '''Sequential/Serial Viterbi Algorithm with backtracking'''
        self.T = len(x)
        self.val_max = np.zeros((self.T, self.M))
        self.arg_max = np.zeros((self.T, self.M))
        self.val_max[0] = self.pi*self.B[:,x[0]]
        for t in range(1, self.T):
            # Indepedent Process
            self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,obs[t]]) , axis = 0  ) 
            # Dependent Process
            self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)

        # BACKTRACK
        states = np.zeros(self.T, dtype=np.int32)
        states[self.T-1] = np.argmax(self.val_max[self.T-1])
        for t in range(self.T-2, -1, -1):
            states[t] = self.arg_max[t+1, states[t+1]]
        return states

    def get_val(self,intial_val_max):
        '''Independent Poducer Process'''
        val_max = intial_val_max
        for t in range(1,self.T):
            val_max = np.max( self.A*np.outer(val_max,self.B[:,self.obs[t]]) , axis = 0  )
            #print('Transfer: ',self.val_max[t])
            self.intermediate.put(val_max)
            if t == self.T-1:
                self.result.put(val_max)   # we only need the last val_max value for backtracking




    def get_arg(self):
        '''Dependent Consumer Process.'''
        t = 1
        while t < self.T:
            val_max =self.intermediate.get()
            #print('Receive: ',val_max)
            self.arg_max[t] = np.argmax( val_max*self.A.T, axis = 1)
            if t == self.T-1:
                self.result.put(self.arg_max)
            #print('Processed: ',self.arg_max[t])
            t += 1

    def get_path_parallel(self,x):
        '''Multiprocessing producer-consumer implementation of Viterbi algorithm.'''
        self.obs = x
        self.T = len(obs)
        self.arg_max = np.zeros((self.T, self.M))  # we don't tabulate val_max anymore
        initial_val_max = self.pi*self.B[:,obs[0]]
        producer_process = mp.Process(target=self.get_val,args=(initial_val_max,),daemon=True)
        consumer_process = mp.Process(target=self.get_arg,daemon=True) 
        self.intermediate.put(initial_val_max)  # initial production put into pipeline for consumption
        consumer_process.start()  # we can already consume initial_val_max
        producer_process.start()
        #val_process.join()
        #arg_process.join()
        #self.output.join()
        return self.backtrack(self.result.get(),self.result.get()) # backtrack takes last row of val_max and entire arg_max

    def backtrack(self,val_max_last_row,arg_max):
        '''Backtracking the Dynamic Programming solution (actually a Trellis diagram)
           produced by Multiprocessing Viterbi algorithm.'''
        states = np.zeros(self.T, dtype=np.int32)
        states[self.T-1] = np.argmax(val_max_last_row)
        for t in range(self.T-2, -1, -1):
            states[t] = arg_max[t+1, states[t+1]]
        return states



if __name__ == '__main__':

    obs = np.array([0,1,2])  # normal then cold and finally dizzy  

    T = 100000
    obs = np.random.binomial(2,0.3,T)        

    pi = np.array([0.6,0.4])

    A = np.array([[0.7,0.3],
                 [0.4,0.6]])

    B = np.array([[0.5,0.4,0.1],
                 [0.1,0.3,0.6]]) 

    t1 = time()
    viterbi = Viterbi(A,B,pi)
    path = viterbi.get_path(obs)
    t2 = time()
    print('Iterative Viterbi')
    print('Path: ',path)
    print('Run-time: ',round(t2-t1,6)) 
    t1 = time()
    viterbi = Viterbi(A,B,pi)
    path = viterbi.get_path_parallel(obs)
    t2 = time()
    print('\nParallel Viterbi')
    print('Path: ',path)
    print('Run-time: ',round(t2-t1,6))
person Dylan Solms    schedule 24.06.2019
comment
Привет снова, Дилан. Вы проделали огромную работу над своим кодом по сравнению с вашим первым черновиком. По поводу 2-го и 3-го пунктов вы правы. Numpy уже использует встроенные функции процессора для ускорения работы и сильно оптимизирован. Следовательно, введение многопроцессорности в уравнение не принесет вам никакой пользы из-за fork накладные расходы. Я бы посоветовал вам проверить Numba и его декоратор @jit. Это не научит вас многопроцессорности, но, безусловно, повысит вашу скорость. - person Sıddık Açıl; 24.06.2019
comment
Я бы посоветовал вам хорошо разбираться в шаблонах многопоточности/многопроцессорности. Посмотрите MPI и OpenMP. Исследуйте новые возможности C/C++. Посмотрите SIMD и CUDA и то, как они используются в Numpy и CuPy. Я рад, что был вам полезен. - person Sıddık Açıl; 24.06.2019