Scio: groupByKey не работает при использовании Pub/Sub в качестве источника коллекции

Я изменил источник пример WindowsWordCount из текстового файла в облачный Pub/Sub, как показано ниже. Я опубликовал данные шекспировского файла в Pub/Sub, которые были загружены правильно, но ни одно из преобразований после .groupByKey, похоже, не работает.

sc.pubsubSubscription[String](psSubscription)
  .withFixedWindows(windowSize) // apply windowing logic
  .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
  .countByValue
  .withWindow[IntervalWindow]
  .swap
  .groupByKey
  .map {
    s =>
      println("\n\n\n\n\n\n\n This never prints \n\n\n\n\n")
      println(s)
  }

person Kakaji    schedule 19.06.2017    source источник


Ответы (2)


Изменение ввода из текстового файла в PubSub неограниченное количество PCollection. Группировка по ключу требует определения триггеров агрегации, иначе группировщик будет ждать вечно. Это упоминается в документации по потоку данных здесь: https://cloud.google.com/dataflow/model/group-by-key

Примечание. Для выполнения GroupByKey для неограниченной PCollection требуется либо неглобальный оконный режим, либо триггер агрегации. Это связано с тем, что ограниченный GroupByKey должен ожидать сбора всех данных с определенным ключом; но с неограниченной коллекцией данные не ограничены. Работа с окнами и/или триггеры позволяют группировать логические конечные пакеты данных в неограниченном потоке данных.

Если вы примените GroupByKey к неограниченной коллекции PCollection, не задав неглобальную оконную стратегию, стратегию триггера или и то, и другое, Dataflow сгенерирует ошибку IllegalStateException при построении вашего конвейера.

К сожалению, в Python SDK Apache Beam, похоже, не поддерживает триггеры (пока), поэтому я не уверен, какое решение будет в python.

(см. https://beam.apache.org/documentation/programming-guide/#triggers)

person franz    schedule 22.06.2017
comment
Вопрос как таковой касается scala, у которого есть четко определенные триггеры. - person Haris Nadeem; 08.03.2019

Что касается комментария Франца выше (я бы ответил на его комментарий конкретно, если бы StackOverflow позволил мне!), я вижу, что документы говорят, что запуск не реализован... но они также говорят, что функции базы данных в реальном времени недоступны, в то время как наша текущий проект их активно использует. Они просто новые.

См. триггерные функции здесь: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py

Осторожно, API незакончен, так как это не готовый к выпуску код. Но это доступно.

person Josh Babb    schedule 27.06.2017
comment
кажется, что вы можете сформулировать это в ответ, в нынешнем виде он выглядит как комментарий из-за того, как вы упомянули, что хотели бы, чтобы он был одним из них. В дальнейшем не комментируйте ответы на вопросы, это против правил, если у вас есть ответ, публикуйте его как один, если нет, не публикуйте как ответ. Формулировка этого поста делает его возможным кандидатом на удаление. - person Krupip; 27.06.2017