Почему flask_socketio.disconnect препятствует завершению потока?

У меня следующая система: [Клиент] - [Веб-сервер] - [Подключение].

Коннектор — это своего рода промежуточный код между веб-сервером и источником данных.

Мне нужно контролировать соединение сервера с коннектором. Если соединение потеряно, то я должен уведомить клиента.

Связь между веб-сервером и коннектором организована с помощью socketio.

Проблема в том, что если коннектор перестанет работать, то веб-сервер узнает об этом только через минуту (это в лучшем случае).

Я решил, что сервер должен проверять состояние коннектора каждую секунду.

При подключении коннектора на сервере запускается фоновая задача. Суть задания: каждую секунду: 1) зафиксировать время; 2) сохранить фиксированное время в стеке; 3) отправить эхо-сообщение на коннектор. (см. server.background_thread)

Соединитель принимает эхо-сообщение и отметку времени в качестве параметра и отправляет эхо-сообщение на веб-сервер, в качестве параметра он передает полученную отметку времени. (см. client.echo)

Веб-сервер получает эхо-сообщение, если временная метка равна последнему значению в стеке, то это значение удаляется из стека. (см. server.on_echo_connector)

На веб-сервере на каждой итерации проверяется размер стека (см. server.background_thread). Если оно больше 5, то это означает, что коннектор 5 раз не ответил на эхо-сообщение, считаем, что коннектор недоступен, и отключаемся от него.

Когда сервер понимает, что коннектор недоступен, необходимо завершить поток, отправивший эхо-сообщения на коннектор.

Как только размер стека становится больше 5, я выхожу из бесконечного цикла и вызываю flask_socketio.disconnect (connector_sid, '/ connector'). После этого вызова ничего не работает (например, print)

В методе on_disconnect_connector (сервер) вызывается метод thread.join(), который никогда не завершается.

И мне нужно завершить поток, чтобы при повторном запуске коннектора он успешно подключился и все началось сначала.

Как решить эту проблему?

сервер

# -*- coding: utf-8 -*-

import os
import threading
import time
import collections
from datetime import datetime

import flask
import flask_socketio

def get_unix_time():
    return int(time.mktime(datetime.now().timetuple()))

class Stack(collections.deque):

    def __init__(self, iterable=(), maxlen=None):
        collections.deque.__init__(self, iterable, maxlen)

    @property
    def size(self):
        return len(self)

    @property
    def empty(self):
        return self.size == 0

    @property
    def head(self):
        return self[-1]

    @property
    def tail(self):
        return self[0]

    def push(self, x):
        self.append(x)

# SERVER

app = flask.Flask(__name__)
sio = flask_socketio.SocketIO(app, async_mode='gevent')

connector_sid = None
echo_stack = Stack()

thread = None
thread_lock = threading.Lock()


def background_thread(app):
    time.sleep(2)  # delay for normal connection

    while True:
        if echo_stack.size >= 5:
            break
        time_ = get_unix_time()
        echo_stack.push(time_)
        sio.emit('echo', time_, namespace='/connector')
        sio.sleep(1)

    with app.app_context():
        flask_socketio.disconnect(connector_sid, '/connector')


@sio.on('connect', namespace='/connector')
def on_connect_connector():
    """Connector connection event handler."""
    global connector_sid, thread
    print 'Attempt to connect a connector {}...'.format(request.sid)

    # if the connector is already connected, reject the connection
    if connector_sid is not None:
        print 'Connection for connector {} rejected'.format(request.sid)
        return False
        # raise flask_socketio.ConnectionRefusedError('Connector already connected')

    connector_sid = request.sid
    print('Connector {} connected'.format(request.sid))

    with thread_lock:
        if thread is None:
            thread = sio.start_background_task(
                background_thread, current_app._get_current_object())

    # notify clients about connecting a connector
    sio.emit('set_connector_status', True, namespace='/client')


