Поддерживаемый AMQP канал публикации-подписки и преобразование сообщений

Мы используем <int-amqp:publish-subscribe-channel/> как своего рода шину событий в нашем сервисном приложении. Метод отправки, а также обработчик сообщений основаны на классе Message из spring-messaging (начиная с spring-integration 4.0 (+). События — это изменения объектов, которые должны быть получены другими службами.

Проблема в том, что класс spring-messaging Message обрабатывается spring-amqp как произвольная полезная нагрузка объекта, поскольку он не распознается как spring-amqp Message. Это вызывает следующие проблемы:

  • формат сообщений по умолчанию — сериализованные объекты Java. spring-amqp не только сериализует только наш исходный объект полезной нагрузки, но и обертку Spring-сообщений Message, которая несовместима между Spring Framework 4.0 и 4.1.
  • настройка преобразователя сообщений для JSON (точнее, Jackson2JsonMessageConverter) не решает проблему, поскольку он также преобразует экземпляр Message, который является GenericMessage spring-integration, и который не может быть создан из JSON, поскольку ему не хватает соответствующего конструктора

Нам нужно смешивать версии Spring, так как у нас есть сервисы, реализованные с помощью Grails 2.4 (на основе Spring 4.0) и текущей версии Spring Boot (на основе Spring 4.1).

Есть ли какой-нибудь выход из этого, предпочтительно идиоматический способ пружинной интеграции? Может быть, есть другая абстракция вместо или в дополнение к PublishSubscribeAmqpChannel? Или любые другие средства преобразования сообщений, которые мы могли бы применить?


person rainerfrey    schedule 26.02.2015    source источник


Ответы (1)


Вместо использования канала с поддержкой amqp используйте адаптер исходящего канала для отправки и адаптер входящего канала для получения.

Канал содержит все сообщение (сериализованное), тогда как адаптеры передают полезную нагрузку в виде тела сообщения и (необязательно) отображают заголовки в/из заголовков amqp.

Вам нужно будет настроить разветвленный обмен для pub/sub (по умолчанию канал создаст обмен с именем si.fanout.<channelName>. Затем вы можете привязать очередь для каждого получателя.

person Gary Russell    schedule 26.02.2015
comment
Я попробую это. Является ли такое поведение PublishSubscribeChannel преднамеренным или это ошибка? Должен ли я открыть вопрос? - person rainerfrey; 26.02.2015
comment
Это так задумано (такой же метод используется в канале с поддержкой JMS), но я думаю, что мы могли бы внести улучшения, добавив возможность использовать метод сопоставления заголовков вместо отправки всего сообщения. Однако каналы с постоянным сохранением предназначены для сохранения сообщений в приложении (во избежание потери сообщений), а не для распространения сообщений по разрозненным системам/приложениям; именно для этого предназначены адаптеры каналов. - person Gary Russell; 26.02.2015
comment
Я понимаю концепцию постоянных каналов. Довольно нелогично (по крайней мере, для меня), что AMQP-поддерживаемые (то же самое для JMS в этом отношении) рассматриваются как постоянные-поддерживаемые, а не удаленные-транспортные- поддержанный. ИМХО, это должно быть четко указано в справочнике, где описание PublishSubscribeAMQPChannel в лучшем случае скудно. - person rainerfrey; 26.02.2015