Spring XD: Написание агрегатора; как выводить асинхронно или хотя бы не 1:1?

Согласно названию, есть ли передовые практики, примеры и т. д. для написания модуля преобразователя агрегатора? У нас есть ситуация, в которой наша агрегация основана на отметке времени в данных, и данные могут поступать не по порядку, с опозданием или даже СЛИШКОМ поздно для рассмотрения.

В случае «слишком поздно» мы планируем использовать filter, чтобы удалить их до того, как преобразователь их увидит.

Однако это сводится к основному вопросу — как написать преобразователь для агрегирования, если выходные данные агрегатора не основаны на входных данных? Можно ли записать асинхронный вывод в один и тот же поток XD? Или даже можно написать преобразователь для вывода на основе какого-то аспекта ввода, но только для определенных входов и ничего не выводить на другие?

В качестве конкретного, хотя и надуманного примера, я хочу, чтобы агрегатор брал кортеж из (String, Int) и суммировал Int на основе String, а затем ТОЛЬКО выводил агрегации, когда String является определенным значением; сказать «стоп». Достаточно легко продемонстрировать:

(foo, 1)
(bar, 2)
(foo, 2)
(stop, 0)

Когда последний кортеж входит в преобразователь, я должен выдать поток кортежей:

(foo, 3)
(bar, 2)

Как для этого написан трансформатор?


person Michael Campbell    schedule 21.12.2014    source источник


Ответы (1)


Используйте процессорный модуль aggregator, а не transform. стандартный модуль агрегатора не его нет, но есть агрегатор Spring Integration может отбрасывать просроченные сообщения через discard-channel (если expire-groups-on-completion равно false).

Сам агрегатор, когда стратегия релиза определяет, что группу можно выпустить (или через group-timeout), может делать с группой все, что угодно, возвращая любой результат, который выдается и пойдет к следующему модулю в потоке. Таким образом, вы обычно выполняете накопление кортежей там или используете агрегатор по умолчанию, который выпускает коллекцию, и вы можете использовать transform модуль ниже по течению.

Агрегатор по умолчанию является пассивным компонентом; если вы не настроите MessageGroupStoreReaper или не используете group-timeout, выпуск группы запускается только тогда, когда приходит сообщение для этой группы (имеет ту же корреляцию).

person Gary Russell    schedule 21.12.2014
comment
Спасибо Гэри. Одна вещь, с которой у меня большие проблемы в XD, — это предположение, что пользователь будет использовать его ТОЛЬКО для объединения существующих вещей, а не для написания своего собственного. Если я хочу написать свой собственный агрегатор, какой метод я должен реализовать (похожий на transform())? Что такое подпись? Это где-нибудь задокументировано? Должен ли я искать подобные вещи в Интеграции, а не в документах XD? - person Michael Campbell; 22.12.2014
comment
Такого предположения нет; есть раздел в руководстве, в котором рассказывается о разработке модулей и расширений. Да, модули XD представляют собой потоки Spring Integration, поэтому подробности можно найти в справочном документе Spring Integration< /а>. XD 1.1 будет поддерживать создание модулей с Spring Integration Java DSL вместо использования конфигурации XML. - person Gary Russell; 22.12.2014
comment
Да, я прочитал этот раздел 100 раз, и единственный код, который я могу там найти, который показывает имена методов или сигнатуры, — это тесты или для метода преобразования. Что можно реализовать, например, для filter. Я смотрю не в том месте(ах)? - person Michael Campbell; 22.12.2014
comment
См. документацию Spring Integration — фильтр может ссылаться на метод POJO, который возвращает логическое значение, передавая полезную нагрузку или все сообщение, или вы можете использовать выражение SpEL <filter ... expression="payload.foo == 'bar'" />, которое будет отфильтровывать все сообщения, в которых полезная нагрузка (в данном случае javabean ) свойство foo не имеет значения bar. XD построен на основе Spring Integration, поэтому при настройке требуется знание Spring Integration. - person Gary Russell; 22.12.2014
comment
Думаю, я отождествляю себя с тем, с чем сталкивается Майкл. На самом деле это не технические детали, а процесс создания чего-то на XD. Откуда я/мы узнаю, является ли наше решение действительно лучшим способом ведения дел. В моей области применения я могу придумать как минимум 3 одинаково хороших способа решения одной и той же проблемы. Полезный набор того, что можно и чего нельзя делать, потенциально может сэкономить много времени на разработку лучших решений. Я даже не уверен, что это ответ типа stackoverflow, но он исходит от активного пользователя и не обязательно от самого опытного программиста. - person Arun Jose; 24.02.2015