Десериализация Spring-Kafka

Я пытаюсь создать потребителя кафки, который слушает определенную тему и обрабатывает потребляемое сообщение как JSON. Я попытался следовать подходу, указанному в весенних документах здесь, но не могу чтобы получать сообщения в формате JSON.

Это мой код для конфигурации приемника:

@Configuration
@EnableKafka
public class ReceiverConfig {

@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;

@Bean
public Map consumerConfigs() {
    Map props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections
    // to the Kakfa cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // consumer groups allow a pool of processes to divide the work of
    // consuming and processing records
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "Waitlist");

    return props;
}

@Bean
public ConsumerFactory consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}

@Bean
public Receiver receiver() {
    return new Receiver();
}

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}
}

Потребитель:

public class Receiver {

private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

private CountDownLatch latch = new CountDownLatch(1);

@KafkaListener(topics = "Reservation",
        containerFactory = "kafkaJsonListenerContainerFactory")
public void receiveMessage(Message<?> message) {
    LOGGER.info("received message='{}'", message);
    latch.countDown();
}

public CountDownLatch getLatch() {
    return latch;
}

}

Когда я пытаюсь опубликовать тему на удаленном сервере, я получаю следующую ошибку:

        2017-02-09 13:42:49.122 [1;31mERROR[0;39m [36mo.s.k.listener.LoggingErrorHandler[0;39m Error while processing: ConsumerRecord(topic = Reservation, partition = 0, offset = 3394, CreateTime = 1486626082480, checksum = 1777660938, serialized key size = -1, serialized value size = 2, key = null, value = hi)
        org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hi': was expecting ('true', 'false' or 'null')
         at [Source: hi; line: 1, column: 5]
            at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:81)
            at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:82)
            at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:157)
            at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:68)
            at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:975)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.lang.Thread.run(Thread.java:745)
        Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hi': was expecting ('true', 'false' or 'null')
         at [Source: hi; line: 1, column: 5]
            at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
            at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
            at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2835)
            at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1903)
            at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:749)
            at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
            at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
            at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2880)
            at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:78)
            ... 11 common frames omitted

Однако, если я удаляю containerfactory из прослушивателя, я могу получать сообщения, но они не в формате JSON, а в виде строки:

2017-02-09 15:04:58.408 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='{'
2017-02-09 15:04:58.408 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "_eventType":"Reservation",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "_timestamp":"2017-01-23T09:19:35Z",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "_operation":"create",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "type":"excursion",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "reservationId":"46d353ac_9575_492a_9291_98d15bf4cc82",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "eventReservationLinkId":"9b0bafb4_406e_43ae_94f2_36a913ce23d2",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "master":true,'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "partySize":2,'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "startTime":"2017-01-27T08:30:00Z",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "endTime":"2017-01-27T10:00:00Z",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "timeslotId":"c2304a34_b9ba_4f3c_8e45_3e3c7677d6c2",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "variantSku":"ocean_polar_1606_FLL-640B",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "guestId":"378741",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "createdBy":"149673",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "purchaser":"143679",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "eventId":"ocean_polar_1606_FLL-640",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "scheduledEventId":"02c95434_3a99_452e_a2a8_51712683926c",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "resourceId":"",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "surpriseFlag":false,'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "venueId":"FLL001",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "status":"CONFIRMED",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "primaryId":"378741",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='  "partyId":"9b0bafb4_406e_43ae_94f2_36a913ce23d2"'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='}'

person Kuber    schedule 09.02.2017    source источник


Ответы (1)


Ваши сообщения представляют собой отдельные фрагменты документа JSON.

received message='{'
received message='  "_eventType":"Reservation",'
received message='  "_timestamp":"2017-01-23T09:19:35Z",'
...

Чтобы конвертировать из JSON, его необходимо инкапсулировать в одно сообщение.

person Gary Russell    schedule 09.02.2017
comment
это означает, что ответственность лежит на «Продюсере». Спасибо @Gary! - person Kuber; 10.02.2017