Как тайм-аут долго работающей программы с помощью rxpython?

Скажем, у меня есть долго работающая функция Python, которая выглядит примерно так?

import random
import time
from rx import Observable
def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    time.sleep(y)
    print('end')
    return x

Я хочу иметь возможность установить тайм-аут 1000ms.

Итак, я делаю что-то вроде создания наблюдаемого и отображения его с помощью вышеупомянутых интенсивных вычислений.

a = Observable.repeat(1).map(lambda x: intns(x))

Теперь для каждого испускаемого значения, если это занимает более 1000 мс, я хочу завершить наблюдаемое, как только я достигну 1000ms, используя on_error или on_completed

a.timeout(1000).subscribe(lambda x: print(x), lambda x: print(x))

приведенный выше оператор действительно получает тайм-аут и вызывает on_error, но он продолжает вычислять интенсивные вычисления и только затем возвращается к следующим операторам. Есть ли лучший способ сделать это?

Последний оператор печатает следующее

8 # no of seconds to sleep
begin # begins sleeping, trying to emit the first value
Timeout # operation times out, and calls on_error
end # thread waits till the function ends

Идея состоит в том, что если время ожидания определенной функции истекло, я хочу иметь возможность продолжить свою программу и игнорировать результат.

Мне было интересно, была ли функция intns выполнена в отдельном потоке, я думаю, что основной поток продолжает выполнение после тайм-аута, но я все еще хочу остановить вычисление функции intns в потоке или как-то убить ее.


person syllogismos    schedule 20.07.2017    source источник


Ответы (5)


Ниже приведен класс, который можно вызвать с помощью with timeout() :.

Если блок под кодом работает дольше указанного времени, поднимается TimeoutError.

import signal

class timeout:
    # Default value is 1 second (1000ms)
    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
    def handle_timeout(self, signum, frame):
        raise TimeoutError(self.error_message)
    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
    def __exit__(self, type, value, traceback):
        signal.alarm(0)

# example usage
with timeout() :
    # infinite while loop so timeout is reached
    while True :
        pass

Если я понимаю вашу функцию, вот как будет выглядеть ваша реализация:

def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    with timeout() :
        time.sleep(y)
    print('end')
    return x
person Sam    schedule 04.12.2017

Вы можете сделать это частично с помощью многопоточности. Хотя в python нет конкретного способа убить поток, вы можете реализовать метод, чтобы пометить поток как завершенный.

Это не сработает, если поток ожидает других ресурсов (в вашем случае вы смоделировали «длинный» работающий код случайным ожиданием)

См. также Есть ли способ убить поток в Python ?

person BA.    schedule 06.09.2017
comment
Пример, показывающий такое поведение, был бы довольно хорош - person Adonis; 07.09.2017
comment
Пожалуйста, смотрите новый ответ - person BA.; 08.09.2017

Вот как это работает:

import random
import time
import threading
import os

def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    time.sleep(y)
    print('end')
    return x


thr = threading.Thread(target=intns, args=([10]), kwargs={})
thr.start()
st = time.clock();
while(thr.is_alive() == True):
    if(time.clock() - st > 9):
        os._exit(0)
person Marco Salerno    schedule 07.09.2017
comment
Это работает, но никак не связано с пакетом rx, который является ядром вопроса. - person Adonis; 07.09.2017
comment
Не совсем, он не спрашивал, как это сделать с rx, он просто сказал, что пытался сделать это с rx - person Marco Salerno; 07.09.2017
comment
правда, я не заметил "rx" в "rxpython" в любом случае, я думаю, что он не особо интересовался пакетом rx - person Marco Salerno; 07.09.2017

Вот пример тайм-аута

import random
import time
import threading

_timeout = 0

def intns(loops=1):
    print('begin')
    processing = 0
    for i in range(loops):
        y = random.randint(5,10)
        time.sleep(y)
        if _timeout == 1:
            print('timedout end')
            return
        print('keep processing')
    return

# this will timeout
timeout_seconds = 10
loops = 10

# this will complete
#timeout_seconds = 30.0
#loops = 1

thr = threading.Thread(target=intns, args=([loops]), kwargs={})
thr.start()
st = time.clock();
while(thr.is_alive() == True):
    if(time.clock() - st > timeout_seconds):
        _timeout = 1

thr.join()
if _timeout == 0:
    print ("completed")
else:
    print ("timed-out")
person BA.    schedule 08.09.2017
comment
и как вы предлагаете совместить его с rx? - person ffeast; 11.09.2017
comment
это общий обходной путь. Функция Observable может устанавливать таймер и проверять между задачами, если она должна выйти из строя и вернуть on_error. Это имеет ограничение, указанное выше, в отношении того, где требуется ожидание/затратная по времени обработка. - person BA.; 13.09.2017
comment
Напрямую threading это плохой способ, особенно для начинающих и не системных программистов! Более безопасным является использование типа объекта Timer из Python (from threading import Timer), который в то же время отвечает требованию определять точные миллисекунды вместо секунд - docs.python.org/2.4/lib/timer-objects.html Использование time.sleep или time.clock (которое кто-то использует в другом посте) соответствует к ненужному и сложному решению. - person s3n0; 20.05.2018

Вы можете использовать time.sleep() и сделать цикл while для time.clock()

person Pooa    schedule 18.01.2018