Как заставить Spring Cloud Stream связыватель потоков Kafka повторить попытку обработки сообщения, если на этапе обработки произошел сбой?

Я работаю над Kafka Streams, используя Spring Cloud Stream. В приложении для обработки сообщений может быть вероятность, что оно выдаст ошибку. Таким образом, сообщение не должно повторяться и повторяться снова.

Мой метод применения -

@Bean
public Function<KStream<Object, String>, KStream<String, Long>> process() {
return (input) -> {
KStream<Object, String> kt = input.flatMapValues(v -> Arrays.asList(v.toUpperCase().split("\\W+")));
KGroupedStream<String, String> kgt =kt.map((k, v) -> new KeyValue<>(v, v)).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
KTable<Windowed<String>, Long> ktable = kgt.windowedBy(TimeWindows.of(500)).count();
KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
WordCount wc = new WordCount();
wc.setWord(k.key());
wc.setCount(v);
wc.setStart(new Date(k.window().start()));
wc.setEnd(new Date(k.window().end()));

dao.insert(wc);

return new KeyValue<>(k.key(),wc);
});
return kst.map((k,v) -> new KeyValue<>(k, v.getCount()));
};
}

Здесь, если метод вставки DAO завершается неудачно, сообщение не должно публиковаться в выходной теме, и следует повторить обработку того же сообщения.

Как мы можем настроить связыватель потоков kafka для этого? Любая помощь по этому поводу очень ценится.




Ответы (1)


Сам связыватель Spring Cloud Stream Kafka Streams не предоставляет такие механизмы повторной попытки в рамках выполнения вашей бизнес-логики. Однако один из способов решения этого варианта использования может заключаться в том, чтобы заключить ваш критический вызов (dao.insert() в данном случае) в RetryTemplate, который вы определяете локально. Вот возможная реализация, которая повторяет 10 попыток с политикой отсрочки в 1 секунду. Если вы пробуете это решение, обязательно извлеките общий код, связанный с RetryTemplate, из основной бизнес-логики. Я не пробовал, но должно работать.

KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
  WordCount wc = new WordCount();
  ...

  org.springframework.retry.support.RetryTemplate retryTemplate = new 
   RetryTemplate();

  RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
  FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
  backOffPolicy.setBackOffPeriod(1000);

  retryTemplate.setBackOffPolicy(backOffPolicy);
  retryTemplate.setRetryPolicy(retryPolicy);

  retryTemplate.execute(context -> {
    try {
      dao.insert(wc);
    }
    catch (Exception e) {
      throw new IllegalStateException(..);
   }
  });

  return new KeyValue<>(k.key(),wc);
});

Событие после 10 повторных попыток операции вставки dao, если она все еще не удалась, будет сгенерировано исключение, которое завершит работу приложения, и в этом случае смещение не будет зафиксировано. При перезапуске, после устранения основной проблемы, ваше приложение должно продолжать работу с этого смещения.

person sobychacko    schedule 03.06.2020
comment
Привет, что это за объект контекста в методе выполнения RetryTemplate? Как его создать? - person shreyas.k; 04.06.2020
comment
Это RetryContext, который создается и передается фреймворком. Мы просто используем это в приложении как лямбда-аргумент. Вам не нужно явно создавать это, фреймворк позаботится об этом. - person sobychacko; 04.06.2020