Многопроцессорные пулы с разными функциями

Большинство примеров многопроцессорных рабочих пулов выполняют одну функцию в разных процессах, например.

def foo(args):
   pass

if __name__ == '__main__':
   pool = multiprocessing.Pool(processes=30)
   res=pool.map_async(foo,args)

Есть ли способ справиться с двумя разными и независимыми функциями в пуле? Чтобы вы могли назначить f.e. 15 процессов для foo () и 15 процессов для bar () или пул ограничен одной функцией? Или вам нужно создать разные процессы для разных функций вручную с помощью

 p = Process(target=foo, args=(whatever,))
 q = Process(target=bar, args=(whatever,))
 q.start()
 p.start()

а про рабочий пул забыть?


person dorvak    schedule 07.08.2011    source источник


Ответы (6)


Чтобы передать разные функции, вы можете просто вызвать map_async несколько раз.

Вот пример, чтобы проиллюстрировать это,

from multiprocessing import Pool
from time import sleep

def square(x):
    return x * x

def cube(y):
    return y * y * y

pool = Pool(processes=20)

result_squares = pool.map_async(f, range(10))
result_cubes = pool.map_async(g, range(10))

В результате получится:

>>> print result_squares.get(timeout=1)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

>>> print result_cubes.get(timeout=1)
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
person Ocaj Nires    schedule 07.08.2011
comment
И будут ли они выполняться параллельно или подряд? - person dorvak; 08.08.2011
comment
map_async немедленно возвращается. Пока в пуле достаточно свободных процессов, новые задачи будут запускаться без ожидания. В приведенном выше примере они будут работать параллельно. @злой ученый - person Ocaj Nires; 08.08.2011
comment
Спасибо! Но я думаю, нет возможности назначить определенное количество рабочих / процессов? - person dorvak; 08.08.2011
comment
multiprocessing Pool API не предоставляет механизма для назначения конкретных количество рабочих в одном пуле. Если вам действительно нужно определенное количество рабочих на задачу, создавайте разные пулы. Хотя рекомендуется иметь только один пул. Думаю, имеет смысл, что пул должен управлять этим за вас прозрачно, не беспокоясь об этом. - person Ocaj Nires; 08.08.2011
comment
Спасибо за ответ, вы уверены, что добавление map_async() один за другим будет выполняться параллельно. Я действительно пробовал это, и, как показывает ответ @Sam, они, похоже, работают последовательно. - person Zhubarb; 22.01.2020

Они не будут работать параллельно. См. Следующий код:

def updater1(q,i):    
    print "UPDATER 1:", i
    return

def updater2(q,i):    
    print "UPDATER2:", i
    return

if __name__=='__main__':
    a = range(10)
    b=["abc","def","ghi","jkl","mno","pqr","vas","dqfq","grea","qfwqa","qwfsa","qdqs"]


    pool = multiprocessing.Pool()

    func1 = partial(updater1,q)
    func2 = partial(updater2,q)
    pool.map_async(func1, a)
    pool.map_async(func2, b)

    pool.close()
    pool.join()

Приведенный выше код дает следующую распечатку:

UPDATER 1: 1
UPDATER 1: 0
UPDATER 1: 2
UPDATER 1: 3
UPDATER 1: 4
UPDATER 1: 5
UPDATER 1: 6
UPDATER 1: 7
UPDATER 1: 8
UPDATER 1: 9
UPDATER2: abc
UPDATER2: def
UPDATER2: ghi
UPDATER2: jkl
UPDATER2: mno
UPDATER2: pqr
UPDATER2: vas
UPDATER2: dqfq
UPDATER2: grea
UPDATER2: qfwqa
UPDATER2: qwfsa
UPDATER2: qdqs
person Sam    schedule 24.05.2017

Вы можете использовать map или какую-нибудь лямбда-функцию (отредактируйте: на самом деле вы не можете использовать лямбда-функцию). Вы можете использовать простую функцию карты:

def smap(f, *args):
    return f(*args)

pool = multiprocessing.Pool(processes=30)
res=pool.map(smap, function_list, args_list1, args_list2,...)

Обычная функция map принимает в качестве входных данных итерации, что неудобно.

person Rayamon    schedule 26.11.2018
comment
Это следует принять как правильный ответ, потому что принятый ответ выполняется в квазипараллельном режиме (с ужасным планировщиком). - person ARA1307; 29.02.2020

Вот рабочий пример идеи, которой поделился @Rayamon:

import functools

from multiprocessing import Pool


def a(param1, param2, param3):
    return param1 + param2 + param3


def b(param1, param2):
    return param1 + param2


