Как объединить потоковые данные с большим набором исторических данных в Dataflow / Beam

Я исследую обработку журналов сеансов веб-пользователей через Google Dataflow / Apache Beam, и мне нужно объединить журналы пользователя по мере их поступления (потоковая передача) с историей сеанса пользователя за последний месяц.

Я рассмотрел следующие подходы:

  1. Используйте 30-дневное фиксированное окно: скорее всего, окно слишком большое, чтобы поместиться в память, и мне не нужно обновлять историю пользователя, просто обратитесь к нему
  2. Используйте CoGroupByKey для объединения двух наборов данных, но оба набора данных должны иметь одинаковый размер окна (https://cloud.google.com/dataflow/model/group-by-key#join), что неверно в моем случае (24 часа против 30 дней)
  3. Используйте боковой ввод, чтобы получить историю сеансов пользователя для заданного element в processElement(ProcessContext processContext)

Насколько я понимаю, данные, загруженные через .withSideInputs(pCollectionView), должны уместиться в памяти. Я знаю, что могу поместить в память всю историю сеансов отдельного пользователя, но не все истории сеансов.

Мой вопрос: есть ли способ загрузки / потоковой передачи данных с бокового ввода, который имеет отношение только к текущему сеансу пользователя?

Я представляю себе функцию parDo, которая будет загружать сеанс истории пользователя из бокового ввода, указав идентификатор пользователя. Но в память поместится только сеанс истории текущего пользователя; загрузка всех сеансов истории через боковой ввод будет слишком большой.

Некоторый псевдокод для иллюстрации:

public static class MetricFn extends DoFn<LogLine, String> {

    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
        this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);

        final LogLine currentLogLine = processContext.element();
        final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
        final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
        processContext.output(outputMetric);
    }
}

person Florian    schedule 29.04.2016    source источник


Ответы (1)


В настоящее время нет способа получить доступ к входам на стороне ключа в потоковой передаче, но он определенно был бы полезен именно так, как вы описываете, и это то, что мы рассматриваем для реализации.

Один из возможных обходных путей - использовать боковые входы для распределения указателей на фактическую историю сеанса. Код, генерирующий историю 24-часового сеанса, может загружать их в GCS / BigQuery / и т. Д., А затем отправлять местоположения в качестве побочного ввода в код присоединения.

person danielm    schedule 30.04.2016
comment
Спасибо за разъяснение, Даниэль ... будет ли это вариант (в потоке данных) динамически загружать историю пользователя через Bigtable с использованием идентификатора пользователя в качестве ключа, или это, скорее всего, убьет производительность (так как историю нужно будет запрашивать / загружать для каждый пользовательский сеанс)? - person Florian; 02.05.2016
comment
Пока вы загружаете историю для каждого сеанса (а не для каждого события), он, вероятно, будет работать достаточно хорошо. Поскольку данные истории постоянны, вы также можете добавить статический кеш. После того, как вы выполнили GroupByKey / Combine с вашими пользователями в качестве ключей, каждый пользователь, как правило, будет обрабатываться одним и тем же процессом, поэтому кеш должен работать достаточно хорошо. - person danielm; 02.05.2016