Тело ответа пирамидного потока

Я пытаюсь транслировать события, отправленные сервером, из своего приложения Pyramid, но не могу понять, как транслировать тело ответа из моего представления. Вот тестовый вид, который я использую (он полностью не реализует SSE, он просто работает с потоковой частью):

@view_config(route_name='iter_test')
def iter_test(request):
    import time
    def test_iter():
        i = 0
        while True:
            i += 1
            if i == 5:
                raise StopIteration
            yield str(time.time())
            print time.time()
            time.sleep(1)

    return test_iter()

Это производит ValueError: Could not convert return value of the view callable function pdiff.views.iter_test into a response object. The value returned was <generator object test_iter at 0x3dc19b0>.

Вместо этого я попробовал return Response(app_iter=test_iter()), который, по крайней мере, не выдает ошибку, но не передает ответ - он ждет, пока генератор не завершит работу, прежде чем вернуть ответ моему браузеру.

Я понимаю, что можно было бы просто вернуть одно событие на запрос и позволить клиентам повторно подключаться после каждого события, но я бы предпочел сохранить природу событий, отправленных сервером в реальном времени, путем потоковой передачи нескольких событий из одного запроса без задержки повторного подключения. Как я могу сделать это с помощью пирамиды?


person spiffytech    schedule 01.02.2014    source источник


Ответы (3)


Я нашел проблему. Оказывается, мой код приложения в порядке, и проблема заключается в официантке и nginx:

  1. Официантка, веб-сервер, используемый Pyramid по умолчанию, буферизует все выходные данные фрагментами по 18 000 байт (см. эту проблему для подробностей).

  2. Источник проблемы был скрыт от меня nginx, веб-сервером, который я поставил перед своим приложением Pyramid, который также буферизует ответы.

(1) может быть решена либо:

  • Настройка официантки с помощью send_bytes = 1 в вашем .ini-файле. Это устраняет проблему потоковой передачи, но делает все ваше приложение очень медленным. Как уже упоминалось @Zitrax, вы можете восстановить некоторую скорость с более высокими значениями, но любое значение выше 1 может привести к тому, что сообщения застрянут в буфере.

  • Переходим на ганикорн. Я не знаю, использует ли gunicorn меньший буфер или он ведет себя лучше с app_iter, но это сработало и ускорило мое приложение.

(2) можно решить, настроив nginx на отключение буферизации для ваших потоковых маршрутов.

Вам нужно установить proxy_buffering off в вашей конфигурации nginx. Этот параметр применяется к сайтам, размещенным через proxy_pass. Если вы не используете proxy_pass, вам может понадобиться другая настройка.

  • Вы можете настроить nginx для динамического включения/отключения буферизации для каждого ответа на основе заголовков запроса, как показано в этом вопросе по теме (хорошее решение для EventSource/Server-Sent Events)

  • В качестве альтернативы вы можете настроить это в блоке location в вашей конфигурации nginx. Это хорошо, если вы используете что-то помимо EventSource и не ожидаете получить конкретный заголовок, или если вы используете EventSource, но хотите отладить ответ с простой вкладки браузера, куда вы не можете отправить Accept заголовок в вашем запросе.

person spiffytech    schedule 04.02.2014
comment
У меня была та же проблема, и действительно установка send_bytes на 1 сработала. Однако все стало ужасно медленным (у меня страницы загружались менее чем за 0,5 секунды, а затем это заняло › 10 секунд). И сейчас я не могу переключиться на gunicorn, так как это только Unix. Компромисс состоит в том, чтобы установить send_bytes на что-то > 1 и ‹ 18000. В моем случае 100 работало очень хорошо. - person Zitrax; 05.10.2015

Некоторое время назад я сделал несколько тестов, чтобы попробовать Event Source/Server Sent Events. Я только что проверил, и он все еще отлично работает с Pyramid 1.5a.

