Отфильтровать дубликаты в Stream Analytics

Я получаю данные от некоторых датчиков через несколько разных мостов. Данные, которые я получаю, содержат много дубликатов. С одним и тем же серийным номером, значениями, (почти) одинаковыми датами и т. Д., Но с разных мостов. Данные не включают в себя какой-то уникальный идентификатор события, а только временную метку, уникальную для каждого отдельного события, даже если оно дублируется. Поэтому я не могу фильтровать по ним.

Вот пример:

{"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1583750353969,"dateTime":"2020-03-09T10:39:13Z","serialNo":"02001703","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"AE8B2FC5","rssi":-25,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":[{"type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":15.8,"scale":1.0,"min":"-20","max":"55","low":" ","high":" "},{"type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":39,"scale":1.0,"min":" ","max":" ","low":" ","high":" "}],"uniqueId":"LAS02001703","vif":7,"dif":27,"rssiWmbus":-94,"EventProcessedUtcTime":"2020-03-09T11:54:07.5197619Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-09T10:39:14.0440000Z"}
{"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1583750354377,"dateTime":"2020-03-09T10:39:14Z","serialNo":"02001703","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"01000000","rssi":-35,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":[{"type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":15.8,"scale":1.0,"min":"-20","max":"55","low":" ","high":" "},{"type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":39,"scale":1.0,"min":" ","max":" ","low":" ","high":" "}],"uniqueId":"LAS02001703","vif":7,"dif":27,"rssiWmbus":-80,"EventProcessedUtcTime":"2020-03-09T11:54:07.5197619Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-09T10:39:14.4190000Z"}

Это какой-то способ отфильтровать дубликаты в Stream Analytics? Данные также в конечном итоге отправляются в Power BI, если есть возможность сделать это там. Но при использовании «удаления дубликатов» в Power Bi вам понадобится своего рода EventId, который уникален для всего остального, но такой же для дублированных данных.

Заранее спасибо!


person skh    schedule 13.03.2020    source источник


Ответы (2)


Согласно вашему описанию, вы просто хотите реализовать функцию distinct, которая похожа на функцию реляционной базы данных, чтобы вы могли фильтровать некоторые строки на основе некоторых столбцов.

Фактически, это может поддерживаться с некоторыми ограничениями в ASA. Основная идея заключается в использовании COUNT и GROUP BY ключевые слова.

Например, мои тестовые данные, как показано ниже:

введите здесь описание изображения

SQL:

ВЫБРАТЬ СЧЕТ (РАЗЛИЧНЫЙ b.timestamp), b.dsType, b.mrfCuId FROM blobstream b ГРУППА ПО b.dsType, b.mrfCuId, TumblingWindow (минута, 5)

Выход:

введите здесь описание изображения

Я получил несколько подсказок из этого официальный пример.

person Jay Gong    schedule 16.03.2020
comment
Спасибо! Итак, если я правильно вас понял, с помощью этого запроса вы выбираете одну из строк с тем же типом mrfcuid и ds в течение 5 минут? Потому что почти все данные имеют dstype = WMBUS и Mrfcuid = b827EBE84EEB. Мне интересно разделить, где одинаковый серийный номер, поэтому я могу просто сгруппировать по нему и получить окно тублинга всего за 2-3 секунды? - person skh; 16.03.2020
comment
Я попытался изменить запрос на использование серийного номера и окна тублинга в течение 2 секунд и получил взамен null. Другая проблема заключается в том, что я не совсем знаю, как включить другой запрос, который я сделал, который разделяет различные элементы в массиве: SELECT event.serialNo, CAST (event.dateTime как datetime) как TimeAndDate, DataP.ArrayValue.name, event. bridgeId, DataP.ArrayValue.value, DataP.ArrayValue.valueTYpe INTO TestOuput FROM eventhubInput AS событие CROSS APPLY GetArrayElements (event.datapoint) AS DataP WHERE DataP.ArrayValue.valueType = 'CSV' - person skh; 16.03.2020
comment
@skh Привет, я запутался, что вы получили нулевое значение в результате выполнения задания или процесса тестирования в пользовательском интерфейсе портала Azure? - person Jay Gong; 17.03.2020
comment
@skh Или, может быть, вы могли бы опубликовать некоторые образцы данных с помощью ASQL, который вы указали в последнем комментарии в своем вопросе, чтобы я мог проверить их на своей стороне. - person Jay Gong; 17.03.2020
comment
Большое спасибо! Я разместил образцы данных в ответе ниже, так как я не мог опубликовать изображение в этом комментарии :) - person skh; 17.03.2020
comment
Удалось ли вам увидеть образцы данных, которые я опубликовал в качестве ответа? :) - person skh; 18.03.2020
comment
@skh Конечно, я найду время, чтобы его просмотреть. Я изо всех сил стараюсь следить за этим делом. - person Jay Gong; 19.03.2020

Я не смог опубликовать картинку в комментарии, поэтому напишите свой ответ здесь.