@sio.on('disconnect', namespace='/connector')
def on_disconnect_connector():
    """Connector disconnect event handler."""
    global connector_sid, thread

    print 'start join'
    thread.join()
    print 'end join'
    thread = None
    print 'after disconet:', thread

    connector_sid = None

    echo_stack.clear()

    print('Connector {} disconnect'.format(request.sid))

    # notify clients of disconnected connector
    sio.emit('set_connector_status', False, namespace='/client')


@sio.on('echo', namespace='/connector')
def on_echo_connector(time_):
    if not echo_stack.empty:
        if echo_stack.head == time_:
            echo_stack.pop()


@sio.on('message', namespace='/connector')
def on_message_connector(cnt):
    # print 'Msg: {}'.format(cnt)
    pass

if __name__ == '__main__':
    sio.run(app)

клиент

# -*- coding: utf-8 -*-

import sys
import threading
import time

import socketio
import socketio.exceptions

sio = socketio.Client()
thread = None
thread_lock = threading.Lock()
work = False


def background_thread():
    # example task
    cnt = 0
    while work:
        cnt += 1
        if cnt % 10 == 0:
            sio.emit('message', cnt // 10, namespace='/connector')
        sio.sleep(0.1)


@sio.on('connect', namespace='/connector')
def on_connect():
    """Server connection event handler."""
    global thread, work

    print '\n-----            Connected to server            -----' \
          '\n----- My SID:  {} -----\n'.format(sio.sid)

    work = True  # set flag

    # run test task
    with thread_lock:
        if thread is None:
            thread = sio.start_background_task(background_thread)


@sio.on('disconnect', namespace='/connector')
def on_disconnect():
    """Server disconnect event handler."""
    global thread, work

    # clear the work flag so that at the next iteration the endless loop ends
    work = False
    thread.join()
    thread = None

    # disconnect from server
    sio.disconnect()
    print '\n-----         Disconnected from server          -----\n'

    # switch to the mode of infinite attempts to connect to the server
    main()


@sio.on('echo', namespace='/connector')
def on_echo(time_):
    sio.emit('echo', time_, namespace='/connector')


def main():
    while True:
        try:
            sio.connect('http://localhost:5000/connector',
                        namespaces=['/connector'])
            sio.wait()
        except socketio.exceptions.ConnectionError:
            print 'Trying to connect to the server...'
            time.sleep(1)
        except KeyboardInterrupt:
            print '\n---------- EXIT ---------\n'
            sys.exit()
        except Exception as e:
            print e


if __name__ == '__main__':
    print '\n---------- START CLIENT ----------\n'
    main()

Питон 2.7


person ioprst    schedule 12.02.2020    source источник
comment
Почему для обнаружения отключения требуется одна минута? Это относится только к HTTP-соединениям, но когда соединение осуществляется через WebSocket, немедленно обнаруживается отключение. Вы проверили, подключается ли ваш клиент через WebSocket?   -  person Miguel    schedule 12.02.2020
comment
@ Мигель, нет, ты говоришь о connection_transports для клиента?   -  person ioprst    schedule 12.02.2020
comment
@Мигель, connection_transports == Нет. Я обновил клиентское соединение до sio.connect('http://localhost:5000/connector', namespaces=['/connector'], transports = ['websocket']). Но теперь клиент даже не подключается к серверу. После каждого подключения sio.connected == True, но сервер не видит клиента, и клиент бесконечно подключается в цикле.   -  person ioprst    schedule 12.02.2020
comment
Нет, я спрашивал, поддерживает ли ваш сервер соединения через WebSocket. Чтобы это работало, вам нужно использовать веб-серверы eventlet или gevent, если вы используете веб-сервер Flask, WebSocket не поддерживается.   -  person Miguel    schedule 13.02.2020
comment
@ Мигель, да, у меня установлены пакеты gevent и gevent-websocket. Проблема заключалась в том, что клиент не переключился на транспорт websocket. Я нашел решение этой проблемы. Спасибо.   -  person ioprst    schedule 13.02.2020


Ответы (1)


Необходимо установить дополнительную библиотеку для клиента (см.)

pip install "python-socketio[client]"

Благодаря этой библиотеке работает транспорт WebSocket. Теперь отсоединение разъема видно сразу.

person ioprst    schedule 12.02.2020