Как отправлять сообщения на два разных канала вывода в SCDF?

У меня есть источник, который отправляет сообщения по каналу output по умолчанию процессору в потоке. Теперь я хочу также отправлять сообщения об ошибках по другому каналу.

Я решил, что мне нужно создать привязываемый интерфейс, который простирается от Source и добавляет дополнительный канал с помощью @Output. Как мне убедиться, что SCDF действительно создает тему Kafka для этого канала? IOW, как бы выглядело определение потока?

Например. что-то вроде

source | processor | sink source > error-sink

При source | processor используется обычный output канал / тема Kafka и source > error-sink используется другой канал / тема.


person Remon Sinnema    schedule 07.03.2017    source источник


Ответы (1)


Если требуется отслеживать сообщения об ошибках для последующей обработки, вы можете использовать механизм OOTB DLQ, связанный с Spring Cloud Stream. Он поддерживается как в Rabbit и Kafka. Вы можете включить DLQ в Spring Cloud Data Flow (SCDF) как глобальная настройка или для каждого потока.

Если вы по-прежнему хотите определить свои клиентские каналы для другой обработки сообщений, вам придется создать настраиваемый интерфейс, подобный этому образец.

При развертывании потока в SCDF вы можете затем переопределить назначения между производителем и потребителем с помощью свойств привязки spring.cloud.stream.kafka.bindings.<channelName>.producer и spring.cloud.stream.kafka.bindings.<channelName>.consumer соответственно.

РЕДАКТИРОВАТЬ:

Хотя существует описанный выше подход, я узнал о гораздо более простом решении от руководителя Spring Cloud Stream (@ marius-bogoevici).

Уже существует канал ошибок по умолчанию, доступный для использования, и Spring Integration поддерживает его.

Благодаря этому в вашем приложении вы можете отправлять пользовательские сообщения в канал ошибок по умолчанию через: @Autowire @Qualifier("errorChannel"). Фактически, эта поддержка также доступна для всех приложений OOTB.

Затем вы можете переопределить назначение этого канала ошибки с помощью: spring.cloud.stream.bindings.error.destination=errorchannel-test. В SCDF вы должны передать это во время развертывания потока с помощью: --properties.

Например:

поток create foo --definition mysource | бревно

поток развертывания foo --properties app.mysource.spring.cloud.stream.bindings.error.destination = errorchannel-test

person Sabby Anandan    schedule 07.03.2017
comment
Спасибо, Сабби. Я действительно хочу обрабатывать сообщения об ошибках ниже по потоку, но не тем же процессором, что и обычные сообщения. Кроме того, сообщения об ошибках являются настраиваемыми сообщениями. Так что, похоже, DLQ не подходит, не так ли? Пример полезен для подключения кода Java к каналам. Однако я борюсь с тем, как разрешить SCDF подключать каналы к темам Kafka. Могу ли я просто создать еще один поток от источника к обработчику ошибок? - person Remon Sinnema; 07.03.2017
comment
Оказывается, вам не нужно ничего делать в определении потока, чтобы это произошло. В потоковых приложениях просто используйте @EnableBinding с интерфейсом, который добавляет еще один канал, как предлагал Сабби. Для источника это должен быть MessageChannel. Для процессора / приемника это должен быть SubscribableChannel. Spring Cloud Stream позаботится о сопоставлении каналов с темами Kafka; все, что вам нужно сделать, это убедиться, что источник и процессор / приемник используют одно и то же имя для канала. Нет необходимости переопределять назначение через свойства. - person Remon Sinnema; 09.03.2017