Нано-отметка времени сообщения NiFi Avro Kafka (19 цифр) преобразуется в отметку времени с миллисекундами

Теперь я столкнулся с проблемой преобразования записи сообщения Kafka типа long для наносекунд (19 цифр) в строковую метку времени с миллисекундами. Сообщения поступают в формате Avro и содержат разные схемы (поэтому мы не можем статически определить одну схему), хранящиеся в Confluent Schema Registry. Текущий процесс:

1) ConsumeKafkaRecord_2_0, который читает сообщение и сохраняет схему Avro, поступающую из реестра Confluent Schema Registry, в атрибут avro.schema.

2) UpdateAttribute, который ищет образец записи timestamp в avro.schema и добавляет "logicalType": "timestamp-micros" (потому что я не могу найти тип timestamp-nanos в спецификации Avro)

3) ConvertRecord, который преобразует файл потока Avro с использованием avro.schema в JSON. Он использует логический тип, присвоенный на предыдущем шаге, и преобразует 19 цифр в формат гггг-мм-дд ЧЧ: мм: СС.SSSSSS. Здесь проблема в том, что 19 цифр - это тип нанометки времени, который отсутствует в спецификации Avro, поэтому мы можем использовать только тип timestamp-micros и получать значения 51000+ лет.

4) ReplaceText - этот процессор дает нам обходной путь для проблемы, описанной выше, и мы заменяем значения шаблона 5-значного года на "правильное" datetime (с миллисекундами, потому что Java почему-то не может работать с микросекундами), используя и выражение: $ {'$ 1': toDate ('yyyyy-MM-dd HH: mm: ss.SSSSSS'): toNumber (): toString (): substring (0, 13): toNumber (): toDate (): format ('гггг-ММ-дд ЧЧ: мм: сс.ССС')}

После этого мы переходим к другим процессорам, обходной путь работает, но со странной проблемой - наши итоговые временные метки отличаются на несколько миллисекунд от того, что мы получаем в Kafka. Могу только догадываться, что это результат описанных выше преобразований. Вот почему мой вопрос: есть ли лучший способ обрабатывать 19-значные значения, поступающие в сообщениях Avro (схемы находятся в реестре Confluent Schema, шаблон для полей меток времени в схеме известен), чтобы они были преобразованы в правильные метки времени миллисекунды? Может быть, какая-то замена значения поля (подстрока из 13 цифр из 19-значного значения) в содержимом файла потока Avro на основе его схемы, которая встроена / хранится в атрибуте avro.schema?

Пожалуйста, дайте мне знать, если что-то неясно, и если требуются дополнительные сведения. Заранее большое спасибо!


person Alexey Chibisov    schedule 15.07.2019    source источник
comment
проблема возникает, когда вы пытаетесь преобразовать строку (с датой в наносекундах) в дату в следующем формате: whatever.SSSSSS. Джува могла вернуть за это неожиданные милли. в своем выражении лица вы просто обрезаете последние три цифры. Итак, просто используйте подстроку или замените ...   -  person daggett    schedule 15.07.2019
comment
@daggett проблема в том, что я могу изменить схему Avro logicType для известного шаблона поля временной метки, но я не понимаю, почему мне нужно сначала преобразовать тип в timestamp-millis или timestamp-micros (только эти логические типы есть в Avro, нет timestamp-nanos), и только тогда я могу использовать шаблон с 5-символьным номером года, чтобы пересчитать дату и время, преобразовав их обратно в длинные целые числа, подстроку из 13 цифр, преобразовать обратно в datetime.   -  person Alexey Chibisov    schedule 15.07.2019


Ответы (1)


В нашем случае сработало следующее решение - сценарий Groovy, который преобразует один файл avro в другой (как схему, так и содержимое):

@Grab('org.apache.avro:avro:1.8.2')
import org.apache.avro.*
import org.apache.avro.file.*
import org.apache.avro.generic.*

//function which is traversing through all records (including nested ones)
def convertAvroNanosecToMillisec(record){
    record.getSchema().getFields().forEach{ Schema.Field field -> 
        if (record.get(field.name()) instanceof org.apache.avro.generic.GenericData.Record){
            convertAvroNanosecToMillisec(record.get(field.name()))
        }

        if (field.schema().getType().getName() == "union"){
            field.schema().getTypes().forEach{ Schema unionTypeSchema ->
                if(unionTypeSchema.getProp("connect.name") == "io.debezium.time.NanoTimestamp"){
                    record.put(field.name(), Long.valueOf(record.get(field.name()).toString().substring(0, 13)))
                    unionTypeSchema.addProp("logicalType", "timestamp-millis")
                }
            }
        } else {
            if(field.schema().getProp("connect.name") == "io.debezium.time.NanoTimestamp"){
                record.put(field.name(), Long.valueOf(record.get(field.name()).toString().substring(0, 13)))
                field.schema().addProp("logicalType", "timestamp-millis")
            }
        }

    } 
    return record
}

//start flowfile processing
def flowFile = session.get()
if(!flowFile) return

try {

flowFile = session.write(flowFile, {inStream, outStream ->
  // Defining avro reader and writer
  DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
  DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())

  def contentSchema = reader.schema //source Avro schema
  def records = [] //list will be used to temporary store the processed records

  //reading all records from incoming file and adding to the temporary list
  reader.forEach{ GenericRecord contentRecord -> 
      records.add(convertAvroNanosecToMillisec(contentRecord))
  }

  //creating a file writer object with adjusted schema
  writer.create(contentSchema, outStream)

  //adding records to the output file from the temporary list and closing the writer
  records.forEach{ GenericRecord contentRecord -> 
      writer.append(contentRecord)
  }

  writer.close()

} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
   log.error('Error appending new record to avro file', e)
   flowFile = session.penalize(flowFile)
   session.transfer(flowFile, REL_FAILURE)
}
person Alexey Chibisov    schedule 18.07.2019