Остановить длительное действие в web2py с многопроцессорностью

У меня есть приложение web2py, которое в основном служит интерфейсом браузера для скрипта Python. Этот сценарий обычно возвращается довольно быстро, но иногда может занять много времени. Я хочу предоставить пользователю возможность остановить выполнение скрипта, если это займет слишком много времени.

В настоящее время я вызываю функцию следующим образом:

def myView():  # this function is called from ajax
    session.model = myFunc()  # myFunc is from a module which i have complete control over
    return dict(model=session.model)

myFunc при вызове с определенными параметрами использует многопроцессорность, но все равно занимает много времени. Мне нужен какой-то способ завершить функцию или, по крайней мере, дочерние потоки.

Первое, что я попробовал, это запустить myFunc в новом процессе и свернуть мою собственную простую систему событий, чтобы убить его:

# in the controller
def myView():
    p_conn, c_conn = multiprocessing.Pipe()
    events = multiprocessing.Manager().dict()
    proc = multiprocessing.Process(target=_fit, args=(options, events c_conn))
    proc.start()
    sleep(0.01)
    session.events = events
    proc.join()
    session.model = p_conn.recv()
    return dict(model=session.model)

def _fit(options, events pipe):
    pipe.send(fitting.logistic_fit(options=options, events=events))
    pipe.close()

def stop():
    try:
        session.events['kill']()
    except SystemExit:
        pass  # because it raises that error intentionally
    return dict()

# in the module
def kill():
    print multiprocessing.active_children()
    for p in multiprocessing.active_children():
        p.terminate()
    raise SystemExit

def myFunc(options, events):
    events['kill'] = kill

Я столкнулся с несколькими серьезными проблемами с этим.

  1. Сеанс в stop() не всегда был таким же, как сеанс в myView(), поэтому session.events было None.
  2. Даже когда сеанс был таким же, kill() не убивал детей должным образом.
  3. Длительно работающая функция зависала бы в потоке web2py, поэтому stop() даже не обрабатывалась до тех пор, пока функция не завершится.

Я решил не вызывать join() и использовать AJAX для получения результата функции позже, но не смог сохранить объект процесса в session для последующего использования. Казалось, что канал можно замариновать, но потом у меня возникла проблема с невозможностью доступа к тому же сеансу из другого представления.

Как я могу реализовать эту функцию?


person Scimonster    schedule 18.05.2016    source источник


Ответы (1)


Для длительных задач лучше ставить их в очередь с помощью встроенного планировщик. Если вы хотите разрешить пользователю вручную останавливать задачу, которая выполняется слишком долго, вы можете использовать метод scheduler.stop_task(ref) (где ref — задача id или uuid). В качестве альтернативы, когда вы ставите задачу в очередь, вы можете указать время ожидания, чтобы оно автоматически останавливалось, если оно не было завершено в течение периода времени ожидания.

Вы можете выполнить простой опрос Ajax, чтобы уведомить клиента о завершении задачи (или реализовать что-то более сложное с помощью веб-сокетов или SSE).

person Anthony    schedule 18.05.2016
comment
Не могли бы вы расширить свой ответ? Некоторые конкретные вещи, на которые стоило бы ответить: Запускает ли планировщик задачу сразу, из скрипта? Как получить результат задачи? Как принудительно остановить задачу? Как бы то ни было, это не полностью отвечает на вопрос. - person Scimonster; 18.05.2016
comment
Я обновил ответ, чтобы ответить на ваш последний вопрос. Ваши первые два вопроса рассматриваются в связанной документации. Поскольку они включают в себя общие функции планировщика и не являются частью вашего исходного вопроса, я не буду обновлять ответ, но кратко упомяну, что по умолчанию новые задачи немедленно ставятся в очередь (и вы можете заставить работника немедленно проверить очередь, установив immediate=True), а метод scheduler.task_status() используется для получения результатов. - person Anthony; 18.05.2016