ZeroMQ PUSH/PULL и потерянное сообщение

Я использую ZeroMQ из .NET и застрял, пытаясь исправить странную проблему. У меня есть сокет типа PUSH и один типа PULL через TCP. Когда клиент отключается, сервер по-прежнему может отправить сообщение (обратите внимание, что никакие флаги не передаются методу Socket.Send), которое получает очень много, прежде чем начать блокировку и ждать повторного подключения клиента и доставки сообщений, которые я пытаюсь отправить потом.

Как я могу избежать потери сообщения (или, в худшем случае, проверить, подключен ли клиент, и если нет, отправить фиктивное сообщение, которое я могу позволить себе потерять)?

Заранее спасибо!

Редактировать: дальнейшее тестирование показывает, что если я подожду 1 секунду после отправки первого сообщения после отключения клиентом, второе заблокируется, но если я вообще не буду ждать, я смогу отправить столько же сообщений как я хочу, и они все потеряются. Это довольно запутанно...


person em70    schedule 23.02.2011    source источник


Ответы (1)


В документации ZeroMQ отмечается, что это проблема с настройками PUSH/PULL, и предлагается следующий шаблон: добавление настройки REP/REQ для обеспечения координации узлов, когда вы ожидаете фиксированное количество подписчиков. Однако, если вы не можете заранее узнать количество подписчиков, вам следует подумать об изменении протокола, чтобы он был более устойчивым к этим условиям.

Синхронизированный издатель на C (из ZGuide)

//
//  Synchronized publisher
//
#include "zhelpers.h"

//  We wait for 10 subscribers
#define SUBSCRIBERS_EXPECTED  10

int main (void) 
{
    s_version_assert (2, 1);
    void *context = zmq_init (1);

    //  Socket to talk to clients
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5561");

    //  Socket to receive signals
    void *syncservice = zmq_socket (context, ZMQ_REP);
    zmq_bind (syncservice, "tcp://*:5562");

    //  Get synchronization from subscribers
    int subscribers = 0;
    while (subscribers < SUBSCRIBERS_EXPECTED) {
        //  - wait for synchronization request
        char *string = s_recv (syncservice);
        free (string);
        //  - send synchronization reply
        s_send (syncservice, "");
        subscribers++;
    }
    //  Now broadcast exactly 1M updates followed by END
    int update_nbr;
    for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
        s_send (publisher, "Rhubarb");

    s_send (publisher, "END");

    zmq_close (publisher);
    zmq_close (syncservice);
    zmq_term (context);
    return 0;
}
person user7116    schedule 04.04.2011
comment
При использовании шаблона PUSH/PULL есть ли способ запретить ZMQ отправлять и ставить в очередь сообщения, если клиент PULL не слушает? При PUSHing я хотел бы отбросить эти сообщения, и я не против потерять их, если нет PULL-клиентов... Есть ли способ сообщить об этом ZMQ, возможно, дополнительный флаг для использования в сочетании с ZMQ_PUSH на сторона толкателя-экземпляра? - person tonix; 27.09.2020
comment
Или, может быть, есть другой шаблон обмена сообщениями ZMQ, который эффективно достигает того, что мне нужно? Я в основном хочу, чтобы отправляемые сообщения были одноразовыми: они либо доставляются, если есть доступный PULL-клиент, либо нет, если в этот конкретный момент времени не подключены PULL-клиенты... Спасибо! Прямо сейчас, если я нажимаю, а клиент недоступен, сообщение ставится в очередь и доставляется, как только клиент PULL повторно подключается, но я хотел бы отказаться от этих сообщений в таких случаях... - person tonix; 27.09.2020