Проблемы с приложением Kafka Streams в версии 0.10.2.0

У меня было существующее приложение Kafka Streams, которое отлично работало с 0.10.1.1. Обновлена ​​новая библиотека Kafka Streams 0.10.2.0 вместе с новым брокером (хотя новая библиотека обратно совместима с 0.10.1.1). Быстрый фон

  • У меня есть REST API, построенный поверх интерактивного API на основе метаданных запросов.
  • Он выполняет поиск в локальном хранилище, а также запрашивает удаленное хранилище (используя StreamsMetadata, полученные методом KafkaStreams.allMetadataForStore).
  • Используйте параметр конфигурации application.server, чтобы это работало.

Приложение работает нормально с одним экземпляром приложения. Как только я запускаю другой экземпляр и запрашиваю хранилище через REST, у меня возникают следующие проблемы.

Если я выполняю поиск на узле, который был запущен первым, поиск в локальном хранилище не удался с этим исключением

SEVERE: Error - the state store, my-store, may have migrated to another instance.
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, in-memory-avg-store, may have migrated to another instance.
        at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
        at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
        at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)
        **at mycode.getLocalMetrics(myclass.java:121)**
        **at mycode.remote(myclass.java:98)**
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)

Если я выполняю поиск на новом узле, поиск в локальном хранилище прошел нормально, но я вижу нулевой указатель в строке .forEach(new Consumer()

ks.allMetadataForStore(storeName)
                    .stream()
                    .filter(sm -> !(sm.host().equals(thisInstance.host()) && sm.port() == thisInstance.port())) //only query remote node stores
                    .forEach(new Consumer<StreamsMetadata>() {
                        @Override
                        public void accept(StreamsMetadata t) {
                            //some logic
                        }

                    });

Что я мог упустить??


person Abhishek    schedule 05.03.2017    source источник
comment
К вашему сведению: см. запись часто задаваемых вопросов по адресу docs.confluent.io/current/streams/ для получения более подробной информации и информации о том, как решить проблему.   -  person Michael G. Noll    schedule 12.04.2017


Ответы (3)


Возможно, вам нужно добавить немного «времени сна» для создания StreamsMetadata. Когда я комментирую Thread.sleep(1000L);, у меня возникает та же ошибка.

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    Thread.sleep(1000L);//If commented out,error occur 
    System.out.println(streams.allMetadataForStore("statestore").size());
person Breath Zyl    schedule 29.08.2017

Если вы запускаете несколько экземпляров, магазин может мигрировать с одного экземпляра на другой. Если вы соберете метаданные о расположении магазина до переноса магазина, ваши метаданные больше не будут правильными. Таким образом, вам необходимо обновить свои метаданные (т.е. собрать их заново).

Исключение org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, in-memory-avg-store, may have migrated to another instance указывает на то, что ваши метаданные застопорились.

Таким образом, вы можете поймать это исключение и повторить попытку восстановления после него.

person Matthias J. Sax    schedule 05.03.2017
comment
Да. Я видел это раньше и знаю, что это происходит во время миграции магазина. Но дело в том, что с 0.10.2.0 эта миграция кажется бесконечной, т.е. как только стартует второй узел, приложение переходит в это состояние. Кроме того, как я уже упоминал, все отлично работает с версией 0.10.1.1 - код не меняется вообще. Именно поэтому я немного озадачен. - person Abhishek; 06.03.2017
comment
О чем вам говорят журналы инстансов? Приложение перебалансируется? - person Damian Guy; 07.03.2017
comment
Можете ли вы использовать KafkaStreams.state(), чтобы увидеть, завершена ли перебалансировка и в каком состоянии находятся оба ваших приложения, когда вы пытаетесь выполнить запрос? - person Matthias J. Sax; 08.03.2017

Основываясь на комментариях Matthias J. Sax, можно загрузить хранилище потоков внутри прослушивателя состояния потока, как показано ниже.

streams.setStateListener((newState, oldState) -> {
   if (newState == State.RUNNING && oldState.REBALNCING) {
      System.out.println(streams.allMetadataForStore("statestore").size());
   }});
person Arun Y    schedule 21.02.2018