@view_config(route_name = 'events')
def events(request):
    headers = [('Content-Type', 'text/event-stream'),
               ('Cache-Control', 'no-cache')]
    response = Response(headerlist=headers)
    response.app_iter = message_generator()
    return response

def message_generator():
    socket2 = context.socket(zmq.SUB)
    socket2.connect(SOCK)
    socket2.setsockopt(zmq.SUBSCRIBE, '')
    while True:
        msg = socket2.recv()
        yield "data: %s\n\n" % json.dumps({'message': msg})

Полный пример здесь: https://github.com/antoineleclair/zmq-sse-chat . Взгляните на https://github.com/antoineleclair/zmq-sse-chat/blob/master/sse/views.py.

Я точно не знаю, почему у меня работает, а у вас нет. Возможно, это заголовки. Или два '\n' после каждого сообщения. Кстати, если вы правильно посмотрите на спецификацию источника событий, вы должны ставить перед каждым новым событием префикс data: и использовать \n\n в качестве разделителя событий.

person Antoine Leclair    schedule 02.02.2014
comment
Спасибо! Знание того, что мой подход определенно должен работать, указало мне на другие потенциальные проблемы, которые привели меня к решению. - person spiffytech; 04.02.2014
comment
Это больше не работает. Он не должен возвращать байты, поэтому используйте yield "data: %s\n\n" % json.dumps({'message': msg}).encode() - person Pierre Thibault; 21.02.2020

Если вы не укажете какой-либо рендерер для своего представления, вы должны вернуть объект Response. Объект Pyramid Response имеет специальный аргумент app_iter для возврата итераторов. Итак, вы должны сделать это следующим образом:

import time
from pyramid.response import Response


@view_config(route_name='iter_test')
def iter_test(request):

    def test_iter():
        for _ in range(5):
            yield str(time.time())
            print time.time()
            time.sleep(1)

    return Response(app_iter=test_iter())

Я также немного отредактировал ваш код, чтобы сделать его более читаемым.

ОБНОВЛЕНИЕ

Вместо этого я пробовал return Response(app_iter=test_iter()), который, по крайней мере, не выдает ошибку, но не передает ответ - он ждет, пока генератор завершит работу, прежде чем вернуть ответ моему браузеру.

Думаю проблема в буферизации. Попробуйте отправить действительно большой итератор.

person Dmitry Vakhrushev    schedule 02.02.2014
comment
В моих тестах app_iter не передается в браузер. Скорее, webob запускает итератор до завершения, прежде чем возвращать результаты (соответствующий код webob находится здесь: github). Чтобы потоки EventSource работали правильно, мой итератор должен быть бесконечным (т. е. не доводиться до конца), а результаты должны передаваться в браузер по мере их создания, а не одним большим куском. - person spiffytech; 02.02.2014
comment
@spiffytech Код, который вы связали, - это геттер тела, который действительно сворачивает app_iter в одну строку. Но WebOb этого не делает: см. код вызвать метод. Я не проверял источники Pyramid, но я думаю, что это тоже не так. Поэтому, если вы вернете объект Response с app_iter и не получите доступ к телу в своем коде, WebOb вернет app_iter как есть. - person Dmitry Vakhrushev; 02.02.2014
comment
Буферизация не выглядит проблемой — я попробовал гораздо больший генератор (50 тысяч элементов), и Pyramid все еще ждал, пока он не будет завершен, чтобы отправить результаты. Несмотря на это, мне не нужно ждать — смысл потоков Server-Set Event заключается в том, что события поступают в браузер немедленно, поэтому я не могу ждать, пока буфер заполнится. Кроме того, раздел кода webob, на который я ссылался, на самом деле вызывается - я получил этот номер строки, установив точку останова внутри моего итератора и просмотрев стек вызовов во время работы итератора, используя приведенный выше код - нет доступа к телу и соответствующий Объект ответа. - person spiffytech; 03.02.2014