Как написать неблокирующий обработчик запросов с фрагментами в Tornado

Вот два простых RequestHandlers:

class AsyncHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        while True:
            future = Future()
            global_futures.add(future)
            s = yield future
            self.write(s)
            self.flush()


class AsyncHandler2(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        for f in global_futures:
            f.set_result(str(dt.now()))
        global_futures.clear()
        self.write("OK")

Первый подписывается на поток, второй доставляет сообщение всем подписчикам.

Проблема в том, что у меня не может быть больше группы (в моем случае 5-6) подписчиков. Как только я подписываюсь больше разрешенного, следующий запрос ко второму методу просто зависает.

Я предполагаю, что это происходит из-за того, что первый обработчик не является должным образом асинхронным. Это потому, что я использую глобальный объект для хранения списка подписчиков?

Как я могу открыть больше потоковых запросов одновременно и каков логический предел?


person GSazheniuk    schedule 18.08.2020    source источник
comment
Проблема не была связана с темой вопроса, здесь обсуждается исходная проблема - stackoverflow.com/questions/985431/   -  person GSazheniuk    schedule 02.09.2020


Ответы (1)


Проблема в том, что global_futures изменяется, пока вы повторяете его: когда AsyncHandler.get просыпается, он переходит от одного yield к другому, что означает, что он создает свое следующее будущее и добавляет его в набор до того, как управление будет возвращено AsyncHandler2. Это не определено, и поведение зависит от того, где находится итератор в наборе: иногда новое будущее вставляется за итератором, и все в порядке, иногда оно вставляется перед итератором, и тот же обработчик потребителя будет активирован во второй раз (и вставьте третью копию самого себя, которая может быть спереди или сзади ...). Когда у вас всего несколько потребителей, вы будете попадать в заднюю часть достаточно часто, чтобы все заработало, но при слишком большом количестве потребителей становится крайне маловероятно, что когда-нибудь закончится.

Решение состоит в том, чтобы скопировать global_futures перед повторением по нему, а не очищать его в конце:

@gen.coroutine
def get(self);
    fs = list(global_futures)
    global_futures.clear()
    for f in fs:
        f.set_result(str(dt.now()))
    self.write("OK")

Обратите внимание: я думаю, что это проблема только в Tornado 4.x и старше. В Tornado 5 все было изменено, так что set_result больше не вызывает немедленно ожидающий обработчик, поэтому одновременное изменение больше не выполняется.

person Ben Darnell    schedule 23.08.2020
comment
Спасибо, Бен, я ценю твою помощь. Проблема оказалась чем-то совершенно не связанным с торнадо, на самом деле это ограничение для одновременных подключений на хост в браузере - stackoverflow.com/questions/985431/ - person GSazheniuk; 02.09.2020