Согласно названию, есть ли передовые практики, примеры и т. д. для написания модуля преобразователя агрегатора? У нас есть ситуация, в которой наша агрегация основана на отметке времени в данных, и данные могут поступать не по порядку, с опозданием или даже СЛИШКОМ поздно для рассмотрения.
В случае «слишком поздно» мы планируем использовать filter
, чтобы удалить их до того, как преобразователь их увидит.
Однако это сводится к основному вопросу — как написать преобразователь для агрегирования, если выходные данные агрегатора не основаны на входных данных? Можно ли записать асинхронный вывод в один и тот же поток XD? Или даже можно написать преобразователь для вывода на основе какого-то аспекта ввода, но только для определенных входов и ничего не выводить на другие?
В качестве конкретного, хотя и надуманного примера, я хочу, чтобы агрегатор брал кортеж из (String, Int) и суммировал Int на основе String, а затем ТОЛЬКО выводил агрегации, когда String является определенным значением; сказать «стоп». Достаточно легко продемонстрировать:
(foo, 1)
(bar, 2)
(foo, 2)
(stop, 0)
Когда последний кортеж входит в преобразователь, я должен выдать поток кортежей:
(foo, 3)
(bar, 2)
Как для этого написан трансформатор?