Я пытаюсь написать потребителя RabbitMQ в Go. Что предполагает брать 5 объектов за раз из очереди и обрабатывать их. Более того, предполагается, что в случае успешной обработки, иначе отправка в очередь недоставленных сообщений 5 раз, а затем отбрасывание, должна выполняться бесконечно и обрабатывать событие отмены потребителя. У меня есть несколько вопросов:
- Есть ли какая-либо концепция
BasicConsumer
противEventingBasicConsumer
в Reference RabbitMq-go? - Что такое
Model
в RabbitMQ и есть ли он в RabbitMq-go? - Как отправить объекты в случае сбоя в очередь недоставленных сообщений и снова поставить их в очередь после
ttl
- Каково значение аргумента
consumerTag
в функцииch.Consume
в приведенном ниже коде - Должны ли мы использовать
channel.Get()
илиchannel.Consume()
для этого сценария?
Какие изменения мне нужно внести в приведенный ниже код, чтобы выполнить вышеуказанное требование. Я спрашиваю об этом, потому что не смог найти достойную документацию RabbitMq-Go.
func main() {
consumer()
}
func consumer() {
objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)
ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)
if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
fmt.Println(k)
}
}()
log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
<-forever
}
Отредактированный вопрос:
Я отложил обработку сообщений, как это предлагается в ссылках link1 ссылка2. Но проблема в том, что сообщения возвращаются в свою исходную очередь из очереди с недоставленными письмами даже после ttl. Я использую RabbitMQ 3.0.0
. Может ли кто-нибудь указать, в чем проблема?