Исключения при запуске толстой банки потоков кафки

Я пробую пример подсчета слов для изучения потоков кафки. Ниже приведен используемый код. Я создал толстую банку из проекта и начал создавать сообщения в теме word-count-input1 и получать вывод из word-count-output1. Но когда я запускаю толстую банку, я вижу исключение - org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record.

    Properties properties = new Properties();

    properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-example");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());

    KStreamBuilder builder = new KStreamBuilder();

    // 1. Stream from kafka

    KStream<String,String> wordcountInput = builder.stream("word-count-input1");

    // 2. map values to lower case

   KTable<String, Long> wordcount = wordcountInput.mapValues(value -> value.toLowerCase())

                                   // 3. split by space on values
                                    .flatMapValues(value -> Arrays.asList(value.split(" ")))

                                   // 4. Create  a key to apply a key, so the word itself is a key

                                    .selectKey((ignoredKey,words) -> words)

                                    // 5. Group it by key

                                    .groupByKey()

                                    // 6. count occurences, add a column name - counts

                                    .count("counts");

   // Since the StreamsConfig was set to String and String, its mandatory to specify the Serdes o/p which is String and Long in our case
    wordcount.to(Serdes.String(),Serdes.Long(),"word-count-output1");

    KafkaStreams streams = new KafkaStreams(builder, properties);
    streams.start();
    System.out.println("Topology is " + streams.toString());

Исключение:

INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:1040)
INFO Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:972)
INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Removing all active tasks [0_0, 1_0, 0_1, 1_1, 0_2, 1_2] (org.apache.kafka.streams.processor.internals.StreamThread:1407)
INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Removing all standby tasks [] (org.apache.kafka.streams.processor.internals.StreamThread:1421)
INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1072)
WARN stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Unexpected state transition from RUNNING to DEAD. (org.apache.kafka.streams.processor.internals.StreamThread:978)
Exception in thread "word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=word-count-input1, partition=0, offset=0
        at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
INFO stream-client [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a] State transition from RUNNING to PENDING_SHUTDOWN. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Informed thread to shut down (org.apache.kafka.streams.processor.internals.StreamThread:900)
WARN stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Unexpected state transition from DEAD to PENDING_SHUTDOWN. (org.apache.kafka.streams.processor.internals.StreamThread:978)
INFO stream-client [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a] Stopped Kafka Streams process. (org.apache.kafka.streams.KafkaStreams:514)
INFO stream-client [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a] State transition from PENDING_SHUTDOWN to NOT_RUNNING. (org.apache.kafka.streams.KafkaStreams:229)

Настройка заключается в том, что я запускаю zookeeper и 3 брокера на виртуальной машине Linux. Может ли кто-нибудь предложить?


person Arun Shankar    schedule 23.08.2017    source источник
comment
какая версия кафки пожалуйста?   -  person groo    schedule 23.08.2017


Ответы (2)


Вы указали LongDeserializer для `VALUE_DESERIALIZER? Фактическая ошибка:

Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

Итак, кажется, что ваше значение не является значением длиной 8 байт. Я предполагаю, что ваше значение на самом деле является строкой для входной темы? Поэтому вам нужно указать правильный десериализатор, который соответствует вашим данным.

person Matthias J. Sax    schedule 23.08.2017

Изменение вашего Ktable на это должно исправить это:

KTable<String, Long> wordcount = source
                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
                    }
                })
                .groupBy(new KeyValueMapper<String, String, String>() {
                    @Override
                    public String apply(String key, String value) {
                        return value;
                    }
                })
                .count("Counts");
person groo    schedule 23.08.2017