Kafka KStream — использование AbstractProcessor с окном

Я надеюсь сгруппировать оконные пакеты вывода из KStream и записать их во вторичное хранилище.

Я ожидал, что .punctuate() будет вызываться примерно каждые 30 секунд. То, что я получил вместо этого, сохранено здесь.

(Исходный файл состоял из нескольких тысяч строк)

Резюме - .punctuate() вызывается, по-видимому, случайным образом, а затем неоднократно. Похоже, что он не соответствует значению, установленному через ProcessorContext.schedule().


Редактировать:

Другой запуск того же кода производил вызовы .punctuate() примерно каждые четыре минуты. На этот раз я не видел сумасшедших повторяющихся значений. Никаких изменений в источнике - просто другой результат.

Используя следующий код:

Главный

StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

lines.process(new BPS2());

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

Процессор

public class BP2 extends AbstractProcessor<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

    private ProcessorContext context;
    private final long delay;
    private final ArrayList<String> values;

    public BP2(long delay) {
        LOGGER.debug("BatchProcessor() constructor");
        this.delay = delay;

       values = new ArrayList<>();

    }

    @Override
    public void process(String s, String s2) {
        LOGGER.debug("batched processor s:{}   s2:{}", s, s2);

        values.add(s2);
    }

    @Override
    public void init(ProcessorContext context) {
        LOGGER.info("init");

        super.init(context);

        values.clear();

        this.context = context;
        context.schedule(delay);
    }

    @Override
    public void punctuate(long timestamp) {
        super.punctuate(timestamp);

        LOGGER.info("punctuate   ts: {}   count: {}", timestamp, values.size());

        context().commit();
    }
}

ПроцессорПоставщик

public class BPS2 implements ProcessorSupplier<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);

    @Override
    public Processor<String, String> get() {
        try {
            return new BP2(30000);
        } catch(Exception exception) {
            LOGGER.error("Unable to instantiate BatchProcessor()", exception);
            throw new RuntimeException();
        }
    }
}

Редактировать:

Чтобы убедиться, что мой отладчик не замедляет это, я построил его и запустил на том же компьютере, что и мой процесс kafka. На этот раз он даже не пытался отставать на 4 минуты и более - в течение нескольких секунд он выдавал ложные вызовы на .punctuate(). Многие (большинство) из них без промежуточных вызовов .process().


person ethrbunny    schedule 31.08.2016    source источник


Ответы (3)


Обновление: эта часть ответа предназначена для Kafka версии 0.11 или более ранней (для Kafka 1.0 и более поздних версий см. ниже)

В Kafka Streams пунктуация основана на времени потока, а не не системном времени (также известном как время обработки).

По умолчанию stream-time равно event-time, т. е. отметке времени, встроенной в сами записи Kafka. Поскольку вы не устанавливаете TimestampExtractor не по умолчанию (см. timestamp.extractor в http://docs.confluent.io/current/streams/developer-guide.html#Optional-configuration-parameters), вызовы punctuate зависят только от процесса времени события в отношении записи, которые вы обрабатываете. Таким образом, если вам нужно несколько минут для обработки "30 секунд" (время события) записей, punctuate будет вызываться реже, чем 30 секунд (время настенных часов)...

Это также может объяснить ваши нерегулярные вызовы (т. е. всплески и длительные задержки). Если время вашего события данных «прыгает», а ваши данные, которые нужно обработать, уже полностью доступны в вашей теме, Kafka Streams также «прыгает» в отношении внутренне поддерживаемого времени потока.

Я предполагаю, что вы можете решить свою проблему с помощью WallclockTimestampExtractor (см. http://docs.confluent.io/current/streams/developer-guide.html#timestamp-extractor)

Еще одна вещь, о которой следует упомянуть: stream-time продвинут только в том случае, если данные обрабатываются — если ваше приложение достигает конца входных разделов и ожидает данных, punctuate не будет вызываться. Это применимо, даже если вы используете WallclockTimestampExtractor.

Кстати: в настоящее время обсуждается пунктуационное поведение потоков: https://github.com/apache/kafka/pull/1689

Ответ для Kafka 1.0 и более поздних версий

Начиная с Kafka 1.0 можно регистрировать знаки препинания на основе времени настенных часов или времени события: https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2

person Matthias J. Sax    schedule 31.08.2016
comment
Привет @Matthias Это верно даже сейчас или нам нужно обновить его? AFAIR несколько месяцев назад в документации изменение экстрактора временных меток на Wallclock не повлияло на то, как вызывается пунктуация. stackoverflow.com/users/4953079/matthias-j-sax - person Raghvendra Singh; 08.03.2018
comment
Этот ответ устарел и предназначен для более старой версии. Я обновлю свой ответ. Спасибо, что указали. - person Matthias J. Sax; 09.03.2018

Только что закончил читать ответ на этот вопрос, который, я думаю, тоже отвечает на ваш вопрос. Суть в том, что:

  1. Потребитель потоков выполняет опрос для записей
  2. Все возвращаемые записи обрабатываются полностью.
  3. Затем обратный вызов с пунктуацией планируется с настроенной задержкой.

Дело в том, что пунктуация не является событием с фиксированным интервалом времени, и различия в том, сколько времени занимает # 2, приведут к эквивалентным изменениям в периоде выполнения пунктуации.

....но прочтите ту ссылку, он говорит лучше меня.

person Nicholas    schedule 31.08.2016
comment
Почему все повторяющиеся призывы к пунктуации? Их было тысячи. - person ethrbunny; 31.08.2016
comment
Если ваш ответ правильный, кажется, что оконная задержка в AbstractProcessor бесполезна. Зачем собирать записи, а потом сидеть без дела какое-то время? Я мог бы понять, собирая записи на время, а затем, возможно, тратя дополнительное время на их обработку. - person ethrbunny; 31.08.2016
comment
Пунктуация выполняется не только один раз за poll. Он встроен в обработку записей и может вызываться несколько раз между двумя вызовами poll. Смотрите мой ответ. - person Matthias J. Sax; 31.08.2016

Хорошо - я думаю, что это ошибка в Кафке.

Вот почему:

В моем первоначальном тестировании я использовал одну машину для запуска как Producer, так и Consumer. Я запускал Producer на несколько минут, чтобы сгенерировать некоторые тестовые данные, а затем запускал свои тесты. Это дало бы странный результат, который я опубликовал изначально.

Затем я решил перевести Producer в фоновый режим и оставить его работающим. Теперь я вижу 100% идеальные 30-секундные интервалы между вызовами .punctuate(). Больше с этим проблем нет.

Другими словами, если сервер kafka не обрабатывает какие-либо входящие данные, то это не соответствует запуску процессов KStreams.

person ethrbunny    schedule 31.08.2016
comment
Кстати: вы можете отредактировать свой вопрос вместо публикации этого ответа. :) - person Matthias J. Sax; 31.08.2016