Как агрегировать данные ежечасно?

Всякий раз, когда пользователь добавляет в избранное какой-либо контент на нашем сайте, мы собираем события, и то, что мы планировали сделать, - это ежечасно фиксировать совокупное избранное контента и обновлять общее количество избранных в БД.

Мы оценивали Kafka Streams. Последовал примеру с подсчетом слов. Наша топология проста: создание в теме A, чтение и передача агрегированных данных в другую тему B. Затем потребляйте события из темы B каждый час и фиксируйте в базе данных.

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
   public StreamsConfig kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "favorite-streams");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
    return new StreamsConfig(props);
}

@Bean
public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {
    StreamsBuilder builder = streamBuilder();
    KStream<String, String> source = builder.stream(topic);
    source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            .groupBy((key, value) -> value)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store")).toStream()
            .to(topic + "-grouped", Produced.with(Serdes.String(), Serdes.Long()));

    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, kStreamsConfigs());
    streams.start();
    return source;
}

@Bean
public StreamsBuilder streamBuilder() {
    return new StreamsBuilder();
}

Однако когда я использую эту тему B, она с самого начала дает мне агрегированные данные. Мой вопрос в том, можем ли мы иметь какое-то положение, в котором я могу использовать сгруппированные данные за предыдущие часы, а затем зафиксировать их в БД, а затем Kakfa забывает о данных за предыдущие часы и предоставляет новые данные каждый час, а не совокупную сумму. Правильная ли топология проекта или мы можем сделать что-то лучше?


person Sandeep B    schedule 18.01.2018    source источник


Ответы (1)


Если вы хотите получать один результат агрегации в час, вы можете использовать оконную агрегацию с размером окна 1 час.

stream.groupBy(...)
      .windowedBy(TimeWindow.of(1 *3600 * 1000))
      .count(...)

Дополнительные сведения см. В документации: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing.

Тип вывода - Windowed<String> для ключа (не String). Вам необходимо указать собственный Window<String> Serde или преобразовать тип ключа. Проконсультируйтесь с SessionWindowsExample.

person Matthias J. Sax    schedule 18.01.2018