Я пытаюсь загрузить несколько тем в один @KafkaListener
, но у меня возникают проблемы, так как я считаю, что он ищет постоянное значение, но инициализация переменной topics
из файла application.yml
вызывает какие-то проблемы, мне было интересно, может ли кто-нибудь помочь мне устранить неполадки решить эту проблему или дать мне указания, как загрузить несколько тем Kafka в один KafkaListener.
Я могу прослушивать несколько тем в одном и том же @KafkaListener
, передавая их в объекте с разделителями-запятыми, как показано ниже:
@KafkaListener(topics = {
"flight-events",
"flight-time-events",
"service-events",
"flight-delay-events"
})
Я понимаю, что мог бы создать объект со значениями, разделенными запятыми, представляющими темы, но я хочу иметь возможность добавлять темы через файл конфигурации, а не изменять код в базе кода.
Я считаю, что может быть проблема в том, что @KafkaListener должен принимать постоянное значение, и я не могу определить аннотацию как константу, есть ли способ обойти это?
KafkaWebSocketConnector.java
@Component
public class KafkaWebSocketConnector
{
@Value("${spring.kafka.topics}")
private String[] topics;
@KafkaListener(topics = topics)
public void listen(ConsumerRecord<?, Map<String, String>> message)
{
log.info("Received messages on topic [{}]: [{}]", message.topic(), message.value());
String dest = "/" + message.topic();
log.info("destination = {}", dest);
log.info("msg: {}", message);
messageTemplate.convertAndSend(dest, message.value());
}
}
application.yml
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: kafka-websocket-connector
topics: flight-events,
flight-time-events,
canceled-events,
pax-events,
flight-delay-events