Использование Spring Integration и проверка подходов

На данный момент я тестирую использование Spring Integration, чтобы связать вместе разрозненные модули в одном приложении Spring-Boot и службы в единый поток, начиная с единой точки входа.

Если возможно, я ищу следующие пояснения по поводу Spring Integration:

  1. Является ли приведенный ниже код правильным способом структурирования потоков с использованием DSL?
  2. Могу ли я перенести результат в поток «В» ниже в строке «В»?
  3. Лучше ли использовать DSL или XML?
  4. Я не понимаю, как правильно «завершить» поток?

Обзор потока

В приведенном ниже коде я просто публикую страницу в место назначения. Общий поток выглядит так.

  1. Поток издателя прослушивает полезную нагрузку и разбивает ее на части.
  2. Content flow filters out pages and splits them into parts.
    1. AWS flow subscribes and handles the part.
    2. Файловый поток подписывается и обрабатывает часть.

В конце концов, в потоке издателя могут быть дополнительные и очень разные типы потребителей, которые не являются контентом, поэтому я отделяю издателя от контента.

A) Процесс публикации (publisher.jar):

Это мой «основной» поток, инициированный через шлюз. Цель состоит в том, чтобы это служило точкой входа для запуска всех потоков публикации.

  1. Получите сообщение
  2. Предварительно обработайте сообщение и сохраните его.
  3. Разделите полезную нагрузку на отдельные записи, содержащиеся в ней.
  4. Обогатите каждую запись остальными данными
  5. Поместите каждую запись в выходной канал.

Ниже приведен код:

@Bean
IntegrationFlow flowPublish()
{
    return f -> f
        .channel(this.publishingInputChannel())
        //Prepare the payload
        .<Package>handle((p, h) -> this.save(p))
        //Split the artifact resolved items
        .split(Package.class, Package::getItems)
        //Find the artifact associated to each item (if available)
        .enrich(
            e -> e.<PackageEntry>requestPayload(
                m ->
                {
                    final PackageEntry item = m.getPayload();
                    final Publishable publishable = this.findPublishable(item);
                    item.setPublishable(publishable);
                    return item;
                }))
        //Send the results to the output channel
        .channel(this.publishingOutputChannel());
}

Б) Поток контента (content.jar)

Этот модуль отвечает за обработку входящей полезной нагрузки «контента» (то есть страницы в данном случае) и разделение / маршрутизацию их к соответствующему подписчику (-ам).

  1. Слушайте выходной канал издателя
  2. Фильтровать записи только по типу страницы
  3. Добавьте исходную полезную нагрузку в заголовок для дальнейшего использования
  4. Преобразуйте полезную нагрузку в фактический тип
  5. Разделить страницу на отдельные элементы (блоки)
  6. Направьте каждый элемент на соответствующий канал PubSub.

По крайней мере, на данный момент подписанные потоки не возвращают никакого ответа - они должны просто запустить и забыть, но я хотел бы знать, как увеличить результат при использовании канала pub-sub.

Ниже приведен код:

@Bean
@ContentChannel("asset")
MessageChannel contentAssetChannel()
{
    return MessageChannels.publishSubscribe("assetPublisherChannel").get();

    //return MessageChannels.queue(10).get();
}

@Bean
@ContentChannel("page")
MessageChannel contentPageChannel()
{
    return MessageChannels.publishSubscribe("pagePublisherChannel").get();

    //return MessageChannels.queue(10).get();
}

@Bean
IntegrationFlow flowPublishContent()
{
    return flow -> flow
        .channel(this.publishingChannel)
        //Filter for root pages (which contain elements)
        .filter(PackageEntry.class, p -> p.getPublishable() instanceof Page)
        //Put the publishable details in the header
        .enrichHeaders(e -> e.headerFunction("item", Message::getPayload))
        //Transform the item to a Page
        .transform(PackageEntry.class, PackageEntry::getPublishable)
        //Split page into components and put the type in the header
        .split(Page.class, this::splitPageElements)
        //Route content based on type to the subscriber
        .<PageContent, String>route(PageContent::getType, mapping -> mapping
            .resolutionRequired(false)
            .subFlowMapping("page", sf -> sf.channel(this.contentPageChannel()))
            .subFlowMapping("image", sf -> sf.channel(this.contentAssetChannel()))
            .defaultOutputToParentFlow())
        .channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
}

C) Контент AWS (aws-content.jar)

Этот модуль является одним из многих потенциальных подписчиков на потоки, относящиеся к конкретному контенту. Он обрабатывает каждый элемент индивидуально на основе маршрутизированного канала, опубликованного выше.

  1. Подписывайтесь на соответствующий канал.
  2. Относитесь к действию должным образом.