def smap(f):
    return f()


func1 = functools.partial(a, 1, 2, 3)
func2 = functools.partial(b, 1, 2)

pool = Pool(processes=2)
res = pool.map(smap, [func1, func2])
pool.close()
pool.join()
print(res)
person ARA1307    schedule 29.02.2020
comment
Как передать список значений в качестве аргумента, и он работает индивидуально в потоках .. В случае одной функции он работает нормально, но не в случае нескольких функций .. - person Madan Raj; 23.07.2021

Чтобы дополнительно объяснить другой ответ выше, вот пример:

  1. Запустите одну функцию с несколькими входами параллельно, используя пул (квадратная функция). Интересная сторона. Обратите внимание на искаженную операцию в строках для 5 981 25
  2. Запуск нескольких функций с разными входами (как args, так и kwargs) и сбор их результатов с помощью пула (функции pf1, pf2, pf3)
import datetime
import multiprocessing
import time
import random

from multiprocessing import Pool

def square(x):
    # calculate the square of the value of x
    print(x, x*x)
    return x*x

def pf1(*args, **kwargs):
    sleep_time = random.randint(3, 6)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf1", args, sleep_time, datetime.datetime.now()))
    print("Keyword Args from pf1: %s" % kwargs)
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf1 done at %s\n" % datetime.datetime.now())
    return (sum(*args), kwargs)

def pf2(*args):
    sleep_time = random.randint(7, 10)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf2", args, sleep_time, datetime.datetime.now()))
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf2 done at %s\n" % datetime.datetime.now())
    return sum(*args)

def pf3(*args):
    sleep_time = random.randint(0, 3)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf3", args, sleep_time, datetime.datetime.now()))
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf3 done at %s\n" % datetime.datetime.now())
    return sum(*args)

def smap(f, *arg):
    if len(arg) == 2:
        args, kwargs = arg
        return f(list(args), **kwargs)
    elif len(arg) == 1:
        args = arg
        return f(*args)


if __name__ == '__main__':

    # Define the dataset
    dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

    # Output the dataset
    print ('Dataset: ' + str(dataset))

    # Run this with a pool of 5 agents having a chunksize of 3 until finished
    agents = 5
    chunksize = 3
    with Pool(processes=agents) as pool:
        result = pool.map(square, dataset)
    print("Result of Squares : %s\n\n" % result)
    with Pool(processes=3) as pool:
        result = pool.starmap(smap, [(pf1, [1,2,3], {'a':123, 'b':456}), (pf2, [11,22,33]), (pf3, [111,222,333])])

    # Output the result
    print ('Result: %s ' % result)


Output:
*******

Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
1 1
2 4
3 9
4 16
6 36
7 49
8 64
59 81
 25
10 100
11 121
12 144
13 169
14 196
Result of Squares : [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]


Process : ForkPoolWorker-6  Function : pf1  Args: ([1, 2, 3],)  sleeping for 3  Time : 2020-07-20 00:51:56.477299

Keyword Args from pf1: {'a': 123, 'b': 456}
Process : ForkPoolWorker-7  Function : pf2  Args: ([11, 22, 33],)   sleeping for 8  Time : 2020-07-20 00:51:56.477371

Process : ForkPoolWorker-8  Function : pf3  Args: ([111, 222, 333],)    sleeping for 1  Time : 2020-07-20 00:51:56.477918

ForkPoolWorker-8    pf3 done at 2020-07-20 00:51:57.478808

ForkPoolWorker-6    pf1 done at 2020-07-20 00:51:59.478877

ForkPoolWorker-7    pf2 done at 2020-07-20 00:52:04.478016

Result: [(6, {'a': 123, 'b': 456}), 66, 666] 

Process finished with exit code 0

person Vikash Raja Samuel Selvin    schedule 20.07.2020

Множественные функции

В следующем примере показано, как запускать несколько функций в пуле.

from multiprocessing import Pool
import functools

def inc(x):
    return x + 1

def dec(x):
    return x - 1

def add(x, y):
    return x + y

def smap(f):
    return f()

def main():

    f_inc = functools.partial(inc, 4)
    f_dec = functools.partial(dec, 2)
    f_add = functools.partial(add, 3, 4)

    with Pool() as pool:
        res = pool.map(smap, [f_inc, f_dec, f_add])
        print(res)

if __name__ == '__main__':
    main()

У нас есть три функции, которые запускаются независимо в пуле. Мы используем functools.partial для подготовки функций и их параметров перед их выполнением.

Источник: https://zetcode.com/python/multiprocessing/.

person Elder Druid    schedule 23.02.2021