Это мой результат вывода при выполнении запроса, который я опубликовал в своем комментарии к вам. Здесь вы можете видеть, что я извлек некоторые из требуемых значений из массива в каждой строке. И как вы можете видеть здесь, ряды 3 и 4 точно такие же, как ряды 1 и 2, только с разных мостов. То же самое со строками 7 и 8, 9 и 10. В идеале, если вы понимаете, мне нужен только один образец правильных данных, а не дублированный, как в этом примере.

Вот еще несколько примеров данных, если вы хотите проверить себя:

{"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355883141,"dateTime":"2020-03-16T10:51:23Z","serialNo":"02001771","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"5D410D00","rssi":-67,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":[{"type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":18.2,"scale":1.0,"min":"-20","max":"55","low":" ","high":" "},{"type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":28,"scale":1.0,"min":" ","max":" ","low":" ","high":" "}],"uniqueId":"LAS02001771","vif":7,"dif":27,"rssiWmbus":-16,"EventProcessedUtcTime":"2020-03-16T10:51:23.2682714Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:23.2420000Z"}
{"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355898659,"dateTime":"2020-03-16T10:51:38Z","serialNo":"02001596","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"AE8B2FC5","rssi":-24,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":[{"type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":13.1,"scale":1.0,"min":"-20","max":"55","low":" ","high":" "},{"type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":35,"scale":1.0,"min":" ","max":" ","low":" ","high":" "}],"uniqueId":"LAS02001596","vif":7,"dif":27,"rssiWmbus":-45,"EventProcessedUtcTime":"2020-03-16T10:51:38.8337473Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:38.7330000Z"}
{"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355898715,"dateTime":"2020-03-16T10:51:38Z","serialNo":"02001596","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"5D410D00","rssi":-67,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":[{"type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":13.1,"scale":1.0,"min":"-20","max":"55","low":" ","high":" "},{"type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":35,"scale":1.0,"min":" ","max":" ","low":" ","high":" "}],"uniqueId":"LAS02001596","vif":7,"dif":27,"rssiWmbus":-16,"EventProcessedUtcTime":"2020-03-16T10:51:38.8337473Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:38.8110000Z"}
{"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355904394,"dateTime":"2020-03-16T10:51:44Z","serialNo":"02001704","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"AE8B2FC5","rssi":-24,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":[{"type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":19.2,"scale":1.0,"min":"-20","max":"55","low":" ","high":" "},{"type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":26,"scale":1.0,"min":" ","max":" ","low":" ","high":" "}],"uniqueId":"LAS02001704","vif":7,"dif":27,"rssiWmbus":-58,"EventProcessedUtcTime":"2020-03-16T10:51:44.5783305Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:44.4680000Z"}
{"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355904737,"dateTime":"2020-03-16T10:51:44Z","serialNo":"02001704","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"5D410D00","rssi":-67,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":[{"type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":19.2,"scale":1.0,"min":"-20","max":"55","low":" ","high":" "},{"type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":26,"scale":1.0,"min":" ","max":" ","low":" ","high":" "}],"uniqueId":"LAS02001704","vif":7,"dif":27,"rssiWmbus":-16,"EventProcessedUtcTime":"2020-03-16T10:51:44.9080895Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:44.7960000Z"}
{"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355907295,"dateTime":"2020-03-16T10:51:47Z","serialNo":"02001701","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"AE8B2FC5","rssi":-24,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":[{"type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":16.2,"scale":1.0,"min":"-20","max":"55","low":" ","high":" "},{"type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":28,"scale":1.0,"min":" ","max":" ","low":" ","high":" "}],"uniqueId":"LAS02001701","vif":7,"dif":27,"rssiWmbus":-86,"EventProcessedUtcTime":"2020-03-16T10:51:47.4262897Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:47.3750000Z"}
{"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355908044,"dateTime":"2020-03-16T10:51:48Z","serialNo":"02001701","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"5D410D00","rssi":-67,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":[{"type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":16.2,"scale":1.0,"min":"-20","max":"55","low":" ","high":" "},{"type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":28,"scale":1.0,"min":" ","max":" ","low":" ","high":" "}],"uniqueId":"LAS02001701","vif":7,"dif":27,"rssiWmbus":-16,"EventProcessedUtcTime":"2020-03-16T10:51:48.1936261Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:48.1250000Z"}
{"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355918798,"dateTime":"2020-03-16T10:51:58Z","serialNo":"02001701","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"AE8B2FC5","rssi":-24,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":[{"type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":16.2,"scale":1.0,"min":"-20","max":"55","low":" ","high":" "},{"type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":28,"scale":1.0,"min":" ","max":" ","low":" ","high":" "}],"uniqueId":"LAS02001701","vif":7,"dif":27,"rssiWmbus":-92,"EventProcessedUtcTime":"2020-03-16T10:51:58.9619079Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:58.8610000Z"}
person skh    schedule 17.03.2020