Я исследую обработку журналов сеансов веб-пользователей через Google Dataflow / Apache Beam, и мне нужно объединить журналы пользователя по мере их поступления (потоковая передача) с историей сеанса пользователя за последний месяц.
Я рассмотрел следующие подходы:
- Используйте 30-дневное фиксированное окно: скорее всего, окно слишком большое, чтобы поместиться в память, и мне не нужно обновлять историю пользователя, просто обратитесь к нему
- Используйте CoGroupByKey для объединения двух наборов данных, но оба набора данных должны иметь одинаковый размер окна (https://cloud.google.com/dataflow/model/group-by-key#join), что неверно в моем случае (24 часа против 30 дней)
- Используйте боковой ввод, чтобы получить историю сеансов пользователя для заданного
element
вprocessElement(ProcessContext processContext)
Насколько я понимаю, данные, загруженные через .withSideInputs(pCollectionView)
, должны уместиться в памяти. Я знаю, что могу поместить в память всю историю сеансов отдельного пользователя, но не все истории сеансов.
Мой вопрос: есть ли способ загрузки / потоковой передачи данных с бокового ввода, который имеет отношение только к текущему сеансу пользователя?
Я представляю себе функцию parDo, которая будет загружать сеанс истории пользователя из бокового ввода, указав идентификатор пользователя. Но в память поместится только сеанс истории текущего пользователя; загрузка всех сеансов истории через боковой ввод будет слишком большой.
Некоторый псевдокод для иллюстрации:
public static class MetricFn extends DoFn<LogLine, String> {
final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;
public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
this.pHistoryView = historyView;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);
final LogLine currentLogLine = processContext.element();
final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
processContext.output(outputMetric);
}
}