Базовая многопроцессорность с циклом while

Я новичок в пакете multiprocessing в python, и моя путаница, вероятно, будет легко проясниться для тех, кто знает больше. Я читал о параллелизме и искал другие вопросы, подобные этому, и ничего не нашел. (К вашему сведению, я НЕ хочу использовать multithreading, потому что GIL сильно замедлит работу моего приложения.)

Я думаю в рамках событий. Я хочу, чтобы несколько процессов выполнялись в ожидании события. Если событие происходит, оно назначается конкретному процессу, который работает, а затем возвращается в состояние ожидания. Возможно, есть лучший способ сделать это, но я полагаю, что я должен создавать все процессы один раз и держать их открытыми бесконечно, а не создавать, а затем закрывать процесс каждый раз, когда происходит событие. Для меня важна скорость, и мои события могут происходить много тысяч раз в секунду.

Я придумал следующий игрушечный пример, который предназначен для отправки четных чисел в один процесс и нечетных чисел в другой. Оба процесса одинаковы, они просто добавляют номер в список.

from multiprocessing import Process, Queue, Pipe

slist=['even','odd']

Q={}
Q['even'] = Queue()
Q['odd'] = Queue()

ev,od = [],[]

Q['even'].put(ev)
Q['odd'].put(od)

P={}
P['even'] = Pipe()
P['odd'] = Pipe()



def add_num(s):
    """ The worker function, invoked in a process. The results are placed in
        a list that's pushed to a queue."""
#    while True :
    if not P[s][1].recv():
        print s,'- do nothing'

    else:            
        d = Q[s].get()
        print d
        d.append(P[s][1].recv())
        Q[s].put(d)
        print Q[s].get()
        P[s][0].send(False)
        print 'ya'




def piper(s,n):

    P[s][0].send(n)    
    for k in [S for S in slist if S != s]:
        P[k][0].send(False) 
    add_num(s)


procs = [ Process (
                   target=add_num,
                   args=(i,)
                   ) 
         for i in ['even','odd']]

for s in slist: 
    P[s][0].send(False)

for p in procs:
    p.start()  
    p.join()

for i in range(10):
    print i
    if i%2==0:
        s = 'even'
    else:
        s = 'odd'
    piper(s,i)


print 'results:', Q['odd'].get(),Q['even'].get()

Этот код производит следующее:

even - do nothing

Любое понимание этой проблемы от мудрых, где мой код или рассуждения не соответствуют действительности и т. Д., Буду очень признателен.


person Wapiti    schedule 10.04.2015    source источник
comment
если вам нужен кортеж из одного элемента, вам нужна запятая в конце, например args=(i,)   -  person 1.618    schedule 11.04.2015
comment
Спасибо. Это действительно устранило мою ошибку. Но код почему-то не работает. Я отредактировал вопрос выше, чтобы отразить новый статус, и оставил его открытым, пока я продолжаю над ним работать.   -  person Wapiti    schedule 11.04.2015
comment
Этот оператор print d в функции add_num должен вызывать исключение.   -  person Himal    schedule 11.04.2015
comment
Истинный. Я исправил это и снова обновил. Мне нужно прекратить работать над этим сейчас, и, возможно, к лучшему, поскольку это совершенно меня озадачивает.   -  person Wapiti    schedule 11.04.2015
comment
@Wapiti Можете ли вы объяснить, что вы на самом деле пытаетесь здесь сделать? Использование как Queue, так и Pipe для каждого сбивает с толку. Вы также вызываете queue.get, queue.put, а затем снова queue.get, и все это внутри рабочей функции. Это почему? Первый вызов get всегда будет вызывать взаимоблокировку, если вы когда-нибудь столкнетесь с ним, потому что вы никогда не put ничего в очередь помещаете из родителя. Кроме того, вы загружаете False в оба канала, запуская дочерние процессы, которые видят False при вызове if not P[s][1].recv():, а затем сразу же завершают работу. Зачем это делать? Каково ожидаемое поведение здесь?   -  person dano    schedule 11.04.2015
comment
Спасибо за ваш вопрос. Я пытаюсь иметь центральный процесс, который доставляет данные (в данном случае только числа, но они обобщают). Я хочу, чтобы было два процесса (обобщает на n), которые ничего не делают, пока им не будут отправлены данные. Я думал сделать это с помощью труб. Есть два канала, которые соединяют основной процесс с двумя процессами. Работа этих процессов также обобщается, но в данном случае она заключается в добавлении числа в список. Я использую здесь очередь, потому что хочу, чтобы объект, над которым работает процесс, был доступен и из основного процесса. Я знаю, что это неправильно...   -  person Wapiti    schedule 11.04.2015
comment
Ваша цель, вероятно, может быть легко достигнута с помощью многопроцессорной обработки. Пул   -  person 1.618    schedule 11.04.2015
comment
Я думал, что Pool не позволяет различать процессы, поэтому одно задание делится на количество процессов. Могу ли я явно отправить данные в соответствующий процесс? Или, возможно, я могу сделать процессы универсальными и передать целевой объект   -  person Wapiti    schedule 11.04.2015


