Spring Kafka - Источник событий - Пример того, как запросить состояние некоторой сущности с помощью Kafka + KafkaStreams API

Я использую Kafka для реализации архитектуры, основанной на поиске событий.

Допустим, я храню события в формате JSON:

{"name": "ProductAdded", "productId":"1", quantity=3, dateAdded="2017-04-04" }

Я хотел бы реализовать запрос, чтобы получить количество продукта с productId = X в определенную дату.

Можете ли вы показать примерную реализацию этого запроса с помощью Spring Kafka KStreams?

ОБНОВЛЕНИЕ. Я немного продвинулся в этом с помощью Spring Kafka KStreams, но получаю ошибку десериализации.

Это мой Spring Cloud Stream Kafka Producer:

public interface ProductProducer{

    final String OUTPUT = "productsOut";

    @Output(ProductProducer.OUTPUT)
    MessageChannel output();

}

Конфигурация:

spring:
  application:
    name: product-generator-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
          - kafka
          zk-nodes:
          - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsOut: 
          destination: orders
          content-type: application/json

Я отправляю сообщение, используя следующий код, который правильно сериализует карту в объект JSON:

Map<String, Object> event = new HashMap<>();
event.put("name", "ProductCreated");
event.put("productId", product.getId());
event.put("quantity", product.getQuantity());
event.put("dateAdded", new Date());
        productProducer.output().send(MessageBuilder.withPayload(event).build(), 500);

MessageBuilder.withPayload(event).build() -> GenericMessage [payload={quantity=1, productId=1, name=ProductCreated, dateAdded="xxxxx"}, headers={id=fc531176-e3e9-61b8-40e3-08074fabee4d, timestamp=1499845483095}]

В приложении ProductService я могу прочитать это сообщение с помощью прослушивателя Spring Cloud Stream:

@Component
public class ProductListener{

    @StreamListener(ProductConsumer.INPUT)
    public void handleProduct(Map<String, Object> event){

Однако с KStream я получаю ошибку десериализации:

@Configuration
public class KStreamsConfig {

    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-kstream");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public FactoryBean<KStreamBuilder> myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(null, integerSerde, jsonSerde, STREAMING_TOPIC1);
        stream.print();
        return stream;
    }

}

Исключение:

org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
 at [Source: [B@288e4e9a; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
 at [Source: [B@288e4e9a; line: 1, column: 4]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3528)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:30)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:46)
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

ОБНОВЛЕНИЕ 2:

Чтобы узнать, что попадает в KStream, я использовал десериализаторы String как для ключа, так и для значения, и вот что печатается:

KStream<Integer, String> stream = kStreamBuilder.stream(null, integerSerde, stringSerde, STREAMING_TOPIC1);

Напечатанное значение:

[KSTREAM-SOURCE-0000000000]: null , �contentType

Почему я не получаю строку JSON?

ОБНОВЛЕНИЕ 3: я исправил проблему десериализации, причина заключалась в том, что производитель сообщения (Spring Cloud Stream) по умолчанию добавляет некоторые заголовки как часть полезной нагрузки. Мне нужно было только отключить это включение заголовка, чтобы начать правильно получать сообщения в Kafka Streams:

spring:
  application:
    name: product-service
  cloud:
    stream:
      kafka:
        binder:
          brokers:
          - kafka
          zk-nodes:
          - kafka
        bindings:
          productsOut:
            producer:
              sync: true
      bindings:
        productsIn:
          group: product-service 
          destination: orders
          consumer:
            max-attempts: 5
            header-mode: raw
        productsOut: 
          destination: orders
          content-type: application/json
          producer:
            header-mode: raw

Определение KStream:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

Вывод:

[KSTREAM-SOURCE-0000000000]: null , {"quantity":0,"productId":0,"name":"ProductCreated","dateAdded":1499930385450}

Теперь, когда все настроено правильно: как я могу реализовать интерактивный запрос, подобный тому, который мне нужен? -> Получить количество товара с productId = X на определенную дату


person codependent    schedule 11.07.2017    source источник
comment
Посмотрите пример здесь: github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/ Также прочтите это сообщение в блоге: confluent.io/blog/   -  person Matthias J. Sax    schedule 11.07.2017
comment
Привет, Матиас, спасибо, что указал на этот пример. Я немного продвинулся, но сейчас я застрял с ошибкой десериализации.   -  person codependent    schedule 12.07.2017
comment
Что произойдет, если вы измените KStream<Integer, JsonNode> stream = kStreamBuilder.stream(null, integerSerde, jsonSerde, STREAMING_TOPIC1); на KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);?   -  person Michael G. Noll    schedule 12.07.2017
comment
То же самое: Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')   -  person codependent    schedule 12.07.2017
comment
Вы уверены, что правильно записываете данные? Может быть, вы можете использовать консоль потребителя для проверки?   -  person Matthias J. Sax    schedule 12.07.2017


