Потребитель RabbitMQ в Go

Я пытаюсь написать потребителя RabbitMQ в Go. Что предполагает брать 5 объектов за раз из очереди и обрабатывать их. Более того, предполагается, что в случае успешной обработки, иначе отправка в очередь недоставленных сообщений 5 раз, а затем отбрасывание, должна выполняться бесконечно и обрабатывать событие отмены потребителя. У меня есть несколько вопросов:

  1. Есть ли какая-либо концепция BasicConsumer против EventingBasicConsumer в Reference RabbitMq-go?
  2. Что такое Model в RabbitMQ и есть ли он в RabbitMq-go?
  3. Как отправить объекты в случае сбоя в очередь недоставленных сообщений и снова поставить их в очередь после ttl
  4. Каково значение аргумента consumerTag в функции ch.Consume в приведенном ниже коде
  5. Должны ли мы использовать channel.Get() или channel.Consume() для этого сценария?

Какие изменения мне нужно внести в приведенный ниже код, чтобы выполнить вышеуказанное требование. Я спрашиваю об этом, потому что не смог найти достойную документацию RabbitMq-Go.

   func main() {

        consumer()        
    }

    func consumer() {

        objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}      
        initializeConn(&objConsumerConn.conn)


        ch, err := objConsumerConn.conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        msgs, err := ch.Consume(
                objConsumerConn.queueName, // queue
                "demo1",     // consumerTag
                false,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
            for d := range msgs {                  
                k := new(EventCaptureData)
                b := bytes.Buffer{}
                b.Write(d.Body)
                dec := gob.NewDecoder(&b)  
                err := dec.Decode(&k)
                d.Ack(true)  

                if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
                    fmt.Println(k)                        
            }
        }()      

        log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
        <-forever

    }

Отредактированный вопрос:

Я отложил обработку сообщений, как это предлагается в ссылках link1 ссылка2. Но проблема в том, что сообщения возвращаются в свою исходную очередь из очереди с недоставленными письмами даже после ttl. Я использую RabbitMQ 3.0.0. Может ли кто-нибудь указать, в чем проблема?


person Naresh    schedule 05.04.2016    source источник
comment
Попробуйте пакет amqp для взаимодействия с кроликом, также у него есть очень приличная документация godoc.org/github. com/streadway/amqp   -  person PerroVerd    schedule 05.04.2016
comment
@PerroVerd Это то, что я использую.   -  person Naresh    schedule 05.04.2016


Ответы (1)


Есть ли какая-либо концепция BasicConsumer vs EventingBasicConsumer в справочнике RabbitMq-go?

Не совсем так, но вызовы Channel.Get и Channel.Consume служат похожей концепции. С Channel.Get у вас есть неблокирующий вызов, который получает первое сообщение, если оно доступно, или возвращает ok=false. С Channel.Consume сообщения из очереди доставляются в канал.

Что такое модель в RabbitMQ и есть ли она в RabbitMq-go?

Если вы имеете в виду IModel и Connection.CreateModel в C# RabbitMQ, это что-то из библиотеки C#, а не из самого RabbitMQ. Это была просто попытка абстрагироваться от терминологии RabbitMQ «Канал», но она так и не прижилась.

Как отправить объекты в случае сбоя в очередь недоставленных сообщений и снова поставить их в очередь после ttl

Используйте метод delivery.Nack с requeue=false.

Каково значение аргумента ConsumerTag в функции ch.Consume в приведенном ниже коде.

ConsumerTag — это просто идентификатор потребителя. Его можно использовать для отмены канала с помощью channel.Cancel, а также для определить потребителя, ответственного за доставку. Во всех сообщениях, доставленных с помощью channel.Consume, будет установлено поле ConsumerTag.

Должны ли мы использовать channel.Get() или channel.Consume() для этого сценария?

Я думаю, что channel.Get() почти никогда не предпочтительнее channel.Consume(). С channel.Get вы будете опрашивать очередь и тратить процессор впустую, ничего не делая, что не имеет смысла в Go.

Какие изменения мне нужно внести в приведенный ниже код, чтобы выполнить вышеуказанное требование.

  1. Поскольку вы выполняете пакетную обработку 5 за раз, у вас может быть горутина, которая получает данные из канала потребителя, и как только она получает 5 доставок, вы вызываете другую функцию для их обработки.

  2. Чтобы подтвердить или отправить в очередь недоставленных сообщений, вы будете использовать delivery.Ack или delivery.Nack. Вы можете использовать multiple=true и вызвать его один раз для партии. Как только сообщение попадает в очередь недоставленных сообщений, вы должны проверить заголовок delivery.Headers["x-death"], сколько раз оно было недоставленным, и вызвать delivery.Reject, когда попытка была повторена уже 5 раз.

  3. Используйте channel.NotifyCancel для обработки события отмены.

person Pedro Werneck    schedule 06.04.2016
comment
Большое спасибо за подробное объяснение. - person Naresh; 07.04.2016
comment
@ Педро... У меня одна проблема. Если я использую d.Ack(true) или d.Ack(false), он не публикует сообщения в очереди с недоставленными буквами. Где, как и в случае с d.Nack(true, false), он публикует. Но потом после ttl сбрасывает сообщения оттуда. Итак, каковы ценности для достижения того же - person Naresh; 07.04.2016
comment
Я отредактировал вопрос. Не могли бы вы взглянуть на это - person Naresh; 07.04.2016
comment
@Naresh Это новый вопрос RabbitMQ, не связанный с вопросом Go здесь. Вы должны создать новый вопрос с этим. - person Pedro Werneck; 07.04.2016
comment
Я создал новый вопрос. Скажите, пожалуйста, почему он ведет себя так http://stackoverflow.com/questions/36503804/dead-letterred-messages-not-getting-requeue-to-original-queue-after-ttl? - person Naresh; 08.04.2016