Spring AMQP и Elasticsearch — объединение сообщений

У нас есть потребитель в некоторых очередях RabbitMQ, который читает сообщения, а затем индексирует эти данные в Elasticsearch. Реализация выполнена с использованием spring-amqp. Чтобы повысить нашу производительность, мы планируем агрегировать сообщения на уровне потребителя и выполнять массовую вставку в Elasticsearch (это действительно повысит производительность).

Есть ли у вас какие-либо предложения о том, как это реализовать? Кроме того, еще одна деликатная проблема заключается в том, как обрабатывать ответы. Каждое сообщение имеет заголовок «reply_to», и мы используем входящий шлюз с каналом ответа, поэтому для каждого сообщения должен быть доставлен ответ.

Я думаю об использовании агрегатора из весенней интеграции со стратегией выпуска, основанной на размере пакета и периоде времени, когда истечет срок действия MessageGroupStore (и, конечно же, жнец). Входящий шлюз имеет исполнителя задач, скажем, 20, а также счетчик предварительной выборки, равный 20. Всякий раз, когда приходит запрос, сообщение будет добавлено в хранилище группы, и когда условие canRelease() будет в порядке, жнец или один из потоков, пришедших с запросом, выполнит массовую операцию. Но то, что я делаю с другими потоками, которые должны будут ждать ответа, который никогда не придет. Кроме того, я не знаю, как разбить ответ на большое агрегированное сообщение, чтобы на каждый маленький запрос был ответ.

Еще одна проблема, как я могу подтвердить сообщения? Из того, что я прочитал, транзакции снизят производительность на стороне RabbitMQ, поэтому я не очень доволен использованием атрибута «tx-size». Также этот атрибут может вести неверный подсчет, если время ожидания слишком мало.


person Cosmin Vasii    schedule 07.10.2014    source источник
comment
Решение, которое я нашел: замените входящий шлюз адаптерами входящего и исходящего каналов. Получите сообщение через адаптер входящего канала, цепочка продолжается с агрегатором. От агрегатора активатор службы получает массовое сообщение, выполняет свою работу, и большой ответ добавлено в ответный канал. Здесь я должен разбить большое сообщение на более мелкие и вручную подтвердить сообщения. Мне также нужно позаботиться о сохранении всех заголовков answer_to. После того, как у меня есть все небольшие ответные сообщения, я передаю их адаптеру исходящего канала с заголовками routing-key-expression=[reply_to].   -  person Cosmin Vasii    schedule 07.10.2014
comment
Я действительно не знаю, можно ли это сделать с помощью SI и spring-amqp. Мне нужно как-то обогатить заголовки для агрегированного сообщения или вручную создать их в агрегированном методе.   -  person Cosmin Vasii    schedule 07.10.2014


Ответы (1)


Ответ на вопрос о потребителе и агрегаторе:

Конфигурация для приема сообщений от AMQP и агрегирования. Совокупная стратегия основана на фиксации транзакции:

<amqp:inbound-channel-adapter queue-names="myQueue"
                                  transaction-manager="transactionManager"
                                  channel-transacted="true"
                                  channel="aggregateChannel"
                                  advice-chain="aggregatorReaperAdvice"
                                  concurrent-consumers="4"
                                  tx-size="100"/>

<aggregator input-channel="aggregateChannel" output-channel="storeChannel"
                expire-groups-upon-completion="true"
                correlation-strategy-expression="T(Thread).currentThread().id"
                release-strategy-expression="^[payload.equals(@AGGREGATOR_RELEASE_MARK)] != null"
                expression="?[!payload.equals(@AGGREGATOR_RELEASE_MARK)].![payload]"/>

ReaperAdvice (код Groovy):

@Service
class AggregatorReaperAdvice implements MethodBeforeAdvice, InitializingBean {

    private static final TRANSACTION_RESOURCE_MARK = 'TRANSACTION_RESOURCE_MARK'

    public static final AGGREGATOR_RELEASE_MARK = 'AGGREGATOR_RELEASE_MARK'

    MessagingTemplate messagingTemplate

    @Autowired
    MessageChannel aggregateChannel

    @Override
    void afterPropertiesSet() throws Exception {
        Assert.notNull aggregateChannel, "aggregateChannel must not be null"
        messagingTemplate = new MessagingTemplate(aggregateChannel)
    }

    @Override
    void before(Method method, Object[] args, Object target) {
        if (!TransactionSynchronizationManager.hasResource(AggregatorReaperAdvice)) {
            TransactionSynchronizationManager.bindResource(AggregatorReaperAdvice, TRANSACTION_RESOURCE_MARK)
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

                @Override
                void beforeCommit(boolean readOnly) {
                    messagingTemplate.send(MessageBuilder.withPayload(AGGREGATOR_RELEASE_MARK).build())
                }

                @Override
                void afterCompletion(int status) {
                    TransactionSynchronizationManager.unbindResource(AggregatorReaperAdvice)
                }

            })
        }
    }
}

Дайте мне знать, если это не ясно.