Может быть несколько модулей с потоками, которые подписываются на указанные выше маршрутизированные выходные каналы, это только один из них.

В качестве примера «contentPageChannel» может вызывать приведенный ниже flowPageToS3 (в модуле aws), а также flowPageToFile (в другом модуле).

Ниже приведен код:

@Bean
IntegrationFlow flowAssetToS3()
{
    return flow -> flow
        .channel(this.assetChannel)
        .publishSubscribeChannel(c -> c
            .subscribe(s -> s
                .<PageContent>handle((p, h) ->
                                     {
                                         return this.publishS3Asset(p);
                                     })));
}

@Bean
IntegrationFlow flowPageToS3()
{
    return flow -> flow
        .channel(this.pageChannel)
        .publishSubscribeChannel(c -> c
            .subscribe(s -> s
                .<Page>handle((p, h) -> this.publishS3Page(p))
                .enrichHeaders(e -> e.header("s3Command", Command.UPLOAD.name()))
                .handle(this.s3MessageHandler())));
}



Ответы (1)


Во-первых, в вашем вопросе много контента: очень сложно сохранить всю информацию во время чтения. Это ваш проект, поэтому вы должны быть очень уверены в предмете. Но для нас это что-то новое, и мы можем просто отказаться от чтения, уже не говоря, с попыткой ответить.

В любом случае я постараюсь ответить на ваши вопросы вначале, хотя мне кажется, что вы собираетесь начать долгую дискуссию «что ?, как ?, почему?» ...

Является ли приведенный ниже код правильным способом структурирования потоков с использованием DSL?

Это действительно зависит от вашей логики. Это хорошая идея, чтобы различать логические компоненты, но это может быть накладными расходами, чтобы разделить отдельную банку по этому вопросу. Глядя на ваш код, мне кажется, что вы все еще собираете все в одно приложение Spring Boot и просто @Autowired соответствующие каналы для @Configuration. Так что да, отдельный @Configuration - хорошая идея, но отдельная банка - это накладные расходы. ИМХО.

Могу ли я перенести результат в поток «В» ниже в графе «В»?

Что ж, раз уж речь идет о публикации-подписке, ждать ответа действительно необычно. Сколько ответов вы получите от этих подписчиков? Да, вот в чем проблема - мы можем отправлять многим подписчикам, но не можем получить ответы от всех на один возврат. Вернемся к коду Java: у нас может быть несколько аргументов метода, но у нас есть только один return. То же самое применяется здесь в обмене сообщениями. В любом случае вы можете взглянуть на Scatter -Собрать реализацию паттерна.

Лучше ли использовать DSL или XML?

Оба являются просто высокоуровневым API. Ниже находятся те же компоненты интеграции. Заглянув в свое приложение, вы придете к тому же распределенному решению с конфигурацией XML. Не вижу причин отходить от Java DSL. По крайней мере, для вас это менее многословно.

Я не понимаю, как правильно «завершить» поток?

Совершенно непонятно, что у вас есть большое описание. Если вы отправляете в S3 или в файл, это прекращение. От этих компонентов нет ответа, поэтому некуда идти, нечего делать. Это просто остановка. То же самое и с методом Java с void. Если вас беспокоит шлюз точки входа, просто сделайте это void и не ждите никаких ответов. Дополнительную информацию см. В шлюзе обмена сообщениями. .

person Artem Bilan    schedule 27.06.2017
comment
Собственно, вы хорошо ответили на все мои вопросы - спасибо. Причина нескольких JAR заключается в том, что функции Spring-Boot могут быть включены или нет. Например, S3, если его нет, я могу просто ввести подписчиков File. Наконец, единственный вопрос, который у меня действительно возникает, касается подхода с использованием DSL. Я борюсь с тем, чтобы знать, что я хочу сделать, и иметь возможность выполнить это с помощью XML, но по разным причинам, например, используя чистую Java для конфигурации, я решил пойти по пути DSL. Структурно, на первый взгляд, является ли способ, которым я настроил потоки, действительным? - person KJQ; 27.06.2017
comment
Ой! Рад быть полезным! Рассмотрите возможность в будущем задавать более конкретные вопросы. Архитектура приложения - действительно сложный предмет, и нет четкого рецепта для всех решений. В конце концов, с опытом вы почувствуете себя более уверенно и уверенно в том, что вы делаете в своих проектах, но сейчас просто идите тем путем, который кажется лучше для вашего текущего уровня знаний. - person Artem Bilan; 27.06.2017