Ответы (1)


Вот подход, который я использовал пару раз с большим успехом:

  1. Запустите многопроцессорный пул.

  2. Используйте многопроцессорный SyncManager для создания нескольких очередей (одна для каждый тип данных должен обрабатываться по-разному).

  3. Используйте apply_async для запуска функций, обрабатывающих данные. . Как и в случае с очередями, должна быть одна функция для каждого типа данных, которые нужно обрабатывать по-разному. Каждая запущенная функция получает в качестве входного аргумента очередь, соответствующую ее данным. Функции будут выполнять свою работу в бесконечном цикле, который начинается с получения данных из очереди.

  4. Начать обработку. Во время обработки основной процесс сортирует данные и решает, какая функция должна их обрабатывать. После принятия решения данные помещаются в очередь, соответствующую этой функции.

  5. После обработки всех данных основной процесс помещает в каждую очередь значение, называемое «таблеткой яда». Ядовитая пилюля — это значение, которое все рабочие процессы распознают как сигнал к выходу. Поскольку очереди работают по принципу «первым пришел – первым обслужен» (FIFO), то они гарантированно вытягивают ядовитую пилюлю как последний элемент в очередях.

  6. Закройте и присоединитесь к многопроцессорному пулу.

Код

Ниже приведен пример этого алгоритма. Цель примера кода — использовать ранее описанный алгоритм для деления нечетных чисел на 2 и четных чисел на -2. Все результаты помещаются в общий список, доступный основному процессу.

import multiprocessing

POISON_PILL = "STOP"

def process_odds(in_queue, shared_list):

    while True:

        # block until something is placed on the queue
        new_value = in_queue.get() 

        # check to see if we just got the poison pill
        if new_value == POISON_PILL:
            break

        # we didn't, so do the processing and put the result in the
        # shared data structure
        shared_list.append(new_value/2)

    return

def process_evens(in_queue, shared_list):

    while True:    
        new_value = in_queue.get() 
        if new_value == POISON_PILL:
            break

        shared_list.append(new_value/-2)

    return

def main():

    # create a manager - it lets us share native Python object types like
    # lists and dictionaries without worrying about synchronization - 
    # the manager will take care of it
    manager = multiprocessing.Manager()

    # now using the manager, create our shared data structures
    odd_queue = manager.Queue()
    even_queue = manager.Queue()
    shared_list = manager.list()

    # lastly, create our pool of workers - this spawns the processes, 
    # but they don't start actually doing anything yet
    pool = multiprocessing.Pool()

    # now we'll assign two functions to the pool for them to run - 
    # one to handle even numbers, one to handle odd numbers
    odd_result = pool.apply_async(process_odds, (odd_queue, shared_list))
    even_result = pool.apply_async(process_evens, (even_queue, shared_list))
    # this code doesn't do anything with the odd_result and even_result
    # variables, but you have the flexibility to check exit codes
    # and other such things if you want - see docs for AsyncResult objects

    # now that the processes are running and waiting for their queues
    # to have something, lets give them some work to do by iterating
    # over our data, deciding who should process it, and putting it in
    # their queue
    for i in range(6):

        if (i % 2) == 0: # use mod operator to see if "i" is even
            even_queue.put(i)

        else:
            odd_queue.put(i)

    # now we've finished giving the processes their work, so send the 
    # poison pill to tell them to exit
    even_queue.put(POISON_PILL)
    odd_queue.put(POISON_PILL)

    # wait for them to exit
    pool.close()
    pool.join()

    # now we can check the results
    print(shared_list)

    # ...and exit!
    return


