Ошибка dir io состояния Kafka-streams

Ниже выдается ошибка после прохождения потока в течение определенного времени? Я не могу найти, кто отвечает за создание файла .sst?

Env:

Kafka версии 0.10.0-cp1

Scala 2.11.8

    org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store agg
        at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:424)
        at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:414)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:330)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:247)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.rocksdb.RocksDBException: IO error: /tmp/kafka-streams/pos/0_0/rocksdb/agg/000008.sst: No such file or directory
        at org.rocksdb.RocksDB.flush(Native Method)
        at org.rocksdb.RocksDB.flush(RocksDB.java:1329)
        at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:422)
        ... 9 more
[2016-06-24 11:13:54,910] ERROR Failed to commit StreamTask #0_0 in thread [StreamThread-1]:  (org.apache.kafka.streams.processor.internals.StreamThread:452)
org.apache.kafka.streams.errors.ProcessorStateException: Error while batch writing to store agg
        at org.apache.kafka.streams.state.internals.RocksDBStore.putAllInternal(RocksDBStore.java:324)
        at org.apache.kafka.streams.state.internals.RocksDBStore.flushCache(RocksDBStore.java:379)
        at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:411)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:330)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:247)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
        at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:248)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:228)
Caused by: org.rocksdb.RocksDBException: IO error: /tmp/kafka-streams/pos/0_0/rocksdb/agg/000008.sst: No such file or directory
        at org.rocksdb.RocksDB.write0(Native Method)
        at org.rocksdb.RocksDB.write(RocksDB.java:546)
        at org.apache.kafka.streams.state.internals.RocksDBStore.putAllInternal(RocksDBStore.java:322)
        ... 9 more

person Rahul Shukla    schedule 24.06.2016    source источник


Ответы (1)


RocksDB используется внутри Kafka Streams для обработки состояния оператора, а RocksDB записывает некоторые файлы на диск.

Возможно ли, что кто-то удалил файлы в папке /tmp и таким образом удалил состояние вашего приложения Kafka Streams? Если да, настройте другое местоположение государственного магазина с помощью параметра state.dir (см. http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters)

person Matthias J. Sax    schedule 24.06.2016
comment
Да, похоже, этим все сказано: Caused by: org.rocksdb.RocksDBException: IO error: /tmp/kafka-streams/pos/0_0/rocksdb/agg/000008.sst: No such file or directory - person Dmitry Minkovsky; 30.06.2016
comment
@ rahul-shukla Это ответ на ваш вопрос? Если да, не стесняйтесь принимать и / или голосовать. - person Matthias J. Sax; 30.08.2016
comment
@ matthias-j-sax Не могли бы вы прояснить, что происходит, если при перезапуске состояние каталога очищается? Приводит ли это к непоследовательному поведению приложения потоков kafka при следующем запуске? Должен ли застройщик бережно сохранять состояние? Я не нашел упоминания об этом в документации Confluent. - person Oleg; 24.12.2017
comment
По умолчанию состояние также поддерживается темой журнала изменений в кластере Kafka. Локальный RocksDB по сути является кешем - если RocksDB утерян, Kafka Streams может воссоздать состояние из темы журнала изменений: docs.confluent.io/current/streams/ - person Matthias J. Sax; 24.12.2017