В KafkaProducer.send (сообщение) я получаю исключение Ошибка сериализации сообщения Avro

Я использую Avro для создания класса. Вот мой код у производителя выглядит так:

TweetInfo tweetInfo = TweetInfo.newBuilder()
                    .setTweetId(status.getId())
                    .setTweetCreatedAt(status.getCreatedAt().toString())
                    .setTweetMessage(status.getText())
                    .setUserId(user.getId())
                    .setUserCreatedAt(user.getCreatedAt().toString())
                    .setUserName(user.getName())
                    .setUserScreenName(user.getScreenName())
                    .build();

            ProducerRecord<String, TweetInfo> data = new ProducerRecord(KafkaConstants.TOPIC, tweetInfo);
            producer.send(data);

TweetInfo - это класс, созданный схемой Avro. Когда я запускаю программу, я вижу трассировку стека следующим образом

    2018-12-11 01:51:58.138  WARN 16244 --- [c Dispatcher[0]] o.i.service.kafka.TweetKafkaProducer     : exception Error serializing Avro message
2018-12-11 01:51:59.162 ERROR 16244 --- [c Dispatcher[0]] i.c.k.s.client.rest.RestService          : Failed to send HTTP request to endpoint: http://localhost:8081/subjects/twitterData-value/versions

java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_152]
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) ~[na:1.8.0_152]
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_152]
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_152]
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_152]
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[na:1.8.0_152]
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_152]
    at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_152]
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.New(HttpClient.java:339) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.New(HttpClient.java:357) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1334) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1309) ~[na:1.8.0_152]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:178) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:326) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:318) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:313) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:114) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:153) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79) [kafka-avro-serializer-5.0.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) [kafka-avro-serializer-5.0.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60) [kafka-clients-2.1.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:879) [kafka-clients-2.1.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841) [kafka-clients-2.1.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:728) [kafka-clients-2.1.0.jar:na]
    at org.interview.service.kafka.TweetKafkaProducer$1.onStatus(TweetKafkaProducer.java:95) [classes/:na]
    at twitter4j.StatusStreamImpl.onStatus(StatusStreamImpl.java:75) [twitter4j-stream-4.0.6.jar:4.0.6]
    at twitter4j.StatusStreamBase$1.run(StatusStreamBase.java:105) [twitter4j-stream-4.0.6.jar:4.0.6]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]

У меня работает zookeeper и kafka. Нужно ли мне также запускать реестр схем? Если да, то есть ли для этого руководство? Я ничего не могу найти.


person Ali Hassan    schedule 11.12.2018    source источник
comment
какой у тебя кафкаконфиг?   -  person Sid Malani    schedule 11.12.2018
comment
@SidMalani вы имеете ввиду конфиг производителя?   -  person Ali Hassan    schedule 11.12.2018
comment
Ошибка предполагает, что реестр вашей схемы не запущен. Вы проверили этот бит?   -  person Sid Malani    schedule 11.12.2018
comment
ознакомьтесь с этой статьей - запуск бита реестра .... aseigneurin.github.io/2018/08/02/   -  person Sid Malani    schedule 11.12.2018
comment
@SidMalani Статья не особо помогла. Я не могу запустить реестр. Стоит ли добавлять что-нибудь в переменные env? Я только что скачал объединенный проект. Когда я пытаюсь запустить команды в статье, появляется ошибка, что bin не является допустимой командой   -  person Ali Hassan    schedule 11.12.2018
comment
Как у вас работает кафка на локальном компьютере? Ты пользуешься докером? Также можете ли вы опубликовать журнал, включая команду, которую вы пытались выполнить, и сообщения об ошибках?   -  person Sid Malani    schedule 11.12.2018


Ответы (2)


Как сказал @ cricket_007, если вы находитесь в Windows, попробуйте использовать докер.

Под ссылкой на docker compose, который будет запускать kafka, zookeeper, schema registry и kafka rest, вы можете легко протестировать своего производителя. https://github.com/confluentinc/docker-images/blob/master/examples/fullstack/docker-compose.yml.

РЕДАКТИРОВАТЬ: извините, это ссылка на старое репо, проверьте тот, что ниже, у вас вся объединенная платформа (вы можете удалить то, что вам не нужно)!

https://github.com/confluentinc/cp-docker-images/blob/5.0.1-post/examples/cp-all-in-one/docker-compose.yml

person Saïd Bouras    schedule 12.12.2018
comment
Спасибо за ответ. Я попытался использовать docker-compose.yml, но не смог подключиться. Я разместил stacktrace здесь github.com/wurstmeister/kafka-docker/issues/440 - person Ali Hassan; 13.12.2018
comment
Извините, я дал вам ссылку на старое репо, проверьте ответ, я просто добавляю правку. - person Saïd Bouras; 13.12.2018

Не удалось отправить HTTP-запрос на конечную точку

Сервер реестра Confluent Schema Registry Server должен быть запущен. И вы можете попробовать самостоятельно подключиться к конечным точкам HTTP (см. Документы ниже).

Не знаю, как вы это начали, но вы можете загрузить Confluent OSS, извлеките его куда-нибудь, затем в терминале вы захотите перейти к bin местоположению извлеченной папки и запустить confluent start schema-registry. Примечание. Это работает только для Linux.

Или, если вам нужна конфигурация «производственного развертывания», вам нужно будет сначала отредактировать файлы свойств в папке etc и запустить каждый из Zookeeper, Kafka и Registry, используя соответствующие сценарии.

Документы: Запуск реестра схем


Что касается комментариев

Когда я пытаюсь запустить команды в статье, появляется ошибка, что bin не является допустимой командой

$ bin/... сначала предполагает, что вы cd попали в папку confluent-x.x.x, которая была извлечена


Кстати, существуют существующие проекты Kafka Connect, которые взаимодействуют с API Twitter.

person OneCricketeer    schedule 11.12.2018
comment
Спасибо за ответ. Я пытаюсь запустить команду confluent start schema-registry, но она показывает, что «конфлюент» не распознается как внутренняя или внешняя команда ». Нужно ли мне вносить какие-либо изменения в переменные env? - person Ali Hassan; 11.12.2018
comment
Вы можете отредактировать свой PATH, или вам нужно cd в папку bin, как упоминалось. Примечание. Команда Confluent недоступна в Windows. - person OneCricketeer; 11.12.2018
comment
Я уже в папке bin. Я использую окна. Я тоже обновил PATH, и мне все еще не повезло с запуском реестра схемы. Я все еще вижу то же сообщение confluent' is not recognized as an internal or external command - person Ali Hassan; 12.12.2018
comment
Опять же, confluent Команда недоступна в Windows. Вам нужно будет переключиться в папку bin\windows (и вместо этого добавить ее в свой PATH) и запустить там schema-registry-start.bat файл. - person OneCricketeer; 12.12.2018
comment
На самом деле, похоже, что команды schema-registry тоже не созданы для Windows, поэтому вам придется запускать в Docker или на виртуальной машине, как упоминалось в Confluent Slack. - person OneCricketeer; 12.12.2018