На данный момент я тестирую использование Spring Integration, чтобы связать вместе разрозненные модули в одном приложении Spring-Boot и службы в единый поток, начиная с единой точки входа.
Если возможно, я ищу следующие пояснения по поводу Spring Integration:
- Является ли приведенный ниже код правильным способом структурирования потоков с использованием DSL?
- Могу ли я перенести результат в поток «В» ниже в строке «В»?
- Лучше ли использовать DSL или XML?
- Я не понимаю, как правильно «завершить» поток?
Обзор потока
В приведенном ниже коде я просто публикую страницу в место назначения. Общий поток выглядит так.
- Поток издателя прослушивает полезную нагрузку и разбивает ее на части.
- Content flow filters out pages and splits them into parts.
- AWS flow subscribes and handles the part.
- Файловый поток подписывается и обрабатывает часть.
В конце концов, в потоке издателя могут быть дополнительные и очень разные типы потребителей, которые не являются контентом, поэтому я отделяю издателя от контента.
A) Процесс публикации (publisher.jar):
Это мой «основной» поток, инициированный через шлюз. Цель состоит в том, чтобы это служило точкой входа для запуска всех потоков публикации.
- Получите сообщение
- Предварительно обработайте сообщение и сохраните его.
- Разделите полезную нагрузку на отдельные записи, содержащиеся в ней.
- Обогатите каждую запись остальными данными
- Поместите каждую запись в выходной канал.
Ниже приведен код:
@Bean
IntegrationFlow flowPublish()
{
return f -> f
.channel(this.publishingInputChannel())
//Prepare the payload
.<Package>handle((p, h) -> this.save(p))
//Split the artifact resolved items
.split(Package.class, Package::getItems)
//Find the artifact associated to each item (if available)
.enrich(
e -> e.<PackageEntry>requestPayload(
m ->
{
final PackageEntry item = m.getPayload();
final Publishable publishable = this.findPublishable(item);
item.setPublishable(publishable);
return item;
}))
//Send the results to the output channel
.channel(this.publishingOutputChannel());
}
Б) Поток контента (content.jar)
Этот модуль отвечает за обработку входящей полезной нагрузки «контента» (то есть страницы в данном случае) и разделение / маршрутизацию их к соответствующему подписчику (-ам).
- Слушайте выходной канал издателя
- Фильтровать записи только по типу страницы
- Добавьте исходную полезную нагрузку в заголовок для дальнейшего использования
- Преобразуйте полезную нагрузку в фактический тип
- Разделить страницу на отдельные элементы (блоки)
- Направьте каждый элемент на соответствующий канал PubSub.
По крайней мере, на данный момент подписанные потоки не возвращают никакого ответа - они должны просто запустить и забыть, но я хотел бы знать, как увеличить результат при использовании канала pub-sub.
Ниже приведен код:
@Bean
@ContentChannel("asset")
MessageChannel contentAssetChannel()
{
return MessageChannels.publishSubscribe("assetPublisherChannel").get();
//return MessageChannels.queue(10).get();
}
@Bean
@ContentChannel("page")
MessageChannel contentPageChannel()
{
return MessageChannels.publishSubscribe("pagePublisherChannel").get();
//return MessageChannels.queue(10).get();
}
@Bean
IntegrationFlow flowPublishContent()
{
return flow -> flow
.channel(this.publishingChannel)
//Filter for root pages (which contain elements)
.filter(PackageEntry.class, p -> p.getPublishable() instanceof Page)
//Put the publishable details in the header
.enrichHeaders(e -> e.headerFunction("item", Message::getPayload))
//Transform the item to a Page
.transform(PackageEntry.class, PackageEntry::getPublishable)
//Split page into components and put the type in the header
.split(Page.class, this::splitPageElements)
//Route content based on type to the subscriber
.<PageContent, String>route(PageContent::getType, mapping -> mapping
.resolutionRequired(false)
.subFlowMapping("page", sf -> sf.channel(this.contentPageChannel()))
.subFlowMapping("image", sf -> sf.channel(this.contentAssetChannel()))
.defaultOutputToParentFlow())
.channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
}
C) Контент AWS (aws-content.jar)
Этот модуль является одним из многих потенциальных подписчиков на потоки, относящиеся к конкретному контенту. Он обрабатывает каждый элемент индивидуально на основе маршрутизированного канала, опубликованного выше.
- Подписывайтесь на соответствующий канал.
- Относитесь к действию должным образом.
Может быть несколько модулей с потоками, которые подписываются на указанные выше маршрутизированные выходные каналы, это только один из них.
В качестве примера «contentPageChannel» может вызывать приведенный ниже flowPageToS3 (в модуле aws), а также flowPageToFile (в другом модуле).
Ниже приведен код:
@Bean
IntegrationFlow flowAssetToS3()
{
return flow -> flow
.channel(this.assetChannel)
.publishSubscribeChannel(c -> c
.subscribe(s -> s
.<PageContent>handle((p, h) ->
{
return this.publishS3Asset(p);
})));
}
@Bean
IntegrationFlow flowPageToS3()
{
return flow -> flow
.channel(this.pageChannel)
.publishSubscribeChannel(c -> c
.subscribe(s -> s
.<Page>handle((p, h) -> this.publishS3Page(p))
.enrichHeaders(e -> e.header("s3Command", Command.UPLOAD.name()))
.handle(this.s3MessageHandler())));
}