в совокупности с этим вопросом Я до сих пор не понимаю, почему контрольные точки моего Flink количество рабочих мест растет и растет с течением времени, и в настоящее время, в течение примерно 7 дней подряд, эти контрольные точки никогда не выходят на плато. В настоящее время я использую версию Flink 1.10, FS State Backend, поскольку моя работа не может позволить себе затраты на задержку при использовании RocksDB.
Посмотрите, как контрольные точки развиваются за 7 дней: Допустим, у меня есть эта конфигурация для TTL состояний во всех моих операторах с отслеживанием состояния на один час или, может быть, больше, и на день. в одном случае:
public static final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot().build();
К моему беспокойству, все объекты в состояниях будут очищены по истечении времени, поэтому размер контрольных точек должен быть уменьшен, и, как мы ожидаем, более или менее тот же объем данных каждый день.
С другой стороны, у нас есть кривая трафика, которая имеет больше входящих данных в некоторые часы дня, но поздно ночью трафик падает, и все объекты в состояниях, срок действия которых истекает, должны быть очищены, что приводит к уменьшению размера контрольной точки. не сохраняется в том же размере, пока трафик снова не увеличится.
Давайте посмотрим на этот пример кода для одного варианта использования:
DataStream<Event> stream = addSource(source);
KeyedStream<Event, String> keyedStream = stream.filter((FilterFunction<Event>) event ->
apply filters here;))
.name("Events filtered")
.keyBy(k -> k.rType.equals("something") ? k.id1 : k.id2);
keyedStream.flatMap(new MyFlatMapFunction())
public class MyFlatMapFunction extends RichFlatMapFunction<Event, Event>{
private final MapStateDescriptor<String, Event> descriptor = new MapStateDescriptor<>("prev_state", String.class, Event.class);
private MapState<String, Event> previousState;
@Override
public void open(Configuration parameters) {
/*ttlConfig described above*/
descriptor.enableTimeToLive(ttlConfig);
previousState = getRuntimeContext().getMapState(descriptor);
}
@Override
public void flatMap(Event event, Collector<Event> collector) throws Exception {
final String key = event.rType.equals("something") ? event.id1 : event.id2;
Event previous = previousState.get(key);
if(previous != null){
/*something done here*/
}else /*something done here*/
previousState.put(key, previous);
collector.collect(previous);
}
}
Более или менее это структура вариантов использования и некоторых других, использующих Windows (временное окно или окно сеанса).
Вопросов:
- Что я здесь делаю не так?
- Очищаются ли состояния по истечении срока их действия и этот сценарий совпадает с остальными вариантами использования?
- Что может помочь мне исправить размер контрольных точек, если они работают неправильно?
- Это нормально?
С уважением!
java -jar jobName.jar
. - person Alejandro Deulofeu   schedule 04.09.2020System.exit(1);
в соответствии с решением компании в данный момент. - person Alejandro Deulofeu   schedule 04.09.2020