Не удалось перехватить исключение KeyboardInterrupt после запуска dask.distributed Client/LocalClient

Я пытаюсь использовать Ctrl + C, чтобы изящно остановить мой работающий код, включая локальный клиент dask.distrubted. Код ниже является примером моей настройки. Когда я использую Ctrl + C, метод stop() вызывается правильно, однако клиент dask, похоже, неправильно выходит/распечатывает трассировку, даже не достигнув метода self.dask.close().

from dask.distributed import Client

class SomeService(object):
    def __init__(self):
        self.dask = None

    def run(self):
        # Setup local dask client
        self.dask = Client()

        while True:
            # Do something blocking

    def stop(self):
        # Close dask client
        print('Closing Dask Client...')
        self.dask.close()
        print('Dask Cient closed.')
        # Stop other stuff
        # ...
        print('SomeService has been stopped.')


if __name__ == '__main__':
    service = SomeService()
    try:
        service.run()
    except KeyboardInterrupt:
        print('Keyboard interrupt received.')
        service.stop()

Это вывод, который я получаю:

^CTraceback (most recent call last):
File "<string>", line 1, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/forkserver.py", line 170, in main
rfds = [key.fileobj for (key, events) in selector.select()]
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/selectors.py", line 560, in select
kev_list = self._kqueue.control(None, max_ev, timeout)
KeyboardInterrupt
Keyboard interrupt received.
Closing Dask Client...
Dask Client closed.
SomeService has been stopped.

Я изо всех сил старался переполнить эту проблему google/stack, и единственное решение, которое я нашел, - это предложения использовать пакет сигналов для принудительного обратного вызова SIGINT. Однако это решение не работает, если класс SomeService должен выполняться в отдельном потоке, поскольку вы можете принудительно вызывать обратный вызов сигналов только для сигналов в основном потоке.

Мы ценим любые предложения. Разве это не правильный способ удерживать Dask Client/LocalClient и управлять им?

Дополнительная информация:

Python 3.5.1
dask==0.19.2
distributed==1.23.2
tornado==5.0.2

person user7458    schedule 11.10.2018    source источник


Ответы (1)


Используйте обработчик сигнала, который устанавливает флаг. Когда он доставлен, он вызывает исключение IOError (с элементом .errno, установленным на errno.EINTR):

import signal

done = False
def done_handler(signum, frame):
    done = True

if __name__ == '__main__':
    signal.signal(signal.SIGINT, done_handler)

    service = SomeService()

    while not done:
        try:
            service.run()
        except IOError:
            break

    service.stop()

В Python сигналы доставляются, а обработчики всегда выполняются в основном потоке, поэтому исключение IOError, если его не поймать, должно распространяться до service.run(). (В вашем примере это делает KeyboardInterrupt.) Когда это исключение перехватывается, служба останавливается.

person Nominal Animal    schedule 29.10.2018