Многопроцессорность Python для каждого ключа в словаре

Я новичок в python, и я пытаюсь параллельно масштабировать свою обработку. У меня есть файл с определенным количеством кортежей, каждый из которых имеет определенное значение в последнем столбце. Я хочу разделить данные этого файла и применить свою функцию параллельно к каждому фрагменту. Но дело в том, чтобы разбить данные на части на основе значения последнего столбца и применить функцию для каждого фрагмента. Например, последний столбец может иметь «a» для некоторых кортежей, «b» для некоторых и «c» для некоторых. Итак, в этом случае я должен получить три куска и обработать их параллельно. Количество уникальных значений в последнем столбце может измениться в зависимости от набора данных, поэтому мне нужно соответственно использовать ЦП.

Q1: То, что я пытался до сих пор, - это прочитать файл и создать словарь на основе этих записей, так что в основном три пары ключ-значение для приведенной выше, одна с 'a' в качестве ключа и все записи, имеющие 'a' в качестве значений и то же самое для 'b' и 'c'. Я могу использовать chunksize в многопроцессорной обработке, но здесь это не размер, он основан на ключе, так как я могу этого добиться?

Q2: После обработки вышеуказанных фрагментов мне нужен вывод всего вместе, порядок не имеет значения, а затем мне нужно использовать весь вывод для дальнейшей обработки, как я могу заставить мою основную программу ждать завершения всех этих процессов?

Сообщите мне, если потребуется дополнительная информация. Спасибо.


person ds_user    schedule 17.09.2014    source источник
comment
Почему вы хотите разделить свой ввод на три фиксированные группы? Разве функция, применяемая ко всем из них, не одинакова? Если это так, то сделать это будет намного проще, если вы не настаиваете на разделении вручную, то есть разделите данные после, у вас есть результаты функции   -  person loopbackbee    schedule 17.09.2014
comment
Да, это та же функция, но я должен применять ее отдельно для каждого набора записей, имеющих одинаковое значение в последнем столбце. И это не три фиксированные группы, это количество уникальных значений в последнем столбце.   -  person ds_user    schedule 17.09.2014
comment
Разве вы не можете разделить их после параллельного процесса?   -  person loopbackbee    schedule 17.09.2014
comment
Нет, функция должна выполняться только для этого конкретного набора записей, потому что она будет выполнять комбинации этих записей, поэтому она не должна смешиваться с записями с другим идентификатором в последнем столбце.   -  person ds_user    schedule 17.09.2014
comment
Значит, функция работает с набором записей, а не с каждой записью в отдельности? Если да, то что вам нужно, так это одновременная обработка трех наборов?   -  person loopbackbee    schedule 17.09.2014
comment
да. Точно. он работает с набором записей с одинаковым значением в последнем столбце.   -  person ds_user    schedule 17.09.2014


Ответы (1)


Предполагая, как вы описали, у вас есть три набора значений в словаре d, и вы хотите применить функцию f к каждому из них отдельно:

from multiprocessing import Pool
p = Pool()                                   #number of processes = number of CPUs
keys, values= zip(*d.iteritems())            #ordered keys and values
processed_values= p.map( f, values )         #apply the function f to each set and wait for result
#then proceed to join the three sets
person loopbackbee    schedule 17.09.2014
comment
Нужно ли жестко указывать количество ЦП? Можно ли запрограммировать его таким образом, чтобы он получал количество процессов в зависимости от количества ключей в моем словаре? У меня будет больше процессоров, и, надеюсь, количество моих ключей никогда не превысит лимит процессора. Итак, я хочу это взять. - person ds_user; 17.09.2014
comment
@Jeeva Вы можете указать количество процессов в качестве единственного аргумента: Pool(len(d)), но это может не принести вам каких-либо значительных преимуществ, поскольку процессы останутся неиспользованными, если n_processes<n_tasks (и, надеюсь, создание процесса очень легкое по сравнению с f) - person loopbackbee; 17.09.2014
comment
Нет, вот почему я думаю о том, чтобы включить в свою программу количество процессов в зависимости от количества ключей. Если у меня есть четыре набора записей или четыре ключа, я могу инициализировать этот p = Pool (4) на основе этого права? И еще одна вещь, которую я использую python 3.4, я получаю сообщение об ошибке - AttributeError: объект 'dict' не имеет атрибута 'iteritems' - person ds_user; 17.09.2014
comment
@Jeeva Можно, если вы уверены, что n_cpus>=n_tasks всегда выполняется (в противном случае у вас будет несколько загруженных процессов на одном процессоре, что не страшно, но, как правило, тоже не идеально). На python 3 iteritems был переименован в items - person loopbackbee; 17.09.2014
comment
Хорошо, я добавил это в конце, p.close () # больше никаких задач p.join () # завершение текущих задач print (результат) и выполнил его. Но я получаю эти ошибки - dpaste.com/06PTEBG - person ds_user; 17.09.2014
comment
@Jeeva Как видно из трассировки, ошибка возникает в вашем коде (Probabilistic_database\src\first_mr.py). Если вам нужна помощь, задайте новый вопрос - person loopbackbee; 17.09.2014
comment
В порядке. Но я попытался выполнить свой код без этого шага многопроцессорности, он работает нормально, и я получаю словарь в качестве вывода. :( - person ds_user; 17.09.2014
comment
Позвольте нам продолжить это обсуждение в чате. - person ds_user; 17.09.2014
comment
@maybe result - это какая-то глобальная переменная, которая зависит от предыдущих выполнений, которые должны быть установлены? сложно сказать без чтения кода - person loopbackbee; 17.09.2014
comment
Я не упомянул основную функцию и вызвал процесс напрямую, моя беда .. :( Я изменил их, и теперь это работает .. Спасибо .. :) - person ds_user; 17.09.2014
comment
Для python 3 см. stackoverflow.com/a/53938444/3342058 - person Baschdl; 20.12.2020