У нас есть потребитель в некоторых очередях RabbitMQ, который читает сообщения, а затем индексирует эти данные в Elasticsearch. Реализация выполнена с использованием spring-amqp. Чтобы повысить нашу производительность, мы планируем агрегировать сообщения на уровне потребителя и выполнять массовую вставку в Elasticsearch (это действительно повысит производительность).
Есть ли у вас какие-либо предложения о том, как это реализовать? Кроме того, еще одна деликатная проблема заключается в том, как обрабатывать ответы. Каждое сообщение имеет заголовок «reply_to», и мы используем входящий шлюз с каналом ответа, поэтому для каждого сообщения должен быть доставлен ответ.
Я думаю об использовании агрегатора из весенней интеграции со стратегией выпуска, основанной на размере пакета и периоде времени, когда истечет срок действия MessageGroupStore (и, конечно же, жнец). Входящий шлюз имеет исполнителя задач, скажем, 20, а также счетчик предварительной выборки, равный 20. Всякий раз, когда приходит запрос, сообщение будет добавлено в хранилище группы, и когда условие canRelease() будет в порядке, жнец или один из потоков, пришедших с запросом, выполнит массовую операцию. Но то, что я делаю с другими потоками, которые должны будут ждать ответа, который никогда не придет. Кроме того, я не знаю, как разбить ответ на большое агрегированное сообщение, чтобы на каждый маленький запрос был ответ.
Еще одна проблема, как я могу подтвердить сообщения? Из того, что я прочитал, транзакции снизят производительность на стороне RabbitMQ, поэтому я не очень доволен использованием атрибута «tx-size». Также этот атрибут может вести неверный подсчет, если время ожидания слишком мало.