Использование поля в качестве входных данных для шаблона фильтра Logstash Grok

Мне интересно, можно ли использовать поле в сообщении Logstash в качестве ввода шаблона Grok. Скажем, у меня есть запись, которая выглядит так:

{
    "message":"10.1.1.1",
    "grok_filter":"%{IP:client}"
}

Я хочу иметь возможность сделать что-то вроде этого:

filter {
  grok {
    match => ["message", ["%{grok_filter}"]]
  }
}

Проблема в том, что это приводит к сбою Logstash, поскольку он, по-видимому, рассматривает «%{grok_filter}» как сам фильтр Grok, а не значение grok_filter. Я получаю следующее после сбоя Logstash:

The error reported is: 
  pattern %{grok_filter} not defined

Есть ли способ получить значение поля из блока фильтра Grok и использовать его в качестве входных данных для шаблона Grok?


person freb    schedule 23.10.2014    source источник


Ответы (1)


Ответ отрицательный — фильтр grok компилирует свой шаблон при инициализации фильтра. Если вам нужно сделать что-то подобное, вам придется написать свой собственный фильтр, который каждый раз компилирует шаблон (и платить штраф за производительность).

Не зная больше о том, почему вы хотите это сделать, трудно порекомендовать наилучший план действий. Если у вас ограниченное количество паттернов, вы можете просто установить параметр grok_filter_type, а затем получить кучу вещей типа if [grok_filter_type] == 'ip' { grok { ... } }.

Вот пользовательский фильтр, который позволит вам делать то, что вы хотите - это в основном копия кода grok, но есть некоторые изменения/упрощения. Я проверил это, и, похоже, это работает для меня.

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/environment"
require "set"

