Чтение из нескольких очередей, RabbitMQ

Я новичок в RabbitMQ. Я хочу иметь возможность обрабатывать сообщения чтения без блокировки, когда есть несколько очередей (для чтения). Любые данные о том, как я могу это сделать?

//Редактировать 1

public class Rabbit : IMessageBus
{   

    private List<string> publishQ = new List<string>();
    private List<string> subscribeQ = new List<string>();

    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;  
    Subscription sub = null;

    public void writeMessage( Measurement m1 ) {
        byte[] body = Measurement.AltSerialize( m1 );
        int msgCount = 1;
        Console.WriteLine("Sending message to queue {1} via the amq.direct exchange.", m1.id);

        string finalQueue = publishToQueue( m1.id );

        while (msgCount --> 0) {
            channel.BasicPublish("amq.direct", finalQueue, null, body);
        }

        Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id);
    }

     public string publishToQueue(string firstQueueName) {
        Console.WriteLine("Creating a queue and binding it to amq.direct");
        string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null);
        channel.QueueBind(queueName, "amq.direct", queueName, null);
        Console.WriteLine("Done.  Created queue {0} and bound it to amq.direct.\n", queueName);
        return queueName;
    }


    public Measurement readMessage() {
        Console.WriteLine("Receiving message...");
        Measurement m = new Measurement();

        int i = 0;
        foreach (BasicDeliverEventArgs ev in sub) {
            m = Measurement.AltDeSerialize(ev.Body);
            //m.id = //get the id here, from sub
            if (++i == 1)
                break;
            sub.Ack();
        }

        Console.WriteLine("Done.\n");
        return m;
    }


    public void subscribeToQueue(string queueName ) 
    {
        sub = new Subscription(channel, queueName);
    }

    public static string MsgSysName;
    public string MsgSys
    {
        get 
        { 
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    public Rabbit(string _msgSys) //Constructor
    {   
        factory = new ConnectionFactory();
        factory.HostName = "localhost"; 
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        //consumer = new QueueingBasicConsumer(channel);

        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    ~Rabbit()
    {
        //observer??
        connection.Dispose();
        //channel.Dispose();
        System.Console.WriteLine("\nDestroying RABBIT");
    }   
}

//Редактировать 2

private List<Subscription> subscriptions = new List<Subscription>();
    Subscription sub = null;

public Measurement readMessage()
    {
        Measurement m = new Measurement();
        foreach(Subscription element in subscriptions)
        {
            foreach (BasicDeliverEventArgs ev in element) {
                //ev = element.Next();
                if( ev != null) {
                    m = Measurement.AltDeSerialize( ev.Body );
                    return m;
                }
                m =  null;  
            }           
        }   
        System.Console.WriteLine("No message in the queue(s) at this time.");
        return m;
    }

    public void subscribeToQueue(string queueName) 
    {   
        sub = new Subscription(channel, queueName);
        subscriptions.Add(sub);     
    }

//Редактировать 3

//MessageHandler.cs

public class MessageHandler
{   
    // Implementation of methods for Rabbit class go here
    private List<string> publishQ = new List<string>();
    private List<string> subscribeQ = new List<string>();

    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;  
    QueueingBasicConsumer consumer = null;  

    private List<Subscription> subscriptions = new List<Subscription>();
    Subscription sub = null;

    public void writeMessage ( Measurement m1 )
    {
        byte[] body = Measurement.AltSerialize( m1 );
        //declare a queue if it doesn't exist
        publishToQueue(m1.id);

        channel.BasicPublish("amq.direct", m1.id, null, body);
        Console.WriteLine("\n  [x] Sent to queue {0}.", m1.id);
    }

    public void publishToQueue(string queueName)
    {   
        string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null);
        channel.QueueBind(finalQueueName, "amq.direct", "", null);
    }

    public Measurement readMessage()
    {
        Measurement m = new Measurement();
        foreach(Subscription element in subscriptions)
        {
            if( element.QueueName == null)
            {
                m = null;
            }
            else 
            {
                BasicDeliverEventArgs ev = element.Next();
                if( ev != null) {
                    m = Measurement.AltDeSerialize( ev.Body );
                    m.id = element.QueueName;
                    element.Ack();
                    return m;
                }
                m =  null;                      
            }
            element.Ack();
        }   
        System.Console.WriteLine("No message in the queue(s) at this time.");
        return m;
    }

    public void subscribeToQueue(string queueName) 
    {   
        sub = new Subscription(channel, queueName);
        subscriptions.Add(sub); 
    }

    public static string MsgSysName;
    public string MsgSys
    {
        get 
        { 
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    public MessageHandler(string _msgSys) //Constructor
    {   
        factory = new ConnectionFactory();
        factory.HostName = "localhost"; 
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        consumer = new QueueingBasicConsumer(channel);

        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    public void disposeAll()
    {
        connection.Dispose();
        channel.Dispose();
        foreach(Subscription element in subscriptions)
        {
            element.Close();
        }
        System.Console.WriteLine("\nDestroying RABBIT");
    }   
}

//App1.cs

using System;
using System.IO;

using UtilityMeasurement;
using UtilityMessageBus;


public class MainClass
{
    public static void Main()
    {

    MessageHandler obj1 = MessageHandler("Rabbit");

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

    //Create new Measurement messages
    Measurement m1 = new Measurement("q1", 2345, 23.456); 
    Measurement m2 = new Measurement("q2", 222, 33.33);

    System.Console.WriteLine("Test message 1:\n    ID: {0}", m1.id);
    System.Console.WriteLine("    Time: {0}", m1.time);
    System.Console.WriteLine("    Value: {0}", m1.value);

    System.Console.WriteLine("Test message 2:\n    ID: {0}", m2.id);
    System.Console.WriteLine("    Time: {0}", m2.time);
    System.Console.WriteLine("    Value: {0}", m2.value);   

    // Ask queue name and store it
    System.Console.WriteLine("\nName of queue to publish to: ");
    string queueName = (System.Console.ReadLine()).ToString();
    obj1.publishToQueue( queueName );

    // Write message to the queue
    obj1.writeMessage( m1 );    

    System.Console.WriteLine("\nName of queue to publish to: ");
    string queueName2 = (System.Console.ReadLine()).ToString();
    obj1.publishToQueue( queueName2 );

    obj1.writeMessage( m2 );

    obj1.disposeAll();
}
}

//App2.cs

using System;
using System.IO;

using UtilityMeasurement;
using UtilityMessageBus;

public class MainClass
{
    public static void Main()
    {
    //Asks for the message system
    System.Console.WriteLine("\nEnter name of messageing system: ");
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
    string MsgSysName = (System.Console.ReadLine()).ToString();

    //Declare an IMessageBus instance:
    //Here, an object of the corresponding Message System
        // (ex. Rabbit, Zmq, etc) is instantiated
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

    //Create a new Measurement object m
    Measurement m = new Measurement();  

    System.Console.WriteLine("Queue name to subscribe to: ");
    string QueueName1 = (System.Console.ReadLine()).ToString();
    obj1.subscribeToQueue( QueueName1 );

    //Read message into m
    m = obj1.readMessage();

    if (m != null ) {
        System.Console.WriteLine("\nMessage received from queue {0}:\n    ID: {1}", m.id, m.id);
        System.Console.WriteLine("    Time: {0}", m.time);
        System.Console.WriteLine("    Value: {0}", m.value);
    }

    System.Console.WriteLine("Another queue name to subscribe to: ");
    string QueueName2 = (System.Console.ReadLine()).ToString();
    obj1.subscribeToQueue( QueueName2 );

    m = obj1.readMessage();

    if (m != null ) {
        System.Console.WriteLine("\nMessage received from queue {0}:\n    ID: {1}", m.id, m.id);
        System.Console.WriteLine("    Time: {0}", m.time);
        System.Console.WriteLine("    Value: {0}", m.value);
    }

    obj1.disposeAll();
}
}

person Demi    schedule 14.07.2011    source источник


Ответы (2)


два источника информации:

  1. http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

  2. Вы действительно должны сначала попытаться понять примеры.

    • %Program Files%\RabbitMQ\DotNetClient\examples\src (основные примеры)

    • получить полные рабочие примеры из их репозитория Mercurial (проекты c#).

Полезные операции для понимания:

  • Объявить/Утвердить/Слушать/Подписаться/Опубликовать

Re: ваш вопрос - нет причин, по которым вы не можете иметь несколько слушателей. Или вы можете подписаться на n путей маршрутизации с одним прослушивателем на «обмене».

** re: неблокирующий **

Типичный слушатель потребляет сообщения по одному. Вы можете вывести их из очереди, или они будут автоматически размещены рядом с потребителем в «оконной» форме (определяемой параметрами качества обслуживания qos). Прелесть подхода в том, что за вас делается много тяжелой работы (относительно надежности, гарантированной доставки и т. д.).

Ключевой особенностью RabbitMQ является то, что в случае ошибки при обработке сообщение повторно добавляется обратно в очередь (функция отказоустойчивости).

Нужно знать больше о вашей ситуации.

Часто, если вы отправляете сообщения в список, который я упомянул выше, вы можете связаться с кем-то из сотрудников RabbitMQ. Они очень полезны.

Надеюсь, это немного поможет. Поначалу это много, чтобы понять, но стоит упорствовать.


Вопросы и ответы

см.: http://www.rabbitmq.com/faq.html

В. Можно ли подписаться на несколько очередей, используя new Subscription(channel, queueName)?

да. Вы либо используете ключ привязки, например. abc.*.hij или abc.#.hij, или вы присоединяете несколько привязок. Первый предполагает, что вы разработали свои ключи маршрутизации на основе какого-то принципа, который имеет для вас смысл (см. ключи маршрутизации в FAQ). Для последнего вам необходимо привязаться к более чем одной очереди.

Реализация n-bindings вручную. см.: http://hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

за этим шаблоном не так много кода, поэтому вы можете создать свой собственный шаблон подписки, если подстановочных знаков недостаточно. вы можете наследовать от этого класса и добавить еще один метод для дополнительных привязок... возможно, это сработает или что-то близкое к этому (не проверено).

Спецификация AQMP говорит, что возможна множественная ручная привязка: http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind

В. И если да, то как я могу просмотреть все подписанные очереди и вернуть сообщение (нулевое, если сообщений нет)?

С подписчиком вы будете уведомлены, когда сообщение будет доступно. В противном случае то, что вы описываете, представляет собой интерфейс вытягивания, в котором вы вытягиваете сообщение по запросу. Если сообщений нет, вы получите нуль, как хотите. Кстати: метод Notify, вероятно, более удобен.

Q. О, и заметьте, что у меня есть все эти операции разными методами. Я отредактирую свой пост, чтобы отразить код

Текущий код:

эта версия должна использовать подстановочные знаки для подписки более чем на один ключ маршрутизации

Ручная маршрутизация ключей с использованием подписки оставлена ​​читателю в качестве упражнения. ;-) Я думаю, вы все равно склонялись к вытягивающему интерфейсу. Кстати: интерфейсы pull менее эффективны, чем интерфейсы уведомлений.

        using (Subscription sub = new Subscription(ch, QueueNme))
        {
            foreach (BasicDeliverEventArgs ev in sub)
            {
                Process(ev.Body);

        ...

Примечание. foreach использует IEnumerable, а IEnumerable оборачивает событие получения нового сообщения с помощью оператора "yield". Фактически это бесконечный цикл.

--- ОБНОВИТЬ

AMQP был разработан с идеей, чтобы количество TCP-соединений было таким же низким, как и количество приложений, что означает, что вы можете иметь много каналов на одно соединение.

код в этом вопросе (редактирование 3) пытается использовать двух подписчиков с одним каналом, тогда как он должен (я полагаю) быть одним подписчиком на канал в потоке, чтобы избежать проблем с блокировкой. Предложение: используйте подстановочный ключ маршрутизации. С помощью Java-клиента можно подписаться на несколько разных имен очередей, но, насколько мне известно, клиент .net не реализовал это в вспомогательном классе Subscriber.

Если вам действительно нужны два разных имени очереди в одном и том же потоке подписки, то для .net предлагается следующая последовательность извлечения:

        using (IModel ch = conn.CreateModel()) {    // btw: no reason to close the channel afterwards IMO
            conn.AutoClose = true;                  // no reason to closs the connection either.  Here for completeness.

            ch.QueueDeclare(queueName);
            BasicGetResult result = ch.BasicGet(queueName, false);
            if (result == null) {
                Console.WriteLine("No message available.");
            } else {
                ch.BasicAck(result.DeliveryTag, false);
                Console.WriteLine("Message:");
            }

            return 0;
        }

-- ОБНОВЛЕНИЕ 2:

из списка RabbitMQ:

«предположим, что element.Next() блокирует одну из подписок. Вы можете получать доставки из каждой подписки с тайм-аутом для чтения за ее пределами. В качестве альтернативы вы можете настроить одну очередь для получения всех измерений и извлечения сообщений из нее с помощью единая подписка». (Эмиль)

Это означает, что когда первая очередь пуста, .Next() блокируется, ожидая появления следующего сообщения. то есть у подписчика есть встроенное ожидание следующего сообщения.

-- ОБНОВЛЕНИЕ 3:

в .net используйте QueueingBasicConsumer для потребления из нескольких очередей.

На самом деле вот ветка об этом, чтобы получить представление об использовании:

Дождитесь одного сообщения RabbitMQ с тайм-аутом

-- ОБНОВЛЕНИЕ4:

еще немного информации о .QueueingBasicConsumer

Здесь есть пример кода.

http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html

пример скопирован в ответ с некоторыми изменениями (см. //‹-----).

                IModel channel = ...;
            QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queueName, false, null, consumer);  //<-----
            channel.BasicConsume(queueName2, false, null, consumer); //<-----
            // etc. channel.BasicConsume(queueNameN, false, null, consumer);  //<-----

            // At this point, messages will be being asynchronously delivered,
            // and will be queueing up in consumer.Queue.

            while (true) {
                try {
                    BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                    // ... handle the delivery ...
                    channel.BasicAck(e.DeliveryTag, false);
                } catch (EndOfStreamException ex) {
                    // The consumer was cancelled, the model closed, or the
                    // connection went away.
                    break;
                }
            }

-- ОБНОВЛЕНИЕ 5: простой get, который будет действовать на любую очередь (более медленный, но иногда более удобный метод).

            ch.QueueDeclare(queueName);
            BasicGetResult result = ch.BasicGet(queueName, false);
            if (result == null) {
                Console.WriteLine("No message available.");
            } else {
                ch.BasicAck(result.DeliveryTag, false);
                Console.WriteLine("Message:"); 
                // deserialize body and display extra info here.
            }
person Community    schedule 14.07.2011
comment
Большое спасибо за ваш отзыв. Я все еще изучаю систему обмена сообщениями, и есть операции, которые я до сих пор не понимаю. Как слушание. Я также видел, как rabbitmq подписывается на очередь. Можете ли вы подписаться на несколько очередей, используя new Subscription(channel, queueName)? И если да, то как я могу просмотреть все подписанные очереди и вернуть сообщение (нулевое, если сообщений нет)? О, и заметьте, что у меня все эти операции разными методами. Я отредактирую свой пост, чтобы отразить код. - person Demi; 15.07.2011
comment
Еще раз спасибо. Я отредактировал код для функций подписки и записи выше. Однако у меня есть эта ошибка времени выполнения: если я подпишусь, скажем, на две очереди и попытаюсь прочитать сообщения, я смогу получить сообщения только в первый раз. Я не вижу, где я напортачил. Можете ли вы взглянуть на если для меня? - person Demi; 20.07.2011
comment
@ Деми ... это потребовало некоторой охоты. Я думаю, что вам не хватает подписок. Ack () в конце вашего цикла чтения? Это означает: «Я успешно обработал это сообщение, так что дайте мне следующее». Дайте мне знать, если это было так. В противном случае вы выглядите близко. - person sgtz; 20.07.2011
comment
Н.Б. это должно быть в конце вашего внешнего цикла - person sgtz; 20.07.2011
comment
если вы опубликуете консольную процедуру Main(), используя свой класс, я поиграю с ней на следующий день или около того, если хотите. Вы успешно прочитали более одного сообщения из одной очереди? - person sgtz; 20.07.2011
comment
то есть просто хотите иметь простую последовательность + знать, что мы синхронизированы - person sgtz; 20.07.2011
comment
нет. на самом деле это проблема. чтение сообщения не переходит к следующему, как только оно читается в первый раз, будь то из той же очереди или из другой. я отредактирую код, чтобы он отражал класс и две консольные процедуры, использующие этот класс. я тоже буду продолжать изучать это. - person Demi; 20.07.2011
comment
Спасибо за помощь. Я еще немного покопался и понял. - person Demi; 20.07.2011
comment
отличная работа. Не могли бы вы рассказать, в чем заключалась проблема, чтобы другие тоже могли извлечь пользу? благодаря. - person sgtz; 20.07.2011
comment
ну это смешно. он все еще не читает из нескольких очередей. однако я читал сообщения с помощью Next(), и какое-то время это работало, я думаю, в брокере были сохранены сообщения из предыдущей итерации App1.cs. поэтому я вернулся к поиску помощи. =) отредактированный код выше. - person Demi; 20.07.2011
comment
сообщения в очереди с последнего раза - это нормально (есть настройка автоудаления при отключении). кстати: знаете ли вы, что у вас есть два element.Ack(); в вашем ридере? Должен быть только один. Еще не пробовал ваш код, но я использую обмен темами. Вы используете прямой обмен. Несколько мыслей. Надо бежать. - person sgtz; 20.07.2011
comment
как я это пропустил? У вас есть две подписки (Список‹Подписка›), тогда как я думал, что вы пытаетесь подписаться на две очереди с одним подписчиком. Теперь... вы, вероятно, получаете проблемы с блокировкой потоков, когда два потока обращаются к одному и тому же соединению/каналу. Я всегда выполняю один последовательный набор операций с каждым набором потоков/каналов/соединений. re: подписка на более чем одну очередь с одним подписчиком, это определенно возможно с клиентом Java, и в настоящее время я спрашиваю о клиенте .net от вашего имени. - person sgtz; 21.07.2011
comment
Как вы также отметили, мое приложение должно иметь возможность использовать одно и то же соединение AMQP для получения сообщений. Однако я имею дело с возможностью потребления из нескольких очередей. Это помогает? - person Demi; 22.07.2011
comment
вы можете получать сообщения из более чем одной очереди по одному соединению. Однократная подписка на несколько очередей отличается от двухкратной подписки с двумя разными подписчиками. Причина просто в том, что они не удосужились поддержать это с помощью вспомогательного класса (т.е. это работает в java). Тем не менее, в клиенте .net по-прежнему присутствует необработанная функциональность, позволяющая получить то, что вы хотите. - person sgtz; 22.07.2011
comment
см. обновления 3 и 4. Это должно быть то, что вам нужно. Я добавлю обновление 5 в качестве запасной позиции для вас. - person sgtz; 23.07.2011
comment
Все ссылки здесь битые - person Jebathon; 18.08.2020

Самый простой способ — использовать EventingBasicConsumer. У меня есть пример на моем сайте о том, как его использовать. RabbitMQ EventingBasicConsumer

Этот потребительский класс предоставляет полученное событие, которое вы можете использовать, и, следовательно, НЕ блокирует. Остальной код в основном остается прежним.

person Kelly    schedule 24.05.2012