Apache Flume interceptor: Could not instantiate Builder

I wrote a custom interceptor for Apache Flume 1.7. The interceptor should set a special header on every event from a Kafka source whose topic matches a regular expression defined in the configuration. But it does not work. My Java knowledge is limited, so please help me solve the problem. Part of my configuration in /etc/flume-ng/conf/flume.conf:

########################  kafka source  ########################
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.interceptors = i1
agent.sources.kafka_source.interceptors.i1.type = org.apache.flume.interceptor.TopicRotationHeaderInterceptor
agent.sources.kafka_source.interceptors.i1.regex = stat_.+
agent.sources.kafka_source.interceptors.i1.value = hourly
agent.sources.kafka_source.interceptors.i1.default = daily

I get this error in flume.log:

31 Jul 2017 18:41:11,819 ERROR [conf-file-poller-0] (org.apache.flume.channel.ChannelProcessor.configureInterceptors:118)  - Could not instantiate Builder. Exception follows.
java.lang.InstantiationException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor
at java.lang.Class.newInstance(Class.java:427)
at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 14 more
31 Jul 2017 18:41:11,823 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSources:361)  - Source kafka_source has been removed due to an error during configuration
org.apache.flume.FlumeException: Interceptor.Builder not constructable.
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:119)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InstantiationException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor
at java.lang.Class.newInstance(Class.java:427)
at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
... 12 more
Caused by: java.lang.NoSuchMethodException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 14 more

The interceptor source code:

package org.apache.flume.interceptor;

import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.regex.Matcher;
import java.util.regex.Pattern;


/**
 * Interceptor class that appends topic rotation period header to all events.
 *
 * Properties:<p>
 *
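 *   regex: regex to match topics
 *        (default is ".+")<p>
 *
 *   value: header value for topics that match the regex
 *        (default is "daily")<p>
 *
 *   default: header value for topics that do not match the regex
 *        (default is "daily")<p>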
 *
 * Sample config:<p>
 *
 * <code>
 *   agent.sources.r1.channels = c1<p>
 *   agent.sources.r1.type = SEQ<p>
 *   agent.sources.r1.interceptors = i1<p>
 *   agent.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TopicRotationHeaderInterceptor<p>
 *   agent.sources.r1.interceptors.i1.regex = stat_.+<p>
 *   agent.sources.r1.interceptors.i1.value = hourly<p>
 * </code>
 *
 */
public class TopicRotationHeaderInterceptor implements Interceptor {

  private static final Logger logger = LoggerFactory.getLogger(TopicRotationHeaderInterceptor.class);

  private String value;
  private String defaultValue;
  private Pattern matchRegex;

  /**
   * Only {@link TopicRotationHeaderInterceptor.Builder} can build me
   */
  private TopicRotationHeaderInterceptor(Pattern matchRegex, String value, String defaultValue) {
    this.matchRegex = matchRegex;
    this.value = value;
    this.defaultValue = defaultValue;
  }

  @Override
  public void initialize() {
    // no-op
  }

  /**
   * Modifies events in-place.
   */
  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();

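    final String topic = headers.get(Constants.TOPIC_HEADER);
    String resultValue = defaultValue;

    // Pattern.matcher(null) throws NullPointerException, so events
    // without a topic header keep the default value.
    if (matchRegex != null && topic != null) {
        final Matcher matcher = matchRegex.matcher(topic);
        if (matcher.matches()) {
            resultValue = value;
        }
    }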

    headers.put(Constants.HEADER, resultValue);
    return event;
  }

  /**
   * Delegates to {@link #intercept(Event)} in a loop.
   * @param events
   * @return
   */
  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {
    // no-op
  }

  /**
   * Builder which builds new instance of the TopicRotationHeaderInterceptor.
   */
  public static class Builder implements Interceptor.Builder {

    private String value;
    private String defaultValue;
    private String regexStr;
    private Pattern matchRegex;

    @Override
    public void configure(Context context) {
      regexStr = context.getString(Constants.REGEX, Constants.REGEX_DEFAULT);
      matchRegex = Pattern.compile(regexStr);
      value = context.getString(Constants.VALUE, Constants.VALUE_DEFAULT);
      defaultValue = context.getString(Constants.DEFAULT_VALUE, Constants.DEFAULT_VALUE_DEFAULT);
    }

    @Override
    public Interceptor build() {
      return new TopicRotationHeaderInterceptor(matchRegex, value, defaultValue);
    }
  }

  public static class Constants {
    public static final String REGEX = "regex";
    public static final String REGEX_DEFAULT = ".+";

    public static final String VALUE = "value";
    public static final String VALUE_DEFAULT = "daily";

    public static final String DEFAULT_VALUE = "default";
    public static final String DEFAULT_VALUE_DEFAULT = "daily";

    public static final String HEADER = "rotation";
    public static final String TOPIC_HEADER = "topic";
  }
}
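
As a quick check of the matching rule, independent of Flume, the decision made inside intercept() can be sketched as a plain function. The names RotationRuleDemo and rotationFor are hypothetical, and the null check on the topic is a defensive choice of this sketch. Note that Matcher.matches() requires the whole topic name to match the regex, not just a part of it.

```java
import java.util.regex.Pattern;

public class RotationRuleDemo {

    // Hypothetical helper: a full-string regex match selects `value`;
    // anything else, including a missing topic, selects `defaultValue`.
    public static String rotationFor(String topic, Pattern regex, String value, String defaultValue) {
        if (topic != null && regex != null && regex.matcher(topic).matches()) {
            return value;
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        // Same regex, value and default as in the flume.conf excerpt.
        Pattern regex = Pattern.compile("stat_.+");
        String[] topics = {"stat_clicks", "events", "stat_", "my_stat_clicks"};
        for (String topic : topics) {
            System.out.println(topic + " -> " + rotationFor(topic, regex, "hourly", "daily"));
        }
    }
}
```

With the configuration shown earlier, only stat_clicks maps to hourly. stat_ falls back to daily because .+ needs at least one more character, and my_stat_clicks falls back because the match must start at the first character of the topic name.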

Asked by Alexander on 01.08.2017


Answers (1)


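There is a mistake in your flume.conf. Change the interceptor type from

org.apache.flume.interceptor.TopicRotationHeaderInterceptor

to:

org.apache.flume.interceptor.TopicRotationHeaderInterceptor$Builder

Flume treats the class named in the type property as an Interceptor.Builder: it instantiates that class through a no-argument constructor, then calls configure() and build() on it. TopicRotationHeaderInterceptor itself only has a private three-argument constructor, which is why the log reports NoSuchMethodException for <init>(). The $Builder suffix is the binary name of the nested Builder class, and that class does have an implicit public no-argument constructor.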
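
To see the difference outside of Flume, here is a minimal sketch that reproduces both cases with plain reflection. The classes BuilderInstantiationDemo and FakeInterceptor are hypothetical stand-ins, not Flume code. The stack trace shows Flume calling Class.newInstance(); the sketch uses getConstructor().newInstance() instead, on the assumption that it is a close enough, non-deprecated equivalent for this purpose.

```java
public class BuilderInstantiationDemo {

    // Stand-in for the interceptor: its only constructor is private and takes an argument.
    public static class FakeInterceptor {
        private final String value;

        private FakeInterceptor(String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }

        // Stand-in for Interceptor.Builder: public, static, implicit public no-arg constructor.
        public static class Builder {
            public FakeInterceptor build() {
                return new FakeInterceptor("hourly");
            }
        }
    }

    // Returns true if the named class can be created through a public no-arg constructor.
    public static boolean canInstantiate(String className) {
        try {
            Class.forName(className).getConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // Covers NoSuchMethodException, InstantiationException and related failures.
            return false;
        }
    }

    public static void main(String[] args) {
        // getName() returns the binary name, with $ separating nested classes.
        String outer = FakeInterceptor.class.getName();
        String builder = FakeInterceptor.Builder.class.getName();
        System.out.println(outer + " -> " + canInstantiate(outer));
        System.out.println(builder + " -> " + canInstantiate(builder));
    }
}
```

The outer class cannot be instantiated this way and the nested Builder can, which matches the behavior before and after the configuration change.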

Regards

Answered by Jhon Mario Lotero on 17.08.2017
Comment: Thank you. This really helped me. - Alexander, 25.08.2017