Я новичок в пакете multiprocessing
в python, и моя путаница, вероятно, будет легко проясниться для тех, кто знает больше. Я читал о параллелизме и искал другие вопросы, подобные этому, и ничего не нашел. (К вашему сведению, я НЕ хочу использовать multithreading
, потому что GIL сильно замедлит работу моего приложения.)
Я думаю в рамках событий. Я хочу, чтобы несколько процессов выполнялись в ожидании события. Если событие происходит, оно назначается конкретному процессу, который работает, а затем возвращается в состояние ожидания. Возможно, есть лучший способ сделать это, но я полагаю, что я должен создавать все процессы один раз и держать их открытыми бесконечно, а не создавать, а затем закрывать процесс каждый раз, когда происходит событие. Для меня важна скорость, и мои события могут происходить много тысяч раз в секунду.
Я придумал следующий игрушечный пример, который предназначен для отправки четных чисел в один процесс и нечетных чисел в другой. Оба процесса одинаковы, они просто добавляют номер в список.
from multiprocessing import Process, Queue, Pipe
slist=['even','odd']
Q={}
Q['even'] = Queue()
Q['odd'] = Queue()
ev,od = [],[]
Q['even'].put(ev)
Q['odd'].put(od)
P={}
P['even'] = Pipe()
P['odd'] = Pipe()
def add_num(s):
""" The worker function, invoked in a process. The results are placed in
a list that's pushed to a queue."""
# while True :
if not P[s][1].recv():
print s,'- do nothing'
else:
d = Q[s].get()
print d
d.append(P[s][1].recv())
Q[s].put(d)
print Q[s].get()
P[s][0].send(False)
print 'ya'
def piper(s,n):
P[s][0].send(n)
for k in [S for S in slist if S != s]:
P[k][0].send(False)
add_num(s)
procs = [ Process (
target=add_num,
args=(i,)
)
for i in ['even','odd']]
for s in slist:
P[s][0].send(False)
for p in procs:
p.start()
p.join()
for i in range(10):
print i
if i%2==0:
s = 'even'
else:
s = 'odd'
piper(s,i)
print 'results:', Q['odd'].get(),Q['even'].get()
Этот код производит следующее:
even - do nothing
Любое понимание этой проблемы от мудрых, где мой код или рассуждения не соответствуют действительности и т. Д., Буду очень признателен.
args=(i,)
- person 1.618   schedule 11.04.2015print d
в функцииadd_num
должен вызывать исключение. - person Himal   schedule 11.04.2015Queue
, так иPipe
для каждого сбивает с толку. Вы также вызываетеqueue.get
,queue.put
, а затем сноваqueue.get
, и все это внутри рабочей функции. Это почему? Первый вызовget
всегда будет вызывать взаимоблокировку, если вы когда-нибудь столкнетесь с ним, потому что вы никогда неput
ничего в очередь помещаете из родителя. Кроме того, вы загружаетеFalse
в оба канала, запуская дочерние процессы, которые видятFalse
при вызовеif not P[s][1].recv():
, а затем сразу же завершают работу. Зачем это делать? Каково ожидаемое поведение здесь? - person dano   schedule 11.04.2015Pool
не позволяет различать процессы, поэтому одно задание делится на количество процессов. Могу ли я явно отправить данные в соответствующий процесс? Или, возможно, я могу сделать процессы универсальными и передать целевой объект - person Wapiti   schedule 11.04.2015