Недавно я поэкспериментировал с многопроцессорным модулем Python, чтобы ускорить алгоритм прямого и обратного поиска для скрытых марковских моделей, поскольку фильтрация вперед и фильтрация назад могут выполняться независимо. Сокращение времени выполнения вдвое внушало благоговейный трепет.
Теперь я пытаюсь включить многопроцессорность в свой итеративный алгоритм Витерби. В этом алгоритме два процесса, которые я пытаюсь запустить, не являются независимыми. Часть val_max может работать независимо, но arg_max[t] зависит от val_max[t-1]. Поэтому я поиграл с идеей, что можно запускать val_max как отдельный процесс, а затем arg_max также как отдельный процесс, который может быть передан val_max.
Я признаю, что здесь я немного не в своей тарелке и мало что знаю о многопроцессорной обработке, кроме просмотра некоторых основных видео об этом, а также просмотра блогов. Я привожу свою попытку ниже, но она не работает.
import numpy as np
from time import time,sleep
import multiprocessing as mp
class Viterbi:
def __init__(self,A,B,pi):
self.M = A.shape[0] # number of hidden states
self.A = A # Transition Matrix
self.B = B # Observation Matrix
self.pi = pi # Initial distribution
self.T = None # time horizon
self.val_max = None
self.arg_max = None
self.obs = None
self.sleep_time = 1e-6
self.output = mp.Queue()
def get_path(self,x):
# returns the most likely state sequence given observed sequence x
# using the Viterbi algorithm
self.T = len(x)
self.val_max = np.zeros((self.T, self.M))
self.arg_max = np.zeros((self.T, self.M))
self.val_max[0] = self.pi*self.B[:,x[0]]
for t in range(1, self.T):
# Indepedent Process
self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,obs[t]]) , axis = 0 )
# Dependent Process
self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)
# BACKTRACK
states = np.zeros(self.T, dtype=np.int32)
states[self.T-1] = np.argmax(self.val_max[self.T-1])
for t in range(self.T-2, -1, -1):
states[t] = self.arg_max[t+1, states[t+1]]
return states
def get_val(self):
'''Independent Process'''
for t in range(1,self.T):
self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,self.obs[t]]) , axis = 0 )
self.output.put(self.val_max)
def get_arg(self):
'''Dependent Process'''
for t in range(1,self.T):
while 1:
# Process info if available
if self.val_max[t-1].any() != 0:
self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)
break
# Else sleep and wait for info to arrive
sleep(self.sleep_time)
self.output.put(self.arg_max)
def get_path_parallel(self,x):
self.obs = x
self.T = len(obs)
self.val_max = np.zeros((self.T, self.M))
self.arg_max = np.zeros((self.T, self.M))
val_process = mp.Process(target=self.get_val)
arg_process = mp.Process(target=self.get_arg)
# get first initial value for val_max which can feed arg_process
self.val_max[0] = self.pi*self.B[:,obs[0]]
arg_process.start()
val_process.start()
arg_process.join()
val_process.join()
Примечание: get_path_parallel еще не имеет возврата.
Может показаться, что процессы val_process и arg_process на самом деле никогда не запускаются. На самом деле не уверен, почему это происходит. Вы можете запустить код на примере Википедии для алгоритма Витерби.
obs = np.array([0,1,2]) # normal then cold and finally dizzy
pi = np.array([0.6,0.4])
A = np.array([[0.7,0.3],
[0.4,0.6]])
B = np.array([[0.5,0.4,0.1],
[0.1,0.3,0.6]])
viterbi = Viterbi(A,B,pi)
path = viterbi.get_path(obs)
Я также пытался использовать Ray. Однако я понятия не имел, что я там делал на самом деле. Не могли бы вы помочь порекомендовать мне, что делать, чтобы запустить параллельную версию. Должно быть, я делаю что-то очень неправильное, но я не знаю, что.
Ваша помощь будет высоко оценена.