Spring Embedded Kafka + Mock Schema Registry: схема журнала изменений хранилища состояний не зарегистрирована

Я создаю интеграционный тест для нашей системы kafka, используя Spring Встроенный брокер Kafka с MockSchemaRegistryClient. Я создаю тест для одной из наших топологий Stream, построенный с использованием Streams API (KStreamBuilder). В этой конкретной топологии KStream (stream1) подается в KTable (table1).

Я сталкиваюсь с ошибкой, когда подаю ввод в stream1, исходящий из KTableProcessor table1:

Exception in thread "mortgage-kafka-consumers-it-c1dd9185-ce16-415c-ad82-293c1281c897-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streaming.mortgage.application_party, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:202)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 6
**Caused by: java.io.IOException: Cannot get schema from schema registry!**
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:106)
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndID(MockSchemaRegistryClient.java:149)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
    at com.sofi.kafka.serialization.AvroDeserializer.deserialize(AvroDeserializer.java:35)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:151)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:135)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:62)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:45)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:131)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:188)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)

The KTableProcessor is attempting to deserialize an entry from the RocksDB state store, however the schema does not exist in the mock schema registry. The topic whose schema is being requested is: **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog**

As the exception states, the schema has not been registered. However, the topic **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog-key** does have a registered schema (registered when the entry's key is serialized for the query). 

Since this is an internal topic, I don't expect to have to register this schema myself, however I'm failing because of the schema's absence in the registry. Is there a way to have changelog schemas registered prior to data ingestion? Is there a way to disable state store changelogging with the KStreamBuilder?

Заранее спасибо!


person Freestyle076    schedule 18.06.2018    source источник


Ответы (1)


Решив проблему, я теперь робко расскажу: при использовании KTable (через Streams API) со встроенным брокером kafka вам нужно настроить объект KafkaStreams с каталогом State Store, уникальным для каждого запуска встроенного брокера kafka ( в моем случае каждый запуск теста).

Вы управляете каталогом State Store через конфигурацию StreamsConfig.STATE_DIR_CONFIG. Я сделал его уникальным, добавив метку времени в каталог хранилища состояний по умолчанию.

properties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kraken-streams/" + LocalDateTime.now().toString());

Проблема заключалась в том, что при каждой инициализации встроенного брокера kafka в одном и том же месте существовало старое хранилище состояний. Когда самая первая запись была введена в тему KTable, хранилище состояний смогло вернуть предыдущее значение. Это привело к попытке десериализации записи хранилища состояний, которая еще не была сериализована (с точки зрения экземпляра реестра схемы). Схемы регистрируются только при сериализации, поэтому попытка десериализации не удалась из-за отсутствия зарегистрированной схемы.

person Freestyle076    schedule 19.06.2018
comment
Я сделал, как вы предложили, однако я все еще получаю то же исключение. Я проверил свой STATE_DIR_CONFIG, он работает динамически (с отметкой времени). Я даже удаляю все предыдущие состояния, но мне это не помогло. - person Dave; 14.04.2020
comment
Это не работает в окнах ... /tmp там не существует stackoverflow.com/questions/617414/ - person OneCricketeer; 06.05.2020