Извлеките из ElasticSearch в Kafka все новые обновления ES, используя logstash.

У меня есть кластер ES с несколькими индексами, которые получают обновления через случайные промежутки времени. У меня есть экземпляр logstash, извлекающий данные из ES и передающий их в Kafka.

Что было бы хорошим способом запускать это каждую минуту и ​​получать любые обновления в ES?

Конф:

 input {
   elasticsearch {
     hosts => [ "hostname1.com:5432", "hostname2.com" ]
     index => "myindex-*"
     query => "*"
     size => 10000
     scroll => "5m"
   }
 }
 output {
   kafka {
     bootstrap-servers => "abc-kafka.com:1234"
     topic_id => "my.topic.test"
   }
 }

Я хотел бы использовать документы @timestamp в запросе и сохранить его во временном файле, затем повторно запустить запрос (с расписанием) и получить последние обновления/вставить (что-то вроде того, что плагин jdbc-input поддерживает logstash)

Любые идеи?

заранее спасибо


person E.P.    schedule 09.03.2016    source источник


Ответы (1)


Кто-то задал то же самое несколько месяцев назад, но эта проблема не не получить много трафика. Вы можете +1, может быть.

Тем временем вы можете изменить query в своем вводе elasticsearch следующим образом:

query => '{"query":{"range":{"timestamp":{"gt": "now-1m"}}}}'

т. е. вы запрашиваете все документы, чье поле timestamp (произвольное имя, измените, чтобы оно соответствовало вашему) было в течение последней минуты

Затем вам нужно настроить cron, который будет запускать ваш процесс logstash каждую минуту. Теперь из-за задержки между моментом запуска cron, моментом запуска logstash и моментом поступления запроса на сторону сервера ES, просто знайте, что 1m может быть недостаточно, и вы рискуете пропустить некоторые документы. Вам нужно проверить это и выяснить, что лучше.

Согласно этому недавнему сообщению в блоге, другим способом может быть запись последний раз, когда Logstash запускался в переменных среды LAST_RUN и использовал эту переменную в запросе:

query => '{"query":{"range":{"timestamp":{"gt": "${LAST_RUN}"}}}}'

В этом сценарии вы должны создать сценарий оболочки, который запускается cron и делает в основном следующее:

  1. беги logstash -f your_config_file.conf
  2. когда закончите, установите LAST_RUN=$(date +"%FT%T")
person Val    schedule 09.03.2016