Количество контрольных точек увеличивается со временем во Flink

в совокупности с этим вопросом Я до сих пор не понимаю, почему контрольные точки моего Flink количество рабочих мест растет и растет с течением времени, и в настоящее время, в течение примерно 7 дней подряд, эти контрольные точки никогда не выходят на плато. В настоящее время я использую версию Flink 1.10, FS State Backend, поскольку моя работа не может позволить себе затраты на задержку при использовании RocksDB.

Посмотрите, как контрольные точки развиваются за 7 дней:  введите описание изображения здесь Допустим, у меня есть эта конфигурация для TTL состояний во всех моих операторах с отслеживанием состояния на один час или, может быть, больше, и на день. в одном случае:

public static final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupFullSnapshot().build();

К моему беспокойству, все объекты в состояниях будут очищены по истечении времени, поэтому размер контрольных точек должен быть уменьшен, и, как мы ожидаем, более или менее тот же объем данных каждый день.

С другой стороны, у нас есть кривая трафика, которая имеет больше входящих данных в некоторые часы дня, но поздно ночью трафик падает, и все объекты в состояниях, срок действия которых истекает, должны быть очищены, что приводит к уменьшению размера контрольной точки. не сохраняется в том же размере, пока трафик снова не увеличится.

Давайте посмотрим на этот пример кода для одного варианта использования:

DataStream<Event> stream = addSource(source);
KeyedStream<Event, String> keyedStream = stream.filter((FilterFunction<Event>) event ->
                    apply filters here;))
                    .name("Events filtered")
                    .keyBy(k -> k.rType.equals("something") ? k.id1 : k.id2);
keyedStream.flatMap(new MyFlatMapFunction())


public class MyFlatMapFunction extends RichFlatMapFunction<Event, Event>{
private final MapStateDescriptor<String, Event> descriptor = new MapStateDescriptor<>("prev_state", String.class, Event.class);
private MapState<String, Event> previousState;

@Override
    public void open(Configuration parameters) {
        /*ttlConfig described above*/
        descriptor.enableTimeToLive(ttlConfig);
        previousState = getRuntimeContext().getMapState(descriptor);
    }

@Override
    public void flatMap(Event event, Collector<Event> collector) throws Exception {
      final String key = event.rType.equals("something") ? event.id1 : event.id2;
      Event previous = previousState.get(key);
      if(previous != null){
        /*something done here*/
      }else /*something done here*/
        previousState.put(key, previous);
        collector.collect(previous);
 }
}

Более или менее это структура вариантов использования и некоторых других, использующих Windows (временное окно или окно сеанса).

Вопросов:

  • Что я здесь делаю не так?
  • Очищаются ли состояния по истечении срока их действия и этот сценарий совпадает с остальными вариантами использования?
  • Что может помочь мне исправить размер контрольных точек, если они работают неправильно?
  • Это нормально?

С уважением!


person Alejandro Deulofeu    schedule 02.09.2020    source источник
comment
Куча JVM тоже растет аналогичным образом или только размеры контрольных точек?   -  person David Anderson    schedule 02.09.2020
comment
Нет, JVM Heap в порядке. Это означает, что сборщик мусора работает правильно? Спасибо.   -  person Alejandro Deulofeu    schedule 02.09.2020
comment
Я не могу сформулировать теорию вашего приложения, которая объясняла бы все факты, которыми вы поделились. Что-то не сходится. Учитывая, что я недостаточно хорошо понимаю ситуацию, стесняюсь давать какие-либо советы.   -  person David Anderson    schedule 04.09.2020
comment
Вы можете провести эксперимент, который может пролить свет на то, что происходит с увеличивающимися в размерах контрольными точками. Если вы восстановите (копию) задания из контрольной точки и отключите ввод (ы), то через час размер контрольной точки должен упасть до нуля.   -  person David Anderson    schedule 04.09.2020
comment
Позвольте мне задать вам еще один вопрос: Моя работа заключается в чтении событий из RabbitMQ - ›Преобразования -› Приемник, я не использую стратегии перезапуска, потому что в случае сбоя моего задания служба SVC выполнит автоматический перезапуск задания, и он запустится с нуля. Начиная с этого рабочего режима, контрольные точки не имеют для меня никакого значения, потому что они не использовались для перезапуска в случаях сбоя, и, согласно моему пониманию, они достойны, если приложение Flink выйдет из строя, а затем перезапустится с контрольных точек, но поскольку моя работа всегда начинается с нуля: нужны ли мне контрольные точки в моем сценарии?   -  person Alejandro Deulofeu    schedule 04.09.2020
comment
Поскольку я не могу восстановить свою работу с любой контрольной точки, потому что я выполняю свою работу как отдельное приложение Java java -jar jobName.jar.   -  person Alejandro Deulofeu    schedule 04.09.2020
comment
Даже при запуске как отдельного Java-приложения ваши задания могут восстанавливаться после контрольных точек, если сбои не приводят к отключению самого Flink. Если диспетчер заданий и диспетчер задач все еще работают, задание будет перезапущено с последней контрольной точки, если вы не отключили контрольную точку или не установили стратегию перезапуска без перезапуска.   -  person David Anderson    schedule 04.09.2020
comment
Решением компании я установил стратегию перезапуска без перезапуска. Так что я думаю, что в моем сценарии мне не нужны контрольные точки, верно? потому что любой одиночный сбой приведет к отключению Flink System.exit(1); в соответствии с решением компании в данный момент.   -  person Alejandro Deulofeu    schedule 04.09.2020
comment
Давайте продолжим это обсуждение в чате.   -  person David Anderson    schedule 04.09.2020


Ответы (1)


В этом фрагменте кода кажется, что вы просто записываете обратно состояние, которое уже было, что служит только для сброса таймера TTL. Это может объяснить, почему состояние не истекло.

Event previous = previousState.get(key);
if (previous != null) {
  /*something done here*/
} else
  previousState.put(key, previous);

Также кажется, что вам следует использовать ValueState, а не MapState. ValueState эффективно предоставляет сегментированное хранилище ключей / значений, где ключи - это ключи, используемые для разделения потока в keyBy. MapState предоставляет вложенную карту для каждого ключа, а не отдельного значения. Но поскольку вы используете тот же ключ внутри flatMap, который вы использовали для ввода потока изначально, ValueState с разделением на разделы будет всем, что вам нужно.

person David Anderson    schedule 02.09.2020
comment
Большое спасибо, Дэвид. Этот ответ проясняет мне все. С уважением - person Alejandro Deulofeu; 02.09.2020
comment
Позволяет мне задать еще один вопрос, связанный с этим: возможно, та же проблема со значениями, которые никогда не истекают, может быть корнем нагрузки на ЦП, которая со временем увеличивается, как и контрольные точки? Потому что это требует больших усилий ЦП для выполнения контрольных точек? Еще раз спасибо. - person Alejandro Deulofeu; 02.09.2020
comment
В общем, бэкэнд состояния может тратить время на попытки истечь не истекающие вещи, не только во время контрольных точек, но и постоянно. Но я бы ожидал увидеть рост кучи, если растет пространство ключей. - person David Anderson; 03.09.2020