Компиляция исполняемого файла с помощью dask или многопроцессорной обработки joblib с помощью cython приводит к ошибкам

Я конвертирую некоторые задания Python с последовательной обработкой в ​​многопроцессорную обработку с помощью dask или joblib. К сожалению, мне нужно работать с Windows.
При запуске из IPython или из командной строки с вызовом py-файла с помощью python все работает нормально.
При компиляции исполняемого файла с помощью cython он больше не работает нормально: Шаг постепенно все больше и больше процессов (неограниченное и большее, чем количество запрошенных процессов) запускаются и блокируют мою систему.
Это почему-то похоже на Multiprocessing Bomb - но, конечно, я использовал if __name__=="__main__:" для того, чтобы иметь блок управления - одобрено точным запуском из вызова python в командной строке.
Мой вызов cython - cython --embed --verbose --annotate THECODE.PY, и я компилирую с gcc -time -municode -DMS_WIN64 -mthreads -Wall -O -I"PATH_TO_\include" -L"PATH_TO_\libs" THECODE.c -lpython36 -o THECODE в результате в исполняемом файле Windows THECODE.exe.
С другим кодом (с однократной обработкой), который работает нормально.
Проблема, похоже, такая же для dask и joblib (что может означать, что dask работает как joblib или основан на нем) .
Есть предложения?

Для тех, кто интересуется mcve: просто взяв первый код из Multiprocessing Bomb и его компиляция с помощью приведенных выше команд cython приведет к тому, что исполняемый файл взорвет вашу систему. (Я только что пробовал :-))

Я нашел кое-что интересное, добавив в образец кода одну строчку для отображения __name__:

import multiprocessing

def worker():
    """worker function"""
    print('Worker')
    return

print("-->" + __name__ + "<--")
if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

При запуске этого фрагмента кода с python он показывает

__main__
__mp_main__
__mp_main__
__mp_main__
__mp_main__
__mp_main__

(другой вывод отключен). Объясняя, что решение if работает. При запуске исполняемого файла после cython и компиляции отображается

__main__
__main__
__main__
__main__
__main__
__main__

и многое другое. Таким образом, воркеры, вызывающие модуль, больше не masqueraded как импорт, и, таким образом, каждый воркер пытается запустить пять новых рекурсивным образом.


person Bastian Ebeling    schedule 16.11.2017    source источник
comment
Думаю, об этом стоит сообщить как об ошибке (github.com/cython/cython/issues ).   -  person DavidW    schedule 19.11.2017
comment
@DavidW Хороший вариант - я сделаю это прямо сейчас. Спасибо   -  person Bastian Ebeling    schedule 20.11.2017


Ответы (4)