# A version of grok that can parse from a log-defined pattern.  Not really
# recommended for high usage patterns, but for the occassional pattern it
# should work
#     filter {
#       grok_dynamic {
#         match_field => "message"
#     pattern_field => "message_pattern"
#       }
#     }
#
class LogStash::Filters::GrokDynamic < LogStash::Filters::Base
  config_name "grok_dynamic"
  milestone 1

  # The field that contains the data to match against
  config :match_field, :validate => :string, :required => true
  # the field that contains the pattern
  config :pattern_field, :validate => :string, :required => true
  # where the patterns are
  config :patterns_dir, :validate => :array, :default => []

  # If true, only store named captures from grok.
  config :named_captures_only, :validate => :boolean, :default => true

  # If true, keep empty captures as event fields.
  config :keep_empty_captures, :validate => :boolean, :default => false

  # Append values to the 'tags' field when there has been no
  # successful match
  config :tag_on_failure, :validate => :array, :default => ["_grokparsefailure"]

  # The fields to overwrite.
  #
  # This allows you to overwrite a value in a field that already exists.
  config :overwrite, :validate => :array, :default => []

  # Detect if we are running from a jarfile, pick the right path.
  @@patterns_path ||= Set.new
  @@patterns_path += [LogStash::Environment.pattern_path("*")]

  public
  def initialize(params)
    super(params)
    @handlers = {}
  end

  public
  def register
    require "grok-pure" # rubygem 'jls-grok'

    @patternfiles = []

    # Have @@patterns_path show first. Last-in pattern definitions win; this
    # will let folks redefine built-in patterns at runtime.
    @patterns_dir = @@patterns_path.to_a + @patterns_dir
    @logger.info? and @logger.info("Grok patterns path", :patterns_dir => @patterns_dir)
    @patterns_dir.each do |path|
      if File.directory?(path)
        path = File.join(path, "*")
      end

      Dir.glob(path).each do |file|
        @logger.info? and @logger.info("Grok loading patterns from file", :path => file)
        @patternfiles << file
      end
    end

    @patterns = Hash.new { |h,k| h[k] = [] }

    @grok = Grok.new
    @patternfiles.each { |path| @grok.add_patterns_from_file(path) }

  end # def register

  public
  def filter(event)
    return unless filter?(event)
    return if event[@match_field].nil? || event[@pattern_field].nil?

    @logger.debug? and @logger.debug("Running grok_dynamic filter", :event => event);
    @grok.compile(event[@pattern_field]);
    if match(@grok,@match_field, event)
      filter_matched(event)
    else
      # Tag this event if we can't parse it. We can use this later to
      # reparse+reindex logs if we improve the patterns given.
      @tag_on_failure.each do |tag|
        event["tags"] ||= []
        event["tags"] << tag unless event["tags"].include?(tag)
      end
    end

    @logger.debug? and @logger.debug("Event now: ", :event => event)
  end # def filter

  private
  def match(grok, field, event)
    input = event[field]
    if input.is_a?(Array)
      success = true
      input.each do |input|
        match = grok.match(input)
        if match
          match.each_capture do |capture, value|
            handle(capture, value, event)
          end
        else
          success = false
        end
      end
      return success
    #elsif input.is_a?(String)
    else
      # Convert anything else to string (number, hash, etc)
      match = grok.match(input.to_s)
      return false if !match

      match.each_capture do |capture, value|
        handle(capture, value, event)
      end
      return true
    end
  rescue StandardError => e
    @logger.warn("Grok regexp threw exception", :exception => e.message)
  end

  private
  def handle(capture, value, event)
    handler = @handlers[capture] ||= compile_capture_handler(capture)
    return handler.call(value, event)
  end

  private
  def compile_capture_handler(capture)
    # SYNTAX:SEMANTIC:TYPE
    syntax, semantic, coerce = capture.split(":")

    # each_capture do |fullname, value|
    #   capture_handlers[fullname].call(value, event)
    # end

    code = []
    code << "# for capture #{capture}"
    code << "lambda do |value, event|"
    #code << "  p :value => value, :event => event"
    if semantic.nil?
      if @named_captures_only
        # Abort early if we are only keeping named (semantic) captures
        # and this capture has no semantic name.
        code << "  return"
      else
        field = syntax
      end
    else
      field = semantic
    end
    code << "  return if value.nil? || value.empty?" unless @keep_empty_captures
    if coerce
      case coerce
        when "int"; code << "  value = value.to_i"
        when "float"; code << "  value = value.to_f"
      end
    end

    code << "  # field: #{field}"
    if @overwrite.include?(field)
      code << "  event[field] = value"
    else
      code << "  v = event[field]"
      code << "  if v.nil?"
      code << "    event[field] = value"
      code << "  elsif v.is_a?(Array)"
      code << "    event[field] << value"
      code << "  elsif v.is_a?(String)"
      # Promote to array since we aren't overwriting.
      code << "    event[field] = [v, value]"
      code << "  end"
    end
    code << "  return"
    code << "end"

    #puts code
    return eval(code.join("\n"), binding, "<grok capture #{capture}>")
  end # def compile_capture_handler

end # class LogStash::Filters::Grok
person Alcanzar    schedule 23.10.2014
comment
Вариант использования — это решение для ведения журнала, когда клиент, отправляющий журналы, имеет возможность включать атрибут типа (используя logstash-forwarder), и поэтому я могу определить множество шаблонов grok на стороне сервера. Но если есть не очень распространенный журнал, который я хотел бы проанализировать, я бы предпочел настроить grok patern на клиенте, а не каждый раз перенастраивать и перезапускать logstash. Я не смотрел внимательно на источник Grok. Как вы думаете, я мог бы адаптировать фильтр Grok для компиляции фильтра при первом его получении, предоставив шаблон и имя отца? то есть grok_name, grok_pattern как поля? - person freb; 23.10.2014
comment
Кажется, это ваш лучший выбор... создайте новый фильтр на основе grok, который может принимать grok_from_field, а затем установите новый фильтр grok таким образом... можно избавиться от большой сложности обработки массива шаблонов . Я мог бы попробовать это сам в эти выходные или на следующей неделе, так как это звучит как интересный вызов. - person Alcanzar; 23.10.2014
comment
Спасибо за помощь. Я обновлю, если я возьму удар на нем. К сожалению, для этого проекта я парень Python, поэтому мне потребуется сменить передачу. - person freb; 24.10.2014
comment
Есть ли новости на эту тему с 2014 года или мне все еще нужен пользовательский фильтр для достижения этой цели? - person whites11; 11.01.2018