Всякий раз, когда пользователь добавляет в избранное какой-либо контент на нашем сайте, мы собираем события, и то, что мы планировали сделать, - это ежечасно фиксировать совокупное избранное контента и обновлять общее количество избранных в БД.
Мы оценивали Kafka Streams. Последовал примеру с подсчетом слов. Наша топология проста: создание в теме A, чтение и передача агрегированных данных в другую тему B. Затем потребляйте события из темы B каждый час и фиксируйте в базе данных.
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "favorite-streams");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
return new StreamsConfig(props);
}
@Bean
public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {
StreamsBuilder builder = streamBuilder();
KStream<String, String> source = builder.stream(topic);
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store")).toStream()
.to(topic + "-grouped", Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, kStreamsConfigs());
streams.start();
return source;
}
@Bean
public StreamsBuilder streamBuilder() {
return new StreamsBuilder();
}
Однако когда я использую эту тему B, она с самого начала дает мне агрегированные данные. Мой вопрос в том, можем ли мы иметь какое-то положение, в котором я могу использовать сгруппированные данные за предыдущие часы, а затем зафиксировать их в БД, а затем Kakfa забывает о данных за предыдущие часы и предоставляет новые данные каждый час, а не совокупную сумму. Правильная ли топология проекта или мы можем сделать что-то лучше?