Похоже, есть более простое решение этой проблемы.
Вы строите список будущего. Весь смысл фьючерсов в том, что они являются будущими результатами. В частности, что бы ни возвращала каждая функция, это (конечное) значение будущего. Итак, вообще не делайте всю эту вещь «поместить результаты в очередь», просто верните их из функции задачи и заберите их из фьючерсов.
Самый простой способ сделать это — разорвать эту петлю так, чтобы каждый ключ представлял собой отдельную задачу с отдельным будущим. Я не знаю, подходит ли это для вашего реального кода, но если это так:
def do_task(key):
try:
return perform_scan.delay(key)
except:
print "failed"
def main():
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(do_task, key) for key in key_list]
# If you want to do anything with these results, you probably want
# a loop around concurrent.futures.as_completed or similar here,
# rather than waiting for them all to finish, ignoring the results,
# and printing the number of them.
concurrent.futures.wait(futures)
print len(futures)
Конечно, это не делает группировку. Но нужно ли вам это?
Наиболее вероятная причина необходимости группировки заключается в том, что задачи настолько малы, что накладные расходы на их планирование (и обработку входных и выходных данных) затмевают реальную работу. Если это так, то вы почти наверняка можете подождать, пока не будет выполнена целая партия, чтобы вернуть какие-либо результаты. Особенно учитывая, что вы даже не смотрите на результаты, пока они все равно не будут сделаны. (Эта модель «разделить на группы, обработать каждую группу, снова объединить» довольно распространена в таких случаях, как числовая работа, где каждый элемент может быть крошечным, или элементы могут быть не независимыми друг от друга, но есть большие группы. достаточно или независимо от остальной части работы.)
Во всяком случае, это почти так же просто:
def do_tasks(keys):
results = []
for key in keys:
try:
result = perform_scan.delay(key)
results.append(result)
except:
print "failed"
return results
def main():
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
print sum(len(results) for results in concurrent.futures.as_completed(futures))
Или, если вы предпочитаете сначала подождать, а затем вычислить:
def main():
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
concurrent.futures.wait(futures)
print sum(len(future.result()) for future in futures)
Но опять же, я сомневаюсь, что вам нужно даже это.
person
abarnert
schedule
17.10.2014
perform_scan.delay()
является асинхронным удаленным вызовом, который, по-видимому, означает, что он не выполняет никакой обработки и просто ждет ответа, почему вы используете процессы вместо потоков? - person abarnert   schedule 17.10.2014