Настройка Spring Integration QueueChannel для RabbitMQ

В настоящее время я использую Spring AMQP API для подключения к RabbitMQ. В основном в моем потребительском коде я читаю сообщения асинхронно и делаю массовую вставку в эластичный поиск. когда я делаю ack = AUTO, я получаю умеренную скорость 400-500 msg / sec (чтение из очереди). когда я делаю ack = NONE, скорость чтения увеличивается, то есть до 5000-6000 msg / sec.

Конфигурация следующая:

  • Машина Linux с 32 ГБ оперативной памяти
  • Аргументы JVM:

-server -Xms1g -Xmx1g -Xss384k PermSize=256m MaxPermSize=256m

Теперь проблема в том, что когда я выполняю ack = NONE, хотя я получаю хорошую скорость, JVM через некоторое время получает OutOfMemory, и я вижу, что в этом сценарии происходит много GC.

Я планирую использовать интеграцию QueueChannel из Spring, где я могу ограничить размер канала в зависимости от количества сообщений, которые он может содержать.

Как я могу реализовать это с помощью RabbitMQ, также есть ли другой способ достичь хорошей скорости чтения, например 4000-5000 msg / sec, без сбоя JVM?


person Shantanoo K    schedule 25.09.2013    source источник


Ответы (1)


Попробуйте использовать ack=AUTO, но установите prefetch и txSize, скажем, 1000; Таким образом, подтверждение будет отправляться только каждые 1000 сообщений (вы можете настроить это значение по мере необходимости).

Использование ackmode = NONE приводит к тому, что сообщения накапливаются в неограниченной блокирующей очереди в контейнере слушателя и будут работать только в том случае, если слушатель может не отставать (OOM означает, что ваш не может).

У нас есть открытая проблема JIRA, чтобы решить эту проблему для ack = NONE, но описанный выше метод является обычно достаточно.

Вы также можете улучшить пропускную способность, используя настройки параллелизма контейнеров.

person Gary Russell    schedule 25.09.2013
comment
У меня уже есть предварительная выборка 750, но я не использую txSize :( какой это будет эффект? Также я могу использовать QueueChannel Spring Integration с RabbitMQ, потому что он имеет ограничение по размеру и т. Д. - person Shantanoo K; 25.09.2013
comment
txSize определяет, как часто мы отправляем подтверждения. Это не совсем удачное название собственности - оно не имеет ничего общего с транзакциями. Из справочного руководства: При использовании с подтверждением режима AUTO контейнер будет пытаться обработать до этого количества сообщений перед отправкой подтверждения (ожидая каждого из них до установки тайм-аута приема). Это также происходит, когда фиксируется транзакционный канал. Если prefetchCount меньше txSize, он будет увеличен, чтобы соответствовать txSize. Использование SI QueueChannel не поможет в этом сценарии - использование памяти находится внутри контейнера слушателя. - person Gary Russell; 25.09.2013