Сверхурочная группировка данных датчиков с помощью Spark Structured Streaming

У нас есть датчики, которые запускаются и работают в случайном порядке несколько раз в день. Данные от датчиков отправляются в тему Kafka, используются Spark Structured streaming API и сохраняются в Delta Lake. Теперь нам нужно идентифицировать и сохранять сеансы для каждого датчика в отдельной таблице Delta Lake, разделенной на device_id и sensor_id.

Я пробовал использовать Spark Structured Streaming с водяными знаками, но ничего не добился.

stream2 = spark.readStream.format('delta')
             .load('<FIRST_DELTA_LAKE_TABLE>')
             .select('device_id', 'json', 'time')
             .withWatermark('timestamp', '10 minutes')
             .groupBy('device_id').agg(F.min('time').alias('min_time'), F.max('time').alias('max_time')))
             .writeStream
             .format("delta")
stream2.start("<SESSIONS_TABLE>")

Идея заключалась в том, чтобы иметь вторую таблицу, идентифицирующую сеансы по входящим данным и сохраняющую время начала и время окончания для каждого сеанса и устройства. Задания потоковой передачи выполняются, и в дельта-таблицу сеансов ничего не записывается.

Любая помощь по этому поводу будет оценена по достоинству.


comment
На самом деле они находятся в двух разных потоковых заданиях. Первое задание потоковой передачи просто записывает необработанные данные в дельта-таблицу, а это второе потоковое задание, которое считывает данные из этой дельта-таблицы.   -  person Hasif Subair    schedule 17.02.2021


Ответы (1)


По умолчанию, когда вы пишете поток, он по умолчанию использует режим append (см. doc). И в этом режиме, когда вы используете водяные знаки, данные выводятся только после того, как водяной знак пересечен, поэтому будет задержка не менее 10 минут, пока вы не начнете видеть данные в выводе.

Но я думаю, что основная проблема заключается в том, что вы запускаете глобальные агрегаты без определенного окна или что-то в этом роде. Обычно для обнаружения сеанса люди используют flatMapGroupWithState, что-то вроде описанного в следующем запись в блоге.

person Alex Ott    schedule 17.02.2021
comment
Я работаю с pyspark. Поддерживается ли flatMapGroupWithState в Python? - person Hasif Subair; 17.02.2021
comment
ах, не заметил, что Python ... Нет, он доступен только в Java / Scala - person Alex Ott; 17.02.2021
comment
Я решил это в пакетном режиме, используя группу по и с запаздыванием по временной метке. Есть ли способ найти разницу во времени между последовательными сообщениями для устройства с потоковой передачей искры? - person Hasif Subair; 17.02.2021
comment
вы также можете разделить по окну и искать там ... но если датчик может отправлять данные в любое время, то размер окна может быть не лучшим подходом - person Alex Ott; 17.02.2021