Включение @KafkaListener для получения имен переменных тем из файла application.yml

Я пытаюсь загрузить несколько тем в один @KafkaListener, но у меня возникают проблемы, так как я считаю, что он ищет постоянное значение, но инициализация переменной topics из файла application.yml вызывает какие-то проблемы, мне было интересно, может ли кто-нибудь помочь мне устранить неполадки решить эту проблему или дать мне указания, как загрузить несколько тем Kafka в один KafkaListener.

Я могу прослушивать несколько тем в одном и том же @KafkaListener, передавая их в объекте с разделителями-запятыми, как показано ниже:

@KafkaListener(topics = {
           "flight-events",
           "flight-time-events",
           "service-events",
           "flight-delay-events"
   })

Я понимаю, что мог бы создать объект со значениями, разделенными запятыми, представляющими темы, но я хочу иметь возможность добавлять темы через файл конфигурации, а не изменять код в базе кода.

Я считаю, что может быть проблема в том, что @KafkaListener должен принимать постоянное значение, и я не могу определить аннотацию как константу, есть ли способ обойти это?

KafkaWebSocketConnector.java

@Component
public class KafkaWebSocketConnector
{


   @Value("${spring.kafka.topics}")
   private String[] topics;

   @KafkaListener(topics = topics)
   public void listen(ConsumerRecord<?, Map<String, String>> message)
   {
      log.info("Received messages on topic [{}]: [{}]", message.topic(), message.value());
      String dest = "/" + message.topic();
      log.info("destination = {}", dest);
      log.info("msg: {}", message);
      messageTemplate.convertAndSend(dest, message.value());
   }
}

application.yml

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: kafka-websocket-connector
    topics: flight-events,
      flight-time-events,
      canceled-events,
      pax-events,
      flight-delay-events

person terrabl    schedule 05.07.2017    source источник


Ответы (1)


Ответ предоставлен @Gary Russell из этого выпуска GitHub:

https://github.com/spring-projects/spring-kafka/issues/361

Вы можете использовать выражение SpEL; есть пример в EnableKafkaIntegrationTests...

@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")

В моем случае "#{'${spring.kafka.topics}'.split(',')}"

Мне удалось реализовать приведенный выше код (предоставленный Гэри Расселом), чтобы ответить на вышеуказанный вопрос.

person terrabl    schedule 05.07.2017
comment
Это называется плагиат... Вы не должны создавать выпуск GH для вопросов, пожалуйста. - person Artem Bilan; 05.07.2017
comment
Извините, Артем, я знаю, что вы работаете в команде тегов с Гэри, есть ли лучшая цитата, которую я могу применить к этому, чтобы решить вашу проблему с этим ответом? - person terrabl; 05.07.2017
comment
Вы должны добавить ссылку на эту проблему GH для получения дополнительной информации и принять свой ответ. В будущем просто имейте в виду, что мы можем быть недоступны онлайн, чтобы ответить на вопросы о GH. Вот почему SO — лучшее место для начала. - person Artem Bilan; 05.07.2017
comment
Спасибо, Артем, я понимаю, и, надеюсь, изменения, которые я внес в ответ, облегчат вашу первоначальную проблему. Что касается принятия ответа, SO не позволяет мне принять мой собственный ответ на мой собственный вопрос в течение 48 часов, но я приму ответ как можно скорее. - person terrabl; 05.07.2017