Pyzmq - отправить сообщение в сокет STREAM

Я пытаюсь реализовать простой пример соединения между двумя сокетами STREAM в pyzmq.

sender.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5555")

socket.connect("tcp://localhost:5556")
socket.send("message")

receiver.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5556")

message = socket.recv()
print("Received -> [ %s ]" % (message))

Вывод

Received [ b'\x00k\x8bEg' ]
Received [ b'' ]

Я хотел бы спросить, как правильно отправлять сообщения между сокетами STREAM.


person M.Puk    schedule 08.11.2017    source источник
comment
Здравим-ду-Блави. Помощь в навигации? StackOverflow является проверяющим, а также добрыми отзывами и добрыми ответами, заслужившими обратную связь, с закрытым и неподтвержденным пакетом [Accept]. Так напиш а закликей :о)   -  person user3666197    schedule 15.11.2017
comment
Dakujem za odpoved. Nemal som este bohuzial cas sa k nej vyjadrit ale zodpovedala moju otazku. Skoda ze nie aj moju dilemu :-D   -  person M.Puk    schedule 16.11.2017


Ответы (2)


Ваши данные, обработанные socket.recv(), в точности соответствуют спецификации ZeroMQ, хотя они и не должны вас радовать, и вы сомневаетесь, почему вы получили именно это, а не красиво доставленные точные копии отправленных сообщений.

Итак, наберитесь терпения и продолжайте читать.

Недавно добавленный ZeroMQ архетип сокета STREAM довольно специфичен

Любой, у кого есть несколько лет опыта работы с инструментами сигнализации/обмена сообщениями ZeroMQ, скажет вам, что недавно (v4.x) добавленный архетип STREAM — не лучший выбор для взаимосвязи между процессом ZeroMQ и процессом ZeroMQ.

Почему? Почти все драгоценные камни, которые есть в инструментах ZeroMQ, должны быть сокращены в STREAM, чтобы точка доступа к сокету ZeroMQ могла «говорить» с противоположным процессом конечной точки сокета, который ничего не знает о смарт-сокетах ZeroMQ выше. протоколы уровня.

Собственный шаблон

Собственный шаблон используется для связи с одноранговыми узлами TCP и допускает асинхронные запросы и ответы в любом направлении. ZMQ_STREAM

Сокет типа ZMQ_STREAM используется для отправки и получения данных TCP от узла, отличного от ØMQ, при использовании транспорта tcp://. Сокет ZMQ_STREAM может действовать как клиент и/или сервер, отправляя и/или получая данные TCP асинхронно.

При получении данных TCP сокет ZMQ_STREAM должен добавлять к сообщению часть сообщения, содержащую идентификатор отправителя, перед его передачей приложению. Полученные сообщения помещаются в очередь от всех подключенных одноранговых узлов.

При отправке данных TCP сокет ZMQ_STREAM должен удалить первую часть сообщения и использовать ее для определения идентификатора партнера, которому должно быть направлено сообщение, а немаршрутизируемые сообщения должны вызывать ошибку EHOSTUNREACH или EAGAIN.

Чтобы открыть соединение с сервером, используйте вызов zmq_connect(), а затем получите идентификатор сокета с помощью вызова ZMQ_IDENTITY zmq_getsockopt().

Чтобы закрыть конкретное соединение, отправьте кадр идентификации, за которым следует сообщение нулевой длины (см. раздел ПРИМЕР).

Когда соединение будет установлено, приложение получит сообщение нулевой длины. Точно так же, когда одноранговый узел отключается (или соединение теряется), приложение получает сообщение нулевой длины.

Вы должны отправить один кадр идентификации, за которым следует один кадр данных. Флаг ZMQ_SNDMORE требуется для кадров идентификации, но игнорируется для кадров данных.

ПРИМЕР

void    *ctx = zmq_ctx_new ();
assert ( ctx );
/*                                             Create ZMQ_STREAM socket */
void    *socket = zmq_socket ( ctx, ZMQ_STREAM );
assert ( socket );

int      rc = zmq_bind ( socket, "tcp://*:8080" );
assert ( rc == 0 );

/*                                            Data structure to hold the ZMQ_STREAM ID */
uint8_t id [256];
size_t  id_size = 256;

/*                                            Data structure to hold the ZMQ_STREAM received data */
uint8_t raw [256];
size_t  raw_size = 256;

while ( 1 ) {
   /*                                         Get HTTP request; ID frame and then request */
   id_size  = zmq_recv ( socket, id, 256, 0 );
   assert ( id_size >  0 );
   do {
        raw_size  = zmq_recv ( socket, raw, 256, 0 );
        assert ( raw_size >= 0 );
   } while (     raw_size == 256 );
   /*                                         Prepares the response */
   char http_response [] =
                            "HTTP/1.0 200 OK\r\n"
                            "Content-Type: text/plain\r\n"
                            "\r\n"
                            "Hello, World!";
   /*                                         Sends the ID frame followed by the response */
   zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
   zmq_send ( socket, http_response, strlen ( http_response ), 0 );

   /*                                         Closes the connection by sending the ID frame followed by a zero response */
   zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
   zmq_send ( socket, 0, 0, 0 );
}
zmq_close ( socket );
zmq_ctx_destroy ( ctx );

Если вы будете следовать описанию поведения STREAM в случаях с несколькими сокетами, отправляющая сторона получит циклические чтения с справедливой очередью для экземпляра socket, который подключен ( 1x через .connect() + Nx через .bind(), N = < 0, +INF ) ) к нескольким конечным точкам, до сих пор с нулевым контролем над количеством и/или характером взаимодействующих одноранговых узлов, но с механизмом циклического перебора с справедливой очередью на socket.recv()-s. Определенно небезопасная практика проектирования.

Summary of ZMQ_STREAM characteristics
Compatible peer sockets     none
Direction                   Bidirectional
Send/receive pattern        Unrestricted
Outgoing routing strategy   See text ( above )
Incoming routing strategy   Fair-queued
Action in mute state        EAGAIN
person user3666197    schedule 12.11.2017

Вот упрощенный пример использования pyzmq с односторонним подключением, как в вопросе.

sender.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)

socket.connect("tcp://localhost:5555")
id = socket.getsockopt(zmq.IDENTITY)
socket.send(id, zmq.SNDMORE)
socket.send(b"message")

receiver.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)

socket.bind("tcp://*:5555")
id = socket.recv()
socket.recv()    # empty data here
id = socket.recv()
message = socket.recv()
print("received:" + str(message))
person Alex    schedule 28.08.2019