У нас есть датчики, которые запускаются и работают в случайном порядке несколько раз в день. Данные от датчиков отправляются в тему Kafka, используются Spark Structured streaming API и сохраняются в Delta Lake. Теперь нам нужно идентифицировать и сохранять сеансы для каждого датчика в отдельной таблице Delta Lake, разделенной на device_id и sensor_id.
Я пробовал использовать Spark Structured Streaming с водяными знаками, но ничего не добился.
stream2 = spark.readStream.format('delta')
.load('<FIRST_DELTA_LAKE_TABLE>')
.select('device_id', 'json', 'time')
.withWatermark('timestamp', '10 minutes')
.groupBy('device_id').agg(F.min('time').alias('min_time'), F.max('time').alias('max_time')))
.writeStream
.format("delta")
stream2.start("<SESSIONS_TABLE>")
Идея заключалась в том, чтобы иметь вторую таблицу, идентифицирующую сеансы по входящим данным и сохраняющую время начала и время окончания для каждого сеанса и устройства. Задания потоковой передачи выполняются, и в дельта-таблицу сеансов ничего не записывается.
Любая помощь по этому поводу будет оценена по достоинству.