Исключение KafkaStreams serde

я играю с Kafka и потоковой технологией; Я создал собственный сериализатор и десериализатор для KStream, который я буду использовать для получения сообщений из данной темы.

Теперь проблема в том, что я создаю serde таким образом:

JsonSerializer<EventMessage> serializer = new JsonSerializer<>();
JsonDeserializer<EventMessage> deserializer = new JsonDeserializer<>(EventMessage.class);
Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);

Реализация сериализатора:

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        return gson.toJson(data).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {

    }
}  

Реализация десериализатора:

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer() {

    }

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if(deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        System.out.print(data);
        if(data == null){
            return null;
        }

        return gson.fromJson(new String(data),deserializedClass);
    }

    @Override
    public void close() {

    }
}

Когда я пытаюсь выполнить код, я получаю следующую ошибку:

Вызвано: org.apache.kafka.common.KafkaException: не удалось создать экземпляр класса org.apache.kafka.common.serialization.Serdes $ WrapperSerde Есть ли у него общедоступный конструктор без аргументов?

Полный дамп здесь: https://pastebin.com/WwpuXuxB

Вот как я пытаюсь использовать serde:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(stringSerde, messageSerde, topic);

KStream<String, EventMessage> outStream = eventsStream
            .mapValues(value -> EventMessage.build(value.type, value.timestamp));

outStream.to("output");

Кроме того, я не совсем уверен, что правильно настраиваю свойства для глобальной настройки сериализатора и десериализатора:

streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, messageSerde.getClass());

person Bruno Ripa    schedule 07.06.2017    source источник
comment
Можете ли вы проверить, поможет ли добавление явного конструктора по умолчанию (без аргументов) в JsonSerializer?   -  person Michael G. Noll    schedule 08.06.2017
comment
Например, StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass() должно быть StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName().   -  person Michael G. Noll    schedule 08.06.2017


Ответы (3)


Чтобы завершить ответ Матиаса, я только что закодировал простой пример того, как создать собственный Serde (сериализатор / десериализатор) в приложении Kafka Stream. Его можно клонировать и попробовать в: https://github.com/Davidcorral94/Kafka-Streams-Custom-Seder

Сначала я создаю два класса: один для сериализатора, а другой - для десериализатора. В этом случае я использую библиотеку Gson для выполнения сериализации / десериализации.

Сериализатор

public class PersonSerializer implements Closeable, AutoCloseable, Serializer<Person> {

    private static final Charset CHARSET = Charset.forName("UTF-8");
    static private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String s, Person person) {
        // Transform the Person object to String
        String line = gson.toJson(person);
        // Return the bytes from the String 'line'
        return line.getBytes(CHARSET);
    }

    @Override
    public void close() {

    }
}

Десериализатор

public class PersonDeserializer implements Closeable, AutoCloseable, Deserializer<Person> {

    private static final Charset CHARSET = Charset.forName("UTF-8");
    static private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public Person deserialize(String topic, byte[] bytes) {
        try {
            // Transform the bytes to String
            String person = new String(bytes, CHARSET);
            // Return the Person object created from the String 'person'
            return gson.fromJson(person, Person.class);
        } catch (Exception e) {
            throw new IllegalArgumentException("Error reading bytes", e);
        }
    }

    @Override
    public void close() {

    }
}

Затем я оборачиваю их обоих в Serde, чтобы иметь возможность использовать его в моем приложении Kafka Stream.

Серде

public class PersonSerde implements Serde<Person> {
    private PersonSerializer serializer = new PersonSerializer();
    private PersonDeserializer deserializer = new PersonDeserializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<Person> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<Person> deserializer() {
        return deserializer;
    }
}

Наконец, вы можете использовать этот класс Serde в своем приложении Kafka Stream со следующей строкой:

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSerde.class);

Это действительно работает с последней доступной на данный момент версией Kafka - 1.0.0!

person David Corral    schedule 21.12.2017

Если вы вызовете Serdes.serdeFrom(...), вы получите обратно тип WrappedSerde, предназначенный для внутреннего использования (а WrappedSerde не имеет конструктора без аргументов). В настоящее время нет API, который можно было бы вызвать для получения пользовательского Serde. Вместо этого вам нужно реализовать собственный Serde класс и обернуть сериализатор и десериализатор «вручную».

public class EventMessageSerde implements Serde<EventMessage> {
    final private JsonSerializer<EventMessage> serializer;
    final private JsonDeserializer<EventMessage> deserializer;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<EventMessage> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<EventMessage> deserializer() {
        return deserializer;
    }
}

В вашем Properties вы можете установить:

streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, EventMessageSerde.class);
person Matthias J. Sax    schedule 10.06.2017
comment
Спасибо за обновление, я постараюсь как можно скорее дать вам отзыв! - person Bruno Ripa; 12.06.2017

Другой способ - использовать StreamsBuilder вместо KStreamBuilder. KStreamBuilder устарел в 1.0.0. Вы можете напрямую передать объект serde, используя Consumed.with при создании потока. В этом сценарии вам не нужно создавать собственный класс Serde.

Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);

StreamsBuilder builder = new StreamsBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(topic, Consumed.with(Serdes.String(), messageSerde));

Вы можете сохранить StringSerde в приведенном ниже коде вместо использования messageSerde.getClass(), который не работает, потому что messageSerde - это просто WrappedSerde, у которого нет конструктора без аргументов.

streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName());
person san    schedule 16.02.2018