Ответы (2)


Мне удалось решить эту проблему, используя смесь Spring Cloud Streams (для генерации сообщений) и Spring Kafka для обработки KafkaStreams и реализации интерактивных запросов (ВАЖНО: обратите внимание на вопрос ОБНОВЛЕНИЕ 3: чтобы иметь возможность объединить оба):

Конфигурация Kafka Streams:

@Configuration
public class KStreamsConfig {

    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-streams");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public KStreamBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

        stream.map( (key, value) -> {
            return new KeyValue<>(value.get("productId").asInt(), value.get("quantity").asInt());
        }).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock");

        stream.print();
        return stream;
    }

}

Обратите внимание, как я создаю хранилище KTable ProductsStock, которое я буду запрашивать позже в службе.

ProductService:

@Autowired
private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;

@Override
    public Integer getProductStock(Integer id) {
        KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
        ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
        streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
        return keyValueStore.get(id);
}
person codependent    schedule 15.07.2017
comment
метод получения магазина из весеннего контекста сэкономил мне много времени! - person mohamnag; 17.04.2018

В предстоящем выпуске 1.3.0.M1 связующего устройства kafka spring Cloud Stream будет поддержка привязки kstream. Существует PR, где вы можете отслеживать прогресс этой инициативы.

Вот более общий пример (WordCount) с использованием связующего KStream: Пример WordCount с использованием поддержки Spring Cloud Stream для Kafka Streams

Таким образом, вы можете добиться того, чего ищете, следующим образом.

Этот метод StreamListener будет прослушивать тему Kafka и писать в другую тему со счетчиком продуктов с идентификатором, равным 123, во временном окне за последние 30 секунд.

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class ProductCountApplication {

  public static final int = 123;

  @StreamListener("input")
  @SendTo("output")
  public KStream<?, String> process(KStream<?, Product> input) {

        return input
                .filter((key, product) -> product.getID() == PRODUCT_ID)
                .map((k,v) -> new KeyValue<>(v, v))
                .groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
                .count(TimeWindows.of(30000), "product-store")
                .toStream()
                .map((w,c) -> new KeyValue<>(null, "Product with id 123 count: " + c));
  }

}

Вот используемый application.yml:

spring.cloud.stream.kstream.binder.streamConfiguration:
  key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde # Use a native Kafka Serde for the key
  value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde # Use a native Kafka Serde for the value
spring.cloud.stream.bindings.output.producer:
  headerMode: raw # Incoming data has no embedded headers
  useNativeEncoding: true # Write data using the native Serde
spring.cloud.stream.bindings.input.consumer:
  headerMode: raw # Outbound data has no embedded headers

Когда вы запускаете программу, вам необходимо передать назначения (темы) ввода / вывода:

--spring.cloud.stream.bindings.input.destination=products 
--spring.cloud.stream.bindings.output.destination=counts
person sobychacko    schedule 14.07.2017
comment
Спасибо за обновление следующей версии Spring Cloud Stream. Хотя это могло быть решением, я бы хотел использовать интерактивные запросы (используя ReadOnlyKeyValueStore для запроса значений), а не другую тему. При таком подходе мы могли бы использовать что-то вроде: .groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock"); для создания запрашиваемой KTable? Также нам потребуется доступ к объекту KafkaStreams для доступа к магазину: streams.store("ProductsStock", QueryableStoreTypes.keyValueStore()); Как мы можем получить к нему доступ? - person codependent; 15.07.2017