Как я могу обновить значение в MapState, не удаляя предыдущее значение?

На этот раз мой вопрос: работая с MapState, безопасно использовать mapstate.put(key, value) для изменения текущего значения ключа в mapState, или мне нужно сделать mapState.remove(key), а после этого сделать mapstate.put(key, value) снова, или есть способ обновить это значение?

Начиная с абстракции состояния Flink не предназначены для одновременного доступа и не должны совместно использоваться несколькими потоками. Итак, переформулируя свой вопрос: могу ли я обновить значение в соответствии с ключом в mapState, не удаляя ключ, а затем снова вставлять ключ? и как я могу избежать исключения ConcurrentModificationException с помощью mapState, не установив для этого оператора значение parallelism равным 1?

потому что у меня это исключение:

java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
at java.util.HashMap$EntryIterator.next(HashMap.java:1479)
at java.util.HashMap$EntryIterator.next(HashMap.java:1477)
at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111)
at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
at org.apache.flink.runtime.state.ttl.TtlMapState.lambda$getWrapped$0(TtlMapState.java:63)
at org.apache.flink.runtime.state.ttl.AbstractTtlDecorator.getWrappedWithTtlCheckAndUpdate(AbstractTtlDecorator.java:92)
at org.apache.flink.runtime.state.ttl.TtlMapState.getWrapped(TtlMapState.java:62)
at org.apache.flink.runtime.state.ttl.TtlMapState.contains(TtlMapState.java:92)
at org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72)
at com.teavaro.cep.transformations.SessionUseCase$1.generateSessionRecord(SessionUseCase.java:65)
at com.teavaro.cep.transformations.SessionUseCase$1.generateSessionRecord(SessionUseCase.java:42)
at com.teavaro.cep.operators.SessionIdentificationProcessFunction.process(SessionIdentificationProcessFunction.java:25)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:774)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

Большое спасибо. С уважением.


person Alejandro Deulofeu    schedule 16.04.2020    source источник
comment
Вы, должно быть, делаете что-то необычное, чтобы столкнуться с этим. Но чтобы ответить на ваш вопрос, можно просто вызвать put для обновления. Нет необходимости сначала удалять существующее значение.   -  person David Anderson    schedule 16.04.2020
comment
Кроме того, mapState предназначен для использования в операторах, в которых параллелизм выше 1. Ограничение одновременного доступа находится в пределах данного экземпляра оператора.   -  person David Anderson    schedule 16.04.2020
comment
Еще раз большое спасибо, Дэвид, я новичок в Flink и Stream Processing, извините, если мои вопросы выходят за рамки обычных, но я спрашиваю, потому что не нахожу ответа в других местах. Кстати mapState работает, большое спасибо за все.   -  person Alejandro Deulofeu    schedule 16.04.2020


Ответы (1)


Можно просто вызвать put, чтобы обновить запись в MapState. Нет необходимости сначала remove существующее значение.

person David Anderson    schedule 16.04.2020