Приложение Spring Cloud Stream завершает работу после одного ввода мусора

У меня проблема с приложением облачного потока Spring, которое использует компонент KStream. Он прослушивает один вход и направляет сообщения на один выход после их обработки.

Он ожидает поступления строки JSON и пытается преобразовать ее в Spring Tuple по прибытии. При отправке сообщения происходит обратное.

Проблема в том, что когда системный администратор хочет протестировать тему, например, с kafka-console-producer.sh ... и печатает строку

"ржу не могу"

в нем все приложение Spring Cloud Stream умрет прямо здесь, за следующим исключением:

java.lang.RuntimeException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lol': was expecting ('true', 'false' or 'null')
 at [Source: lol; line: 1, column: 7]
    at org.springframework.tuple.JsonStringToTupleConverter.convert(JsonStringToTupleConverter.java:71) ~[spring-tuple-1.0.0.RELEASE.jar:na]
    at org.springframework.tuple.JsonStringToTupleConverter.convert(JsonStringToTupleConverter.java:31) ~[spring-tuple-1.0.0.RELEASE.jar:na]
    at org.springframework.tuple.TupleBuilder.fromString(TupleBuilder.java:153) ~[spring-tuple-1.0.0.RELEASE.jar:na]
    at org.springframework.cloud.stream.converter.TupleJsonMessageConverter.convertFromInternal(TupleJsonMessageConverter.java:90) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:167) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:55) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
    at org.springframework.cloud.stream.binder.kstream.KStreamListenerParameterAdapter$1.apply(KStreamListenerParameterAdapter.java:66) ~[spring-cloud-stream-binder-kstream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) ~[kafka-streams-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) ~[kafka-streams-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180) ~[kafka-streams-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) ~[kafka-streams-0.10.1.1.jar:na]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lol': was expecting ('true', 'false' or 'null')
 at [Source: lol; line: 1, column: 7]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) ~[jackson-core-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) ~[jackson-core-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2839) ~[jackson-core-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1903) ~[jackson-core-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:749) ~[jackson-core-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3850) ~[jackson-databind-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3799) ~[jackson-databind-2.8.10.jar:2.8.10]
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2397) ~[jackson-databind-2.8.10.jar:2.8.10]
    at org.springframework.tuple.JsonStringToTupleConverter.convert(JsonStringToTupleConverter.java:44) ~[spring-tuple-1.0.0.RELEASE.jar:na]

Я ожидал, что у фреймворка есть хоть какая-то отказоустойчивость для такого поведения. Вы не можете ожидать, что ввод всегда будет красивым и красивым. Поэтому я просмотрел документацию Spring: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_configuration_options

и есть несколько вариантов конфигурации для того, что кажется некоторой скрытой реализацией логики повторных попыток в случае сбоев. Например, параметр maxAttempts. Но для этого параметра уже используется значение по умолчанию 3, и все же я не вижу, чтобы приложения Spring Cloud Stream предпринимали какие-либо попытки спастись от этой ошибки.

Поэтому я хотел бы знать, каков рекомендуемый способ создания некоторой плохой входной толерантности для облачных потоковых приложений Spring.

Конфигурация приложения выглядит так:

spring:
    cloud:
        stream:
          bindings:
            input:
              content-type: application/json
              destination: inbound
              group: fraud
              consumer:
                headerMode: raw
            output:
              content-type: application/x-spring-tuple
              destination: outbound
              producer:
                headerMode: raw
                useNativeEncoding: true

spring.cloud.stream.kstream.binder.configuration:
  key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

person Kaspar    schedule 13.02.2018    source источник


Ответы (1)


В Spring Cloud Stream 1.3.x (Ditmars) есть только очень ограниченная поддержка обработки ошибок для Kafka Streams. Фактически, приложение должно обрабатывать любые ошибки в библиотеке 1.3 kafka streams. Однако в 2.0.0 мы добавляем поддержку KIP-161. https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers

Используя эту новую функцию в версии 2.0.0 связывателя потоков kafka, вы можете либо logAndSkip записи, либо logAndFail записи об ошибках десериализации. В дополнение к этому связыватель также предоставляет реализацию обработчика исключений отправки DLQ. Документы по версии 2.0 все еще обновляются. Я обновлю ссылки на документы здесь, когда они будут готовы. Но вот суть.

spring.cloud.stream.kafka.streams.binder.serdeError: sendToDlq (или logAndFail или logAndSkip)

spring.cloud.stream.kafka.stream.bindings.input.consumer.dlqName:[dlq name] - Если это не предусмотрено, это будет error.[incoming-topic].[group-name].

Тогда вы увидите записи об ошибках десериализации в теме DLQ. И снова эти функции доступны только в версии 2.0.0.BUILD-SNAPSHOT и станут частью предстоящего 2.0.0.RC1 выпуска.

person sobychacko    schedule 13.02.2018
comment
Спасибо за хороший ответ! Не могли бы вы добавить приблизительную дату для RC 2.0.0? Придется принять решение о реализации собственной обработки или использовании новейшего кода. - person Kaspar; 13.02.2018
comment
2.0.0.RC1 должен выйти на следующей неделе. - person sobychacko; 14.02.2018