Spring Cloud Stream Kafka Binder enableDlq не работает так, как я ожидал

Я использую связыватель kafka spring-cloud-stream с реестром схем (не потоки kakfa). Я пытаюсь сделать, когда недесериализуемое сообщение попало во входную тему, отправить недесериализуемое сообщение в dlq.

Итак, я попробовал, как показано ниже, но приложение Spring Cloud Stream продолжает повторять попытки бесконечно и сообщает

Вызвано: org.apache.kafka.common.errors.SerializationException: ошибка десериализации сообщения Avro для идентификатора -1

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
  kafka:
    binder:
      brokers: localhost:9092
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
          autoCommitOnError: true
          autoCommitOffset: true
    default:
      consumer:
        configuration:
          schema.registry.url: http://localhost:8081
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

Что я делаю неправильно? Пожалуйста помоги.




Ответы (1)


Пожалуйста, не задавайте один и тот же вопрос в нескольких местах, это пустая трата вашего и нашего времени; как я уже ответил на Gitter:

Эта ошибка возникает слишком далеко в стеке, и spring-cloud-stream не может с ней помочь. Вам нужно использовать ListenerContainerCustomizer @Bean, чтобы настроить SeekToCurrentErrorHandler с DeadLetterPublishingRecoverer и настроить ErrorHandlingDeserializer.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

Тем не менее, Stack Overflow - лучшее средство для ответов на подобные вопросы.

person Gary Russell    schedule 18.02.2021