У меня было существующее приложение Kafka Streams, которое отлично работало с 0.10.1.1. Обновлена новая библиотека Kafka Streams 0.10.2.0 вместе с новым брокером (хотя новая библиотека обратно совместима с 0.10.1.1). Быстрый фон
- У меня есть REST API, построенный поверх интерактивного API на основе метаданных запросов.
- Он выполняет поиск в локальном хранилище, а также запрашивает удаленное хранилище (используя StreamsMetadata, полученные методом KafkaStreams.allMetadataForStore).
- Используйте параметр конфигурации application.server, чтобы это работало.
Приложение работает нормально с одним экземпляром приложения. Как только я запускаю другой экземпляр и запрашиваю хранилище через REST, у меня возникают следующие проблемы.
Если я выполняю поиск на узле, который был запущен первым, поиск в локальном хранилище не удался с этим исключением
SEVERE: Error - the state store, my-store, may have migrated to another instance.
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, in-memory-avg-store, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)
**at mycode.getLocalMetrics(myclass.java:121)**
**at mycode.remote(myclass.java:98)**
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
Если я выполняю поиск на новом узле, поиск в локальном хранилище прошел нормально, но я вижу нулевой указатель в строке .forEach(new Consumer()
ks.allMetadataForStore(storeName)
.stream()
.filter(sm -> !(sm.host().equals(thisInstance.host()) && sm.port() == thisInstance.port())) //only query remote node stores
.forEach(new Consumer<StreamsMetadata>() {
@Override
public void accept(StreamsMetadata t) {
//some logic
}
});
Что я мог упустить??