Создание очереди для каждого удаленного метода при использовании RabbitMQ?

Давайте на мгновение согласимся с тем, что реализация RPC поверх очередей сообщений (например, RabbitMQ) не является ужасной — иногда это может быть необходимо при взаимодействии с устаревшими системами.

В случае RPC через RabbitMQ клиенты отправляют сообщение брокеру, брокер направляет сообщение рабочему процессу, рабочий возвращает результат клиенту через брокера. Однако, если рабочий процесс реализует более одного удаленного метода, то каким-то образом разные вызовы должны направляться к разным слушателям.

Какова общая практика в этом случае? Во всех примерах RPC поверх MQ показан только один удаленный метод. Было бы хорошо и легко просто установить имя метода в качестве имени правила/очереди маршрутизации, но я не знаю, правильный ли это способ сделать это.


person Mate Varga    schedule 28.07.2015    source источник


Ответы (3)


Давайте на мгновение согласимся с тем, что реализация RPC поверх очередей сообщений (например, RabbitMQ) не является ужасной идеей.

это совсем не страшно! это распространено и рекомендуется во многих ситуациях, а не только при интеграции с устаревшей версией.

... хорошо, теперь к вашему актуальному вопросу :)


с точки зрения очень высокого уровня, вот что вам нужно сделать.

Ваш запрос и ответ должны содержать две ключевые части информации:

  • a correlation-id
  • очередь reply-to

Эти биты информации позволят вам сопоставить исходный запрос и ответ.

Прежде чем отправить запрос

пусть ваш запрашивающий код создаст для себя эксклюзивную очередь. Эта очередь будет использоваться для получения ответов.

создайте новый идентификатор корреляции — обычно это GUID или UUID, чтобы гарантировать уникальность.

При отправке запроса

Прикрепите созданный вами идентификатор корреляции к свойствам сообщения. есть свойство correlationId, которое вы должны использовать для этого.

сохраните идентификатор корреляции со связанной функцией обратного вызова (обработчиком ответов) для запроса где-то внутри кода, выполняющего запрос. вам понадобится это, когда придет ответ.

также прикрепите имя созданной вами монопольной очереди к свойству replyTo сообщения.

со всем этим вы можете отправить сообщение через rabbitmq

при ответе

код ответа должен использовать поля correlationId и replyTo исходного сообщения. так что обязательно захватите их

ответ должен быть отправлен непосредственно в очередь replyTo. не используйте стандартную публикацию через биржу. вместо этого отправьте ответное сообщение непосредственно в очередь, используя функцию «отправить в очередь» любой библиотеки, которую вы используете, и отправьте ответ непосредственно в очередь replyTo.

не забудьте также включить correlationId в ответ. это важная часть, чтобы ответить на ваш вопрос

при обработке ответа

Код, выполнивший первоначальный запрос, получит сообщение из очереди replyTo. затем он вытащит correlationId из свойств сообщения.

используйте идентификатор корреляции, чтобы найти метод обратного вызова для запроса... код, который обрабатывает ответ. передайте сообщение этому методу обратного вызова, и все готово.

детали реализации

это работает с точки зрения высокого уровня. когда вы перейдете к коду, детали реализации будут различаться в зависимости от используемого языка и драйвера/библиотеки.

большинство хороших библиотек RabbitMQ для любого данного языка будут иметь встроенный запрос/ответ. Если у вас нет, вы можете поискать другую библиотеку. Если вы не пишете библиотеку на основе шаблонов поверх протокола AMQP, вам следует искать библиотеку, в которой реализованы общие шаблоны.

Если вам нужна дополнительная информация о шаблоне запроса/ответа, включая все детали, которые я предоставил здесь (и многое другое), ознакомьтесь со следующими ресурсами:

Если вы работаете в Node.js, я рекомендую использовать библиотеку wascally, которая включает в себя Request Функция /Reply, которая вам нужна. Для Ruby проверьте bunny. Для Java или .NET посмотрите на некоторые из многочисленных реализаций служебной шины. В .NET я рекомендую NServiceBus или MassTransit.

person Derick Bailey    schedule 29.07.2015
comment
Привет, спасибо за ответ. Я действительно понял, что последние реализации AMQP/RabbitMQ могут напрямую отправлять ответ отправителю без ручной реализации и создания очередей ответов. - person Mate Varga; 30.07.2015
comment
где ты это прочитал? я не обращал внимания на последние релизы и не знал об этом - person Derick Bailey; 30.07.2015
comment
я нашел это: rabbitmq.com/direct-reply-to.html - я должен буду изучить это больше. кажется интересным - person Derick Bailey; 31.07.2015
comment
Да, это именно так. - person Mate Varga; 31.07.2015

Я обнаружил, что использование новой очереди ответа для каждого запроса может оказаться очень неэффективным, особенно при запуске RabbitMQ в кластере.

Как было предложено в комментариях, прямой ответ-на кажется подходящим способом. Я задокументировал здесь все варианты, которые я пробовал, прежде чем остановиться на этом.

person Facundo Olano    schedule 26.06.2016

Я написал пакет npm amq.rabbitmq.reply-to.js что:

Применение:

const rabbitmqreplyto = require('amq.rabbitmq.reply-to.js');

const serverCallbackTimesTen = (message, rpcServer) => {
    const n = parseInt(message);
    return Promise.resolve(`${n * 10}`);
};

let rpcServer;
let rpcClient;
Promise.resolve().then(() => {
    const serverOptions = new rabbitmqreplyto.RpcServerOptions(
    /* url */ undefined, 
    /* serverId */ undefined, 
    /* callback */ serverCallbackTimesTen);

    return rabbitmqreplyto.RpcServer.Create(serverOptions);
}).then((rpcServerP) => {
    rpcServer = rpcServerP;
    return rabbitmqreplyto.RpcClient.Create();
}).then((rpcClientP) => {
    rpcClient = rpcClientP;
    const promises = [];
    for (let i = 1; i <= 20; i++) {
        promises.push(rpcClient.sendRPCMessage(`${i}`));
    }
    return Promise.all(promises);
}).then((replies) => {
    console.log(replies);
    return Promise.all([rpcServer.Close(), rpcClient.Close()]);
});

//['10',
//  '20',
//  '30',
//  '40',
//  '50',
//  '60',
//  '70',
//  '80',
//  '90',
//  '100',
//  '110',
//  '120',
//  '130',
//  '140',
//  '150',
//  '160',
//  '170',
//  '180',
//  '190',
//  '200']
person Matthias    schedule 27.07.2018