Все остальные вопросы будут рассмотрены в ближайшее время.

Для manual ack можно использовать channel.basicAck(deliveryTag, true); - для ack последнего deliveryTag для всех предыдущих сообщений.

Для случая headers["reply_to"]... Я думаю, вы должны предоставить пользовательский AbstractAggregatingMessageGroupProcessor для aggregator и убить двух зайцев: совокупный результат агрегатора и итерацию по MessageGroup.getMessages(), чтобы отправить каждого из них для процесса ответа предоставленному MessageChannel. Это быстрое решение для вашего случая.

Нечто похожее, но более слабо связанное решение может быть основано на результатах агрегатора и его MessageGroupStore, где вы извлекаете correlationKey для получения группы и ее сообщений для выполнения желаемой логики reply. В этом случае удалять группу из хранилища следует не агрегатором, а вручную после извлечения этой группы.

person Artem Bilan    schedule 07.10.2014
comment
Привет Артем. Большое спасибо за Ваш ответ. Итак, моя идея выглядит приемлемой для вас, насколько я понимаю: d. Как вы правильно заметили, код AggregatorReaperAdvice мне не очень понятен. Не могли бы вы объяснить смысл транзакций здесь? Мне не нравится устанавливать размер tx из-за следующего сценария. Если у меня prefetch-count=tx-size=100 (prefetch0count‹tx-size не подходит), моя служба умерла и много больших сообщений (возможно, 1 МБ данных) сохраняются на Rabbit, когда моя служба перезапускается, в ОЗУ будет добавлено много сообщений, и у меня может быть OutOfMemory (10consumers * 100 * 1Mb = 1Gb) - person Cosmin Vasii; 08.10.2014
comment
Что ж, мой AggregatorReaperAdvice применяется к процессу Listener, и он будет внутри TX. Он регистрирует TXSync, который завязан на TX и делает искусственный агрегатор release отправляющим какое-то MARK сообщение. Имея это, я собираю ровно tx-size сообщений или меньше и в пределах TX. Поскольку вы беспокоитесь об оперативной памяти, вы можете поиграть с этими tx-size и concurency. Таким образом, процессов-слушателей не будет больше, чем вы позволяете. Вы должны быть уверены, что вы выполняете другой нисходящий поток в этом потоке прослушивателя, чтобы заблокировать его и не разрешить запуск новой задачи прослушивателя. - person Artem Bilan; 08.10.2014
comment
Извините за настойчивость, но у меня есть еще 3 вопроса. 1) Какие преимущества имеет стратегия корреляции, основанная на идентификаторе потока? С 4 потребителями у меня будет 1 SimpleMessageStore с 4 SimpleMessageGroup. 2) Если создается исключение, транзакция будет откатываться. Если я настроил канал ошибок с помощью активатора службы (мы всегда возвращаем ответ о том, какое исключение было сгенерировано, и мы не хотим повторно доставлять сообщение), я все равно получу здесь failedMessage? Я точно не знаю, но Я думаю, что сообщение будет повторно поставлено в очередь. - person Cosmin Vasii; 08.10.2014
comment
3) Как я могу реализовать механизм возврата ответов на разные каналы ответов и, возможно, очереди Rabbit для успешных действий и неудачных? Я думал о двух заголовках в исходном сообщении, один для successful_reply_to и один для errors_reply_to - person Cosmin Vasii; 08.10.2014
comment
1) правда. 2) Нет, TX откатывается только в том случае, если вы повторно сбрасываете Exception от того сервис-активатора error-channel. 3) Я бы сказал, что достаточно иметь только один заголовок reply_to, но построить целевой routing_key на основе логики - успешно или ошибочно. - person Artem Bilan; 08.10.2014
comment
Еще раз привет @Артем Билан. Вы на самом деле не ответили на мой вопрос 1): d, о преимуществах стратегии корреляции, основанной на идентификаторе потока. Я посмотрел на код и увидел эту строку this.lockRegistry.obtain(UUIDConverter.getUUID(correlationKey).toString()), которая говорит мне, что потокам-потребителям не придется ждать, чтобы получить lock, так как каждый поток будет иметь свою собственную блокировку. Это верно? Насколько я понимаю, это преимущество. Кроме того, если я не сделаю эту корреляцию, то все потребительские потоки должны будут ждать, чтобы получить блокировку, что снизит производительность. - person Cosmin Vasii; 10.10.2014
comment
Поскольку в моем примере concurrent-consumers="4", предполагается, что у меня может быть 4 одновременных потока для получения сообщений из очереди. С другой стороны у нас есть tx-size="100" вариант. Я имею в виду, что каждый параллельный поток будет обернут TX и будет потреблять его до 100 или no more messages. И отправляет свой ТХ. Наш слушатель просто отправляет сообщения агрегатору. Если мы знаем, что у нас может быть группа из этих 100 сообщений, мы можем просто сопоставить их с текущим thread id, потому что у нас нет другого выбора для корреляции. И да: наши потоки не ждут своей блокировки. - person Artem Bilan; 10.10.2014