Моя очередь пуста после завершения многопроцессорной обработки. Экземпляры процесса

У меня есть скрипт python, где в верхней части файла у меня есть:

result_queue = Queue.Queue()
key_list = *a large list of small items* #(actually from bucket.list() via boto)

Я узнал, что очереди — это безопасные для процессов структуры данных. У меня есть метод:

def enqueue_tasks(keys):
    for key in keys:
        try:
            result = perform_scan.delay(key)
            result_queue.put(result)
        except:
           print "failed"

Функция perform_scan.delay() здесь на самом деле вызывает celery worker, но я не думаю, что это актуально (это асинхронный вызов процесса).

У меня тоже есть:

def grouper(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return izip_longest(fillvalue=fillvalue, *args)

Наконец, у меня есть функция main():

def main():

    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
    concurrent.futures.wait(futures)
    print len(result_queue)

Результатом оператора печати является 0. Однако, если я включу оператор печати размером result_queue в enqueue_tasks во время работы программы, я увижу, что размер увеличивается и что-то добавляется в очередь.

Представления о том, что происходит?


person jeffrey    schedule 16.10.2014    source источник
comment
Если perform_scan.delay() является асинхронным удаленным вызовом, который, по-видимому, означает, что он не выполняет никакой обработки и просто ждет ответа, почему вы используете процессы вместо потоков?   -  person abarnert    schedule 17.10.2014


Ответы (2)


Похоже, есть более простое решение этой проблемы.

Вы строите список будущего. Весь смысл фьючерсов в том, что они являются будущими результатами. В частности, что бы ни возвращала каждая функция, это (конечное) значение будущего. Итак, вообще не делайте всю эту вещь «поместить результаты в очередь», просто верните их из функции задачи и заберите их из фьючерсов.


Самый простой способ сделать это — разорвать эту петлю так, чтобы каждый ключ представлял собой отдельную задачу с отдельным будущим. Я не знаю, подходит ли это для вашего реального кода, но если это так:

def do_task(key):
    try:
        return perform_scan.delay(key)
    except:
        print "failed"

def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(do_task, key) for key in key_list]
    # If you want to do anything with these results, you probably want
    # a loop around concurrent.futures.as_completed or similar here,
    # rather than waiting for them all to finish, ignoring the results,
    # and printing the number of them.
    concurrent.futures.wait(futures)
    print len(futures)

Конечно, это не делает группировку. Но нужно ли вам это?

Наиболее вероятная причина необходимости группировки заключается в том, что задачи настолько малы, что накладные расходы на их планирование (и обработку входных и выходных данных) затмевают реальную работу. Если это так, то вы почти наверняка можете подождать, пока не будет выполнена целая партия, чтобы вернуть какие-либо результаты. Особенно учитывая, что вы даже не смотрите на результаты, пока они все равно не будут сделаны. (Эта модель «разделить на группы, обработать каждую группу, снова объединить» довольно распространена в таких случаях, как числовая работа, где каждый элемент может быть крошечным, или элементы могут быть не независимыми друг от друга, но есть большие группы. достаточно или независимо от остальной части работы.)

Во всяком случае, это почти так же просто:

def do_tasks(keys):
    results = []
    for key in keys:
        try:
            result = perform_scan.delay(key)
            results.append(result)
        except:
           print "failed"
    return results

def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
    print sum(len(results) for results in concurrent.futures.as_completed(futures))

Или, если вы предпочитаете сначала подождать, а затем вычислить:

def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
    concurrent.futures.wait(futures)
    print sum(len(future.result()) for future in futures)

Но опять же, я сомневаюсь, что вам нужно даже это.

person abarnert    schedule 17.10.2014
comment
Мои глаза открылись. Я даже не знаю, почему я не думал о фьючерсах как о простом понимании списка - это полностью противоречит цели моей промежуточной очереди, как вы заявили (составляя другой комментарий к вашему комментарию). Я также убрал группировку — я думал о группировке неправильным способом. Кажется, мне это не нужно, так как, хотя мои отдельные задачи очень маленькие, я не смотрю ни одну из них до конца. Одна вещь, которую я не понимаю, это «future.result». Я думал, что будущее было фактическим результатом, является ли будущее самостоятельным объектом? - person jeffrey; 17.10.2014
comment
@jeffrey: Да, Future — это объект который содержит результат, который может быть еще недоступен. Можно спроектировать модель параллелизма всего языка вокруг неявных фьючерсов (см. AliceML или любой язык акторов или потоков данных), что может быть очень круто, но это не подходит для Python, поэтому вместо этого он использует явные фьючерсы. Дополнительные обсуждения см. в Википедии. - person abarnert; 17.10.2014

Вам нужно использовать multiprocessing.Queue, а не Queue.Queue. Queue.Queue безопасен для потоков, а не для процессов, поэтому изменения, которые вы вносите в него в одном процессе, не отражаются ни в каких других.

person dano    schedule 17.10.2014
comment
Также обратите внимание, что я упомянул об этом в комментарий в предыдущем вопросе по этой теме. Прошу прощения, если было непонятно, что я имел в виду. - person dano; 17.10.2014
comment
Я ценю помощь! Вы никак не могли знать, поскольку я не дал контекста моей проблемы. - person jeffrey; 17.10.2014