Проблема concurrent.futures: почему только 1 рабочий?

Я экспериментирую с использованием concurrent.futures.ProcessPoolExecutor для распараллеливания последовательной задачи. Последовательная задача включает в себя поиск числа вхождений данного числа из диапазона чисел. Мой код показан ниже.
Во время его выполнения я заметил в Диспетчере задач / Системном мониторе / top, что постоянно работает только один процессор/поток, несмотря на то, что max_workers processPoolExecutor имеет значение больше 1. Почему это так? Как я могу распараллелить свой код, используя concurrent.futures? Мой код был выполнен с помощью python 3.5.

import concurrent.futures as cf
from time import time

def _findmatch(nmax, number):    
    print('def _findmatch(nmax, number):')
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):match.append(n)
    end = time() - start
    print("found {} in {}sec".format(len(match),end))
    return match

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        future = executor.submit(_findmatch, nmax, number)
        futures = future.result()
        found = len(futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(found, end))
    return futures

if __name__ == '__main__':
    match=[]
    nmax = int(1E8)
    number = str(5) # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(len(a),end))

person Sun Bear    schedule 05.02.2017    source источник


Ответы (2)


Проблема с вашим кодом заключается в том, что он отправляет только одну задачу, которая затем будет выполняться одним из рабочих, в то время как остальные ничего не делают. Вам необходимо отправить несколько задач, которые могут выполняться рабочими параллельно.

В приведенном ниже примере область поиска разделена на три разные задачи, каждая из которых выполняется разными исполнителями. Добавлены фьючерсы, возвращаемые submit в список, и как только все они будут отправлены, wait< /a> используется для ожидания их завершения. Если вы вызываете result сразу после отправка задачи будет заблокирована до тех пор, пока будущее не будет завершено.

Обратите внимание, что вместо создания списка чисел приведенный ниже код просто подсчитывает числа, в которых есть цифра 5, чтобы уменьшить использование памяти:

import concurrent.futures as cf
from time import time

def _findmatch(nmin, nmax, number):
    print('def _findmatch', nmin, nmax, number)
    start = time()
    count = 0
    for n in range(nmin, nmax):
        if number in str(n):
            count += 1
    end = time() - start
    print("found {} in {}sec".format(count,end))
    return count

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        chunk = nmax // workers
        futures = []

        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax

            futures.append(executor.submit(_findmatch, cstart, cstop, number))

        cf.wait(futures)
        res = sum(f.result() for f in futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(res, end))
    return res

if __name__ == '__main__':
    match=[]
    nmax = int(1E8)
    number = str(5) # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(a,end))

Выход:

def _findmatch 0 33333333 5
def _findmatch 33333333 66666666 5
def _findmatch 66666666 100000000 5
found 17190813 in 20.09431290626526sec
found 17190813 in 20.443560361862183sec
found 22571653 in 20.47660517692566sec
with statement of def _concurrent(nmax, number):
found 56953279 in 20.6196870803833sec
main
found 56953279 in 20.648695707321167sec
person niemmi    schedule 05.02.2017
comment
Спасибо. Пока перевариваю ваши советы, у меня возник вопрос. Зачем нужно вручную создавать чанки? Разве concurrent.futures.ProcessPoolExecutor не предполагает автоматически распределять работу по решению данной функции среди своего пула рабочих? - person Sun Bear; 05.02.2017
comment
@SunBear: Ваша работа как программиста — разделить работу на куски, которые рабочие могут выполнять независимо. ProcessPoolExecutor заботится о том, чтобы вызов переданных ему фрагментов выполнялся рабочими процессами. Обратите внимание, что вместо того, чтобы разделить задачу на три части в примере, я мог бы разделить ее на 10 разных задач, и конечный результат был бы таким же (конечно, вывод консоли был бы другим, поскольку _findmatch запускался бы 10 раз). - person niemmi; 05.02.2017
comment
Спасибо за ваши указатели. Я переписал код для вывода списка с встречающимися числами. Я опубликую это в своем следующем вопросе, где я сравнил его производительность с executor.map(). - person Sun Bear; 06.02.2017
comment
Я сравнил .submit() и .map() с серийным кодом stackoverflow.com/q/42074501/5722359. Пожалуйста, прокомментируйте, если у вас есть время. - person Sun Bear; 06.02.2017

Запуск вашего кода показывает, что все три рабочих процесса присутствуют, но двое из них спят. Проблема в том, что executor.submit(_findmatch, nmax, number) приказывает одному исполнителю выполнить функцию _findmatch.

Я не понимаю, что делает ваш код, но в основном вам нужно либо

  • разделить задачу на три четные части и отправить каждую часть в процесс с помощью executor.submit
  • разбейте задачу на более мелкие части (скажем, часть, состоящую из 100 элементов) и используйте map поэтому каждый _findmatch получает только тот фрагмент, которому он назначен.
person hansaplast    schedule 05.02.2017