Канал ошибок Spring Cloud Stream Kafka

Я пытаюсь настроить привязку для пересылки сообщений Kafka из Spring Integration errorChannel в настраиваемый канал (для централизованной обработки ошибок).

Сообщения об ошибках отправляются на настроенный канал, но они прибывают как GenericMessage с полезной нагрузкой byte [], которая состоит из сведений об исключении и сообщения об ошибке.

Моя конфигурация:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          accountOut.producer:
            sync: true
        binder:
          autoCreateTopics: false
          headers:
               - spanId
               - spanTraceId
               - spanSampled
               - spanParentSpanId
               - spanName
               - spanFlags
               - eventType
               - Authorization
      bindings:
        error:
          destination: test-error
        accountOut:
          producer.partitionKeyExpression: payload.key
          content-type: application/json
          destination: account
  kafka:
    producer.keySerializer: org.apache.kafka.common.serialization.StringSerializer
    consumer.valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer

Я слушаю с @StreamListener(target = "kieran-error"), а потребитель настроен с @Input("kieran-error") SubscribableChannel

Чтение документов , Я ожидал, что сообщение придет как и ErrorMessage. Есть ли способ добиться этого? Или настроить доставку полезной нагрузки в виде объекта?

Версии, которые я использую:

  • Весенняя загрузка 1.5.8
  • Весеннее облако Edgware
  • Kafka 11 клиент
  • Ядро Spring Integration 4.3.12

--- Обновление вопроса ---

Теперь я понимаю, что могу настроить Spring Integration для пересылки в тему Kafka, слушая errorChannel, например.

@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    return handler;
}

Но можно ли настроить этот поток в свойствах yaml, а не в коде? Вот где вся остальная конфигурация Kafka, поэтому настройка шаблона kafka в коде не идеальна.

Другой вариант - явно прослушивать ErrorMessage и отправлять его на выходной канал kafka в коде:

@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
    outputChannel.kieranError().send(...)
}

person Kieran    schedule 09.03.2018    source источник


Ответы (1)


Где именно вы читаете такое сообщение?

Описываемое вами сообщение звучит как сообщение, отправленное в тему DLQ, когда enableDlq истинно для потребителя; вы не показываете конфигурацию потребителя, поэтому мне трудно догадаться.

ErrorMessage, отправленный в канал ошибок конкретного места назначения (и связанный мостом с глобальным errorChannel, может быть использован с

@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
    ...
}

В

error:
  destination:

является устаревшим и был предназначен для пользовательского кода для отправки сообщений errorChannel, которые будут переходить в эту тему.

person Gary Russell    schedule 09.03.2018
comment
Спасибо за ответ, Гэри, это помогло мне понять. Я обновил вопрос, так как теперь мне интересно, можно ли добавить конфигурацию для пересылки сообщений из errorChannel в тему kafka в Yaml, а не с ServiceActivator bean. - person Kieran; 09.03.2018
comment
Проблема в том, что ErrorMessage - сложный объект, который нельзя просто отправить в тему кафка; для этого потребуется некоторая сериализация. Он имеет различные компоненты, originalMessage, которое представляет собой исходное сообщение, преобразованное из необработанных данных kafka, необработанную запись, полезную нагрузку исключения, которая имеет failedMessage в точке сбоя (которая может быть ниже по потоку от @StreamListener) и cause. Итак, вашему приложению действительно нужно решить, что и как отправлять. По этой причине альтернативой является настройка связывателя для отправки необработанных данных (и некоторых дополнительных заголовков) в DLQ. - person Gary Russell; 09.03.2018