Шаблон запрос-ответ с использованием библиотеки Spring amqp

все. У меня есть HTTP API для отправки сообщений в брокере RabbitMQ, и мне нужно реализовать шаблон запрос-ответ, чтобы получать ответы от сервера. Так что я что-то вроде моста между клиентами и сервером. Я отправляю сообщения брокеру с определенным ключом маршрутизации, и для этих сообщений есть Потребитель, который публикует обратные сообщения в качестве ответа, и мой API должен потреблять ответ для каждого запроса. Итак, диаграмма выглядит примерно так:

Шаблон запроса-ответа

Итак, что я делаю, так это следующее: для каждого сеанса HTTP я создаю временную responseQueue (которая привязана к обмену по умолчанию, с ключом маршрутизации - именем этой очереди), после этого я устанавливаю заголовок replyTo сообщения как имя очереди ответов (где я буду ждать ответа), а также установите для этой очереди шаблон replyQueue. Вот мой код:

public void sendMessage(AbstractEvent objectToSend, final String routingKey) {
    final Queue responseQueue = rabbitAdmin.declareQueue();
    byte[] messageAsBytes = null;
    try {
        messageAsBytes = new ObjectMapper().writeValueAsBytes(objectToSend);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    MessageProperties properties = new MessageProperties();
    properties.setHeader("ContentType", MessageBodyFormat.JSON);
    properties.setReplyTo(responseQueue.getName());
    requestTemplate.setReplyQueue(responseQueue);

    Message message = new Message(messageAsBytes, properties);
    Message receivedMessage = (Message)requestTemplate.convertSendAndReceive(routingKey, message);
}

Итак, в чем проблема: сообщение отправлено, после этого оно потребляется Потребителем, и его ответ правильно отправляется в нужную очередь, но по какой-то причине он не возвращается в методе convertSendAndReceived и после установленного тайм-аута мое полученное сообщение нулевой. Итак, я попытался сделать несколько вещей - я начал проверять весенний код (кстати, это настоящий кошмар) и увидел, что я не объявляю очередь ответов, которую он создает для меня, и заголовок replyTo установлено имя очереди (то же самое, что и я). Результат был таким же - полученное сообщение все еще пустое. После этого я решил использовать другой шаблон, который использует обмен по умолчанию, потому что responseQueue привязан к этому обмену:

requestTemplate.send(routingKey, message);
Message receivedMessage = receivingTemplate.receive(responseQueue.getName());

Результат был таким же - responseMessage по-прежнему равен нулю. Версии amqp и rabbit - 1.2.1 и 1.2.0 соответственно. Так что я уверен, что что-то упускаю, но я не знаю, что это такое, поэтому, если кто-то может мне помочь, я был бы чрезвычайно благодарен.


person Hristo Angelov    schedule 16.07.2014    source источник


Ответы (1)


1> Странно, что RabbitTemplate использует doSendAndReceiveWithFixed, если вы предоставляете requestTemplate.setReplyQueue(responseQueue). Похоже, ваше объяснение неверно.

2> Чтобы он работал с фиксированным ReplyQueue, вы должны настроить reply ListenerContainer:

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueues(responseQueue);
container.setMessageListener(requestTemplate);

3> Но самая важная часть здесь около correlation. RabbitTemplate.sendAndReceive заполняет свойство сообщения correlationId, но потребительская сторона тоже должна иметь дело с этим: недостаточно просто отправить ответ на responseQueue, ответное сообщение должно иметь то же свойство correlationId. См. Здесь: как отправить ответ от потребителя к производителю на конкретный запрос с помощью Spring AMQP?

Кстати, нет причин заполнять Message вручную: вы можете просто поддержать Jackson2JsonMessageConverter в RabbitTemplate, и он автоматически преобразует ваши objectToSend в байты JSON с соответствующими заголовками.

person Artem Bilan    schedule 16.07.2014
comment
Да, важной частью, которую мне не хватало, был параметр correlationId, как вы упомянули. Я забыл установить его, когда отправлял ответ. Большое спасибо! - person Hristo Angelov; 17.07.2014