как использовать просроченный фильтр - logstash

Я работаю в просроченном фильтре. Я прочитал руководство по просроченному фильтру в logstash. затем я сделал образец файла конфигурации и csv, чтобы проверить работу просроченного фильтра. Но, похоже, это не работает. Нет никаких изменений в загрузке данных в ES. Я прикрепил файл csv и код конфигурации. Можете ли вы привести несколько примеров того, как использовать просроченный фильтр.

Вот мои данные csv: образец данных CSV

вот мой файл конфигурации:

input {
     file {
      path => "/home/paulsteven/log_cars/aggreagate.csv"
      start_position => "beginning"
      sincedb_path => "/dev/null"
   }
}
filter {
    csv {
        separator => ","
        quote_char => "%"
        columns => ["state","city","haps","ads","num_id","serial"]
    }
    elapsed {
        start_tag => "taskStarted"
        end_tag => "taskEnded"
        unique_id_field => "num_id"
    }

}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "el03"
    document_type => "details"
  }
  stdout{}
}

Вывод в ЕС:

{
          "city" => "tirunelveli",
          "path" => "/home/paulsteven/log_cars/aggreagate.csv",
        "num_id" => "2345-1002-4501",
       "message" => "tamil nadu,tirunelveli,hap0,ad1,2345-1002-4501,1",
      "@version" => "1",
        "serial" => "1",
          "haps" => "hap0",
         "state" => "tamil nadu",
          "host" => "smackcoders",
           "ads" => "ad1",
    "@timestamp" => 2019-05-06T10:03:51.443Z
}
{
          "city" => "chennai",
          "path" => "/home/paulsteven/log_cars/aggreagate.csv",
        "num_id" => "2345-1002-4501",
       "message" => "tamil nadu,chennai,hap0,ad1,2345-1002-4501,5",
      "@version" => "1",
        "serial" => "5",
          "haps" => "hap0",
         "state" => "tamil nadu",
          "host" => "smackcoders",
           "ads" => "ad1",
    "@timestamp" => 2019-05-06T10:03:51.447Z
}
{
          "city" => "kottayam",
          "path" => "/home/paulsteven/log_cars/aggreagate.csv",
        "num_id" => "2345-1002-4501",
       "message" => "kerala,kottayam,hap1,ad2,2345-1002-4501,9",
      "@version" => "1",
        "serial" => "9",
          "haps" => "hap1",
         "state" => "kerala",
          "host" => "smackcoders",
           "ads" => "ad2",
    "@timestamp" => 2019-05-06T10:03:51.449Z
}
{
          "city" => "Jalna",
          "path" => "/home/paulsteven/log_cars/aggreagate.csv",
        "num_id" => "2345-1002-4501",
       "message" => "mumbai,Jalna,hap2,ad3,2345-1002-4501,13",
      "@version" => "1",
        "serial" => "13",
          "haps" => "hap2",
         "state" => "mumbai",
          "host" => "smackcoders",
           "ads" => "ad3",
    "@timestamp" => 2019-05-06T10:03:51.452Z
}

person Smack Alpha    schedule 06.05.2019    source источник


Ответы (1)


Вы должны пометить свои события, чтобы Logstash мог найти начальные / конечные теги. По сути, вы должны знать, когда событие считается начальным, а когда конечным.

Плагин просроченного фильтра работает только для двух событий (например, события запроса и события ответа, чтобы получить задержку между ними). ​​Оба этих двух типа событий должны иметь поле идентификатора, которое однозначно идентифицирует эту конкретную задачу. Имя этого поля хранится в поле unique_id_field.

В вашем примере вам нужно определить шаблон для начального и конечного события, скажем, у вас есть в вашем csv столбец type (см. код ниже), когда type содержит " START", строка считается начальным событием, и если она содержит "END", это конечное событие, довольно простое, и столбец id, в котором хранится уникальный идентификатор.

filter {
  csv {
    separator => ","
    quote_char => "%"
    columns => ["state","city","haps","ads","num_id","serial", "type", "id"]
    }
  grok {
    match => { "type" => ".*START.*" }
    add_tag => [ "taskStarted" ]
  }grok {
  match => { "type" => ".*END*" }
  add_tag => [ "taskTerminated" ]
}  elapsed {
    start_tag => "taskStarted"
    end_tag => "taskTerminated"
    unique_id_field => "id"
  }
}

Я чувствую, что ваша потребность в другом. Если вы хотите объединить более двух событий, например, все события с одинаковым значением состояния столбца, ознакомьтесь с этот плагин

person Fares    schedule 06.05.2019
comment
Спасибо за ответ. Судя по вашему ответу, ему нужны события ответа и запроса в файле csv. Он вычисляет эти два события и присваивает результат уникальному полю идентификатора??? - person Smack Alpha; 09.05.2019
comment
Когда получено конечное событие, совпадающее с ранее собранным начальным событием, происходит совпадение. Свойство конфигурации new_event_on_match указывает, куда вставить прошедшую информацию: их можно добавить в конечное событие или создать новое событие совпадения. Плагин хорошо документирован: elastic.co/guide/ ru/logstash/current/ - person Fares; 14.05.2019