if __name__ == "__main__":
    main()

Выход

Этот код производит этот вывод:

[0.5, -0.0, 1.5, -1.0, 2.5, -2.0]

Обратите внимание, что порядок результатов непредсказуем, потому что мы не можем гарантировать, в каком порядке функции смогут получать элементы из своих очередей и помещать результаты в список. Но вы, безусловно, можете выполнить любую необходимую вам постобработку, которая может включать сортировку.

Обоснование

Я думаю, что это было бы хорошим решением вашей проблемы, потому что:

  1. Вы правы в том, что процессы порождения несут огромные накладные расходы. Этот подход с одним производителем и несколькими потребителями устраняет это, когда вы используете пул, чтобы поддерживать работоспособность рабочих на протяжении всей программы.

  2. Он устраняет ваши опасения по поводу возможности обработки данных по-разному в зависимости от атрибутов данных. В своих комментариях вы выразили озабоченность по поводу возможности отправки данных в определенные процессы. При таком подходе вы можете выбирать, каким процессам отдавать данные, потому что вам нужно выбрать, в какую очередь их ставить. (Кстати, я думаю, вы имеете в виду pool.map, которая, как вы правильно считаете, не позволяет выполнять разные операции в одном задании. apply_async позволяет.)

  3. Я нашел его очень расширяемым и гибким. Нужно добавить больше типов обработки данных? Просто напишите свою функцию-обработчик, добавьте еще одну очередь и добавьте логику в основную, чтобы направить данные в вашу новую функцию. Вы обнаруживаете, что одна очередь копируется и становится узким местом? Вы можете вызывать apply_async с одной и той же целевой функцией и ставить в очередь несколько раз, чтобы несколько рабочих процессов работали в одной очереди. Просто убедитесь, что вы дали в очередь достаточно ядовитых пилюль, чтобы все рабочие получили по одной.

Ограничения

Любые данные, которые вы хотите передать в очередь, должны быть доступны для выбора (сериализации) модулем pickle. Посмотрите здесь, чтобы узнать, можно и нельзя мариновать.

Вероятно, есть и другие ограничения, но я не могу придумать какие-либо другие в своей голове.

person skrrgwasme    schedule 26.04.2015
comment
Это отличный ответ и очень полезный. Я добился некоторого прогресса с момента публикации, но не решил проблему полностью. У меня есть несколько вопросов: что, если данные никогда не останавливаются, при условии, что обработка выполняется достаточно быстро, чтобы не создавать резервные копии очередей, может ли эта непрерывность создавать проблемы? Кроме того, если бы я захотел позже изменить функциональность, чтобы вход каждой функции зависел от вывода другой, было бы это так же просто, как настроить очереди в правильной структуре? Не создаст ли асинхронный характер процессов какие-либо проблемы? - person Wapiti; 26.04.2015
comment
Бесконечно работающий скрипт сам по себе не обязательно является проблемой. Просто убедитесь, что ваши бесконечные циклы правильно управляют своими переменными, чтобы вещи выходили за рамки и могли быть удалены сборщиком мусора, когда вы закончите с ними, иначе у вас в конечном итоге закончится память. Передача данных от одной функции к другой также не должна быть проблемой; У меня были данные, которые требовали нескольких этапов обработки, поэтому я связал воркеров в пуле вместе с очередями, как в конвейере. Это может быть сложно, но это не проблема, если все правильно подключено. - person skrrgwasme; 26.04.2015
comment
Я очень ценю этот ответ. Спасибо за подробности и ответы на мои вопросы. Не имея опыта работы с пакетом, мне потребовалось немного времени, чтобы разобраться в архитектуре. Вы предоставили именно то, что я искал. Просто звучный голос опыта. Спасибо большое. - person Wapiti; 26.04.2015