Я пытаюсь настроить привязку для пересылки сообщений Kafka из Spring Integration errorChannel в настраиваемый канал (для централизованной обработки ошибок).
Сообщения об ошибках отправляются на настроенный канал, но они прибывают как GenericMessage
с полезной нагрузкой byte [], которая состоит из сведений об исключении и сообщения об ошибке.
Моя конфигурация:
spring:
cloud:
stream:
kafka:
bindings:
accountOut.producer:
sync: true
binder:
autoCreateTopics: false
headers:
- spanId
- spanTraceId
- spanSampled
- spanParentSpanId
- spanName
- spanFlags
- eventType
- Authorization
bindings:
error:
destination: test-error
accountOut:
producer.partitionKeyExpression: payload.key
content-type: application/json
destination: account
kafka:
producer.keySerializer: org.apache.kafka.common.serialization.StringSerializer
consumer.valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
Я слушаю с @StreamListener(target = "kieran-error")
, а потребитель настроен с @Input("kieran-error") SubscribableChannel
Чтение документов , Я ожидал, что сообщение придет как и ErrorMessage
. Есть ли способ добиться этого? Или настроить доставку полезной нагрузки в виде объекта?
Версии, которые я использую:
- Весенняя загрузка 1.5.8
- Весеннее облако Edgware
- Kafka 11 клиент
- Ядро Spring Integration 4.3.12
--- Обновление вопроса ---
Теперь я понимаю, что могу настроить Spring Integration для пересылки в тему Kafka, слушая errorChannel
, например.
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
return handler;
}
Но можно ли настроить этот поток в свойствах yaml, а не в коде? Вот где вся остальная конфигурация Kafka, поэтому настройка шаблона kafka в коде не идеальна.
Другой вариант - явно прослушивать ErrorMessage и отправлять его на выходной канал kafka в коде:
@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
outputChannel.kieranError().send(...)
}