я играю с Kafka и потоковой технологией; Я создал собственный сериализатор и десериализатор для KStream, который я буду использовать для получения сообщений из данной темы.
Теперь проблема в том, что я создаю serde таким образом:
JsonSerializer<EventMessage> serializer = new JsonSerializer<>();
JsonDeserializer<EventMessage> deserializer = new JsonDeserializer<>(EventMessage.class);
Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);
Реализация сериализатора:
public class JsonSerializer<T> implements Serializer<T> {
private Gson gson = new Gson();
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String topic, T data) {
return gson.toJson(data).getBytes(Charset.forName("UTF-8"));
}
@Override
public void close() {
}
}
Реализация десериализатора:
public class JsonDeserializer<T> implements Deserializer<T> {
private Gson gson = new Gson();
private Class<T> deserializedClass;
public JsonDeserializer() {
}
public JsonDeserializer(Class<T> deserializedClass) {
this.deserializedClass = deserializedClass;
}
@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> map, boolean b) {
if(deserializedClass == null) {
deserializedClass = (Class<T>) map.get("serializedClass");
}
}
@Override
public T deserialize(String topic, byte[] data) {
System.out.print(data);
if(data == null){
return null;
}
return gson.fromJson(new String(data),deserializedClass);
}
@Override
public void close() {
}
}
Когда я пытаюсь выполнить код, я получаю следующую ошибку:
Вызвано: org.apache.kafka.common.KafkaException: не удалось создать экземпляр класса org.apache.kafka.common.serialization.Serdes $ WrapperSerde Есть ли у него общедоступный конструктор без аргументов?
Полный дамп здесь: https://pastebin.com/WwpuXuxB
Вот как я пытаюсь использовать serde:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(stringSerde, messageSerde, topic);
KStream<String, EventMessage> outStream = eventsStream
.mapValues(value -> EventMessage.build(value.type, value.timestamp));
outStream.to("output");
Кроме того, я не совсем уверен, что правильно настраиваю свойства для глобальной настройки сериализатора и десериализатора:
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, messageSerde.getClass());
JsonSerializer
? - person Michael G. Noll   schedule 08.06.2017StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()
должно бытьStreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()
. - person Michael G. Noll   schedule 08.06.2017