При запуске нового процесса python multiprocessing-module использует spawn-method в Windows (это поведение также можно запустить в Linux, используя mp.set_start_method('spawn').

Аргументы командной строки передаются интерпретатору в новом процессе, поэтому связь с родительским процессом может быть установлена, например:

 python -c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork

Проблема со встроенными модулями cython (или с замороженными (т.е. созданными с помощью cx_Freeze, py2exe и аналогичными) модулями в целом), что передача им аргументов командной строки больше соответствует

python my_script.py <arguments>

то есть командная строка не обрабатывается интерпретатором автоматически, но должна обрабатываться в сценарии.

multiprocessing предоставляет функцию под названием , который правильно обрабатывает аргументы командной строки и может использоваться, как показано в ответе Бастиана:

if __name__ == '__main__':
    # needed for Cython, as it doesn't set `frozen`-attribute
    setattr(sys, 'frozen', True) 
    # parse command line options and execute it if needed
    multiprocessing.freeze_support()

Однако это решение работает только для Windows, как видно из кода:

def freeze_support(self):
    '''Check whether this is a fake forked process in a frozen executable.
    If so then run code specified by commandline and exit.
    '''
    if sys.platform == 'win32' and getattr(sys, 'frozen', False):
        from .spawn import freeze_support
        freeze_support()

Есть отчет об ошибке: multiprocessing freeze_support требуется вне win32, который может / не может быть исправлен в ближайшее время.

Как объяснено в вышеприведенном отчете об ошибке, недостаточно установить атрибут frozen на True и вызвать freeze_support непосредственно из multiprocessing.spawn, потому что трекер семафоров не обрабатывается правильно.

Я вижу два варианта: либо исправить вашу установку еще не выпущенным патчем из приведенного выше отчета об ошибке, либо использовать подход «сделай сам», представленный ниже.


Вот более ранняя версия этого ответа, которая является более «экспериментальной», но предлагает больше идей / деталей и предлагает решение в некотором стиле Do-It-Yourself.

Я использую linux, поэтому использую mp.set_start_method('spawn') для имитации поведения окон.

Что происходит в spawn-режиме? Давайте добавим несколько sleep, чтобы мы могли исследовать процессы:

#bomb.py
import multiprocessing as mp
import sys
import time

def worker():
    time.sleep(50)
    print('Worker')
    return

if __name__ == '__main__':
        print("Starting...")
        time.sleep(20)
        mp.set_start_method('spawn') ## use spawn!
        jobs = []
        for i in range(5):
            p = mp.Process(target=worker)
            jobs.append(p)
            p.start()

Используя pgrep python, мы видим, что сначала есть только один процесс на Python, затем 7 (!) Разных pids. Мы можем увидеть аргументы командной строки через cat /proc/<pid>/cmdline. 5 из новых процессов имеют командную строку

-c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork

и один:

-c "from multiprocessing.semaphore_tracker import main;main(4)"

Это означает, что родительский процесс запускает 6 новых экземпляров интерпретатора python, и каждый вновь запущенный интерпретатор выполняет код, отправленный от родителя через параметры командной строки, информация передается по каналам. Один из этих 6 экземпляров python - трекер, который наблюдает за всем.

Хорошо, а что будет, если cythonized + встраивается? Так же, как и с обычным питоном, с той лишь разницей, что вместо python запускается bomb-исполняемый файл. Но в отличие от интерпретатора Python, он не выполняет / не знает аргументов командной строки, поэтому функция main запускается снова и снова.

Есть простое решение: пусть bomb-exe запустит интерпретатор python.

 ...
 if __name__ == '__main__':
    mp.set_executable(<PATH TO PYTHON>)
 ....

Теперь bomb больше не многопроцессорная бомба!

Однако цель, вероятно, не в том, чтобы иметь под рукой интерпретатор python, поэтому нам нужно сделать так, чтобы наша программа знала о возможных командных строках:

import re
......
if __name__ == '__main__':
    if len(sys.argv)==3:  # should start in semaphore_tracker mode
        nr=list(map(int, re.findall(r'\d+',sys.argv[2])))          
        sys.argv[1]='--multiprocessing-fork'   # this canary is needed for multiprocessing module to work   
        from multiprocessing.semaphore_tracker import main;main(nr[0])

    elif len(sys.argv)>3: # should start in slave mode
        fd, pipe=map(int, re.findall(r'\d+',sys.argv[2]))
        print("I'm a slave!, fd=%d, pipe=%d"%(fd,pipe)) 
        sys.argv[1]='--multiprocessing-fork'   # this canary is needed for multiprocessing module to work  
        from multiprocessing.spawn import spawn_main; 
        spawn_main(tracker_fd=fd, pipe_handle=pipe)

    else: #main mode
        print("Starting...")
        mp.set_start_method('spawn')
        jobs = []
        for i in range(5):
            p = mp.Process(target=worker)
            jobs.append(p)
            p.start()

Теперь наша бомба не нуждается в автономном интерпретаторе python и останавливается после того, как рабочие закончили работу. Обратите внимание на следующее:

  1. Способ принятия решения о том, в каком режиме bomb следует запускать, не очень безопасен, но я надеюсь, что вы уловили суть
  2. --multiprocessing-fork - это просто канарейка, она ничего не делает, только должна быть там, см. здесь.

NB: измененный код также можно использовать с python, потому что после выполнения "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork python изменяет sys.argv, поэтому код больше не видит исходную командную строку и len(sys.argv) становится 1.

person ead    schedule 17.11.2017
comment
Нашел решение, которое работает для меня: мне пришлось использовать with multiprocessing.get_context("spawn").Pool() as pool: ... в моем приложении, чтобы заставить Локальный бэкэнд зависимости для использования n_jobs=1. В моем замороженном приложении больше нет mp-бомб: thumbsup: - person Alex; 27.01.2020

Я думаю, что, основываясь на деталях отправленного отчета об ошибке, я могу предложить самое элегантное решение здесь

if __name__ == '__main__':
    if sys.argv[0][-4:] == '.exe':
        setattr(sys, 'frozen', True)
    multiprocessing.freeze_support()
    YOURMAINROUTINE()

freeze_support()-call необходим в Windows - см. документацию по многопроцессорной обработке Python.
Если вы работаете в python только с этой строкой, это уже нормально.
Но почему-то cython явно не знает о некоторых из этих вещей (в документации сказано, что он протестирован с py2exe, PyInstaller и cx_Freeze). Это может быть смягчено с помощью setattr-вызова, который может использоваться только при компиляции, таким образом, решение по расширению файла.

person Bastian Ebeling    schedule 21.11.2017
comment
Пробовал это, и, к сожалению, я все еще получаю несколько запусков моего замороженного приложения командной строки с использованием multiprocessing.Pool на Python 3.7 с зависимостью joblib. Я проверил, и setattr(sys, 'frozen', True) казнен (впрочем, как бомба). - person Alex; 25.01.2020
comment
Это работает только для окон. См. bugs.python.org/issue32146 - person ead; 16.02.2020

Вдохновленный ответом (или приведенными там идеями) от ead, я нашел очень простое решение - или лучше назовите его обходной путь.
Для меня просто изменение предложения if на

if __name__ == '__main__':
    if len(sys.argv) == 1:
        main()
    else:
        sys.argv[1] = sys.argv[3]
        exec(sys.argv[2])

сделал это.
Причина, по которой это работает (в моем случае): при вызове исходного .py-файла __name__ рабочего устанавливается в __mp_main__ (но все процессы являются просто обычным .py-файлом).
При запуске скомпилированной версии (cython) name воркера нельзя использовать, но воркеры вызываются по-разному, и поэтому мы можем идентифицировать их по более чем одному аргументу в argv. В моем случае argv рабочего читает

['MYPROGRAMM.exe',
 '-c',
 'from multiprocessing.spawn import spawn_main;
       spawn_main(parent_pid=9316, pipe_handle =392)',
 '--multiprocessing-fork']

Таким образом, в argv[2] код активации рабочих процессов находится и выполняется с помощью верхних команд.
Конечно, если вам нужны аргументы для вашего скомпилированного файла, вам потребуется больше усилий, возможно, синтаксический анализ для parent_pid в вызове. Но в моем случае с этим просто перестарались.

person Bastian Ebeling    schedule 20.11.2017
comment
Ваше решение будет работать только для ваших минимальных / похожих примеров. Как только данные должны быть разделены между мастером и подчиненными, ваша работа выйдет из строя ... - person ead; 20.11.2017
comment
@ead ой грустно - ты прав. Я только что протестировал. Ты знаешь почему? Что мне делать дальше? (Ладно, это больше не шумиха, но не работает. :-() - person Bastian Ebeling; 20.11.2017
comment
Что насчет моего решения, которое имитирует правильное поведение интерпретатора Python? Вам нужно будет настроить его для окон (parent_pid вместо fd и pipe_handle вместо pipe), но я думаю, что это должно правильно настроить связь - person ead; 20.11.2017
comment
@ead ваше решение работает - мне просто нужно было понять это. Меня это устраивает, но я думаю, что это можно сделать (работая для меня также с большими рабочими функциями) более читаемым (или понятным) способом - я обновлю свой ответ. - person Bastian Ebeling; 20.11.2017

Поскольку предложенные решения не сработали для меня, я даю дополнительный ответ с обходным путем.

Мое замороженное приложение также привело к бомбе с многопроцессорной обработкой. Я мог решить это с помощью

  1. с использованием параллелизма на основе потоков вместо многопроцессорной обработки и
  2. в рамках параллельного выполнения Joblib с использованием Parallel(n_jobs=4, prefer="threads"), как предлагается этим ответом (вместо значения по умолчанию prefer="multiprocessing")

Мне не удалось заставить multiprocessing.Pool работать в замороженном приложении (ни с prefer="threads", ни с prefer="multiprocessing"), но можно переключиться на многопроцессорность на основе потоков с помощью см. документацию:

# a dependency with joblib
from dep_with_joblib import BigJob
# multiprocessing wrapper for threaded.Thread
from multiprocessing.dummy import Pool as ThreadPool
# instead of
# from multiprocessing import Pool

# thread based parallelism,
# works if `Parallel(n_jobs=4, prefer="threads")` is used
# in joblib (e.g. inside big_job())
POOL = ThreadPool(processes=1)

# as far as I can tell,
# the following Process based Parallelism 
# does _not_ work with frozen app/joblib atm
# POOL = Pool(processes=1)

class MainClass():
    def __init__(self):
        """Init ClusterGen"""
        return

    @staticmethod
    def run_big_job(big_job, data):
        """Run big_job on parallel thread"""
        big_job()
        return big_job

   def big_job_exec(self):
        """Big job execution"""


        bigjob = BigJob()
        big_job_input_data = ...
        # Start big_job on different thread
        async_result = POOL.apply_async(
            MainClass.run_big_job, (bigjob, big_job_input_data))
        # get results from clusterer
        bigjob_results = async_result.get()

Более явный пример с Queue и threading.Thread:

import threading
import queue
# a dependency with joblib
from dep_with_joblib import BigJob

job_queue = queue.Queue()

def store_in_queue(f):
    def wrapper(*args):
        job_queue.put(f(*args))
    return wrapper

class MainClass():
    def __init__(self):
        """Init ClusterGen"""
        return

    @staticmethod
    @store_in_queue
    def run_big_job(big_job, data):
        """Run big_job on parallel thread"""
        big_job()
        return big_job

   def big_job_exec(self):
        """Big job execution"""

        bigjob = BigJob()
        big_job_input_data = ...
        # Start big_job on different thread
        t = threading.Thread(
            target=MainClass.run_big_job,
            args=(bigjob, big_job_input_data),
            group=None,
            name="example-bigjob",
        )
        t.start()
        # get results from big_job
        bigjob_results = job_queue.get()

в обоих приведенных выше примерах bigjob() выполняется асинхронно в другом потоке. Примеры можно легко изменить с помощью нескольких потоков.

Почему асинхронный? В моем случае BigJob() - это модуль из зависимости, который использует Joblib.Parallel для повышения скорости, что не сработало бы, когда мое приложение было заморожено + мне нужно bigjob() для запуска асинхронного режима, чтобы предотвратить сбой моего графического интерфейса.

person Alex    schedule 29.01.2020