У меня есть производитель Apache Kafka 2.6, который пишет в тему A (TA). У меня также есть приложение потоковой передачи Kafka, которое потребляет из TA и записывает в тему-B (TB). В приложении потоков у меня есть настраиваемый экстрактор отметок времени, который извлекает отметку времени из полезной нагрузки сообщения.
Для одного из моих тестовых примеров обработки сбоев я выключил кластер Kafka во время работы моих приложений.
Когда приложение-производитель пытается записать сообщения в TA, оно не может, потому что кластер не работает и, следовательно, (я предполагаю) буферизует сообщения. Допустим, он получает 4 сообщения m1, m2, m3, m4 в возрастающем временном порядке. (т.е. m1 - первое, а m4 - последнее).
Когда я возвращаю кластер Kafka в оперативный режим, производитель отправляет в тему буферизованные сообщения, но они не в порядке. Я получаю, например, m2, затем m3, затем m1, а затем m4.
Это почему ? Это потому, что буферизация в производителе является многопоточной, и каждый из них работает в теме одновременно?
Я предположил, что пользовательский экстрактор временных меток поможет упорядочить сообщения при их использовании. Но они этого не делают. Или, может быть, я неправильно понимаю экстрактор временных меток.
У меня есть одно решение от SO здесь, чтобы просто передать все события из tA в другую промежуточную тему (скажем, tA '), которая будет использовать экстрактор TimeStamp в другую тему. Но я не уверен, что это приведет к переупорядочению событий на основе извлеченной отметки времени.
Мой код для продюсера показан ниже (я использую Spring Cloud для создания продюсера): Producer.java
@Service
public class Producer {
private String topicName = "input-topic";
private ApplicationProperties appProps;
@Autowired
private KafkaTemplate<String, MyEvent> kafkaTemplate;
public Producer() {
super();
}
@Autowired
public void setAppProps(ApplicationProperties appProps) {
this.appProps = appProps;
this.topicName = appProps.getInput().getTopicName();
}
public void sendMessage(String key, MyEvent ce) {
ListenableFuture<SendResult<String,MyEvent>> future = this.kafkaTemplate.send(this.topicName, key, ce);
}
}