Работая с kafka 7.2, при отправке сообщения с использованием производителя я обнаружил, что после его использования сообщение приходит с дополнительным разделом в начале сообщения.
Например, при отправке в kafka простой строки «King Daniel», которая в байтовом массиве выглядит так:
4B 69 6E 67 20 44 61 6E 69 65 6C
Но когда я потребляю его по какой-то причине, я получаю:
00 00 00 00 00 11 01 00 C2 C4 1E 7C 4B 69 6E 67 20 44 61 6E 69 65 6C
Какая строка "........ֲִ.|Царь Даниил"
Поэтому у меня есть дополнительные 12 символов в начале моего сообщения. Это какой-то заголовок? Как я могу получить исходное сообщение?
Вот мой потребительский код:
public void start() {
initConsumer();
LOG.info("Starting kafka consumer for topic " + topic);
try {
long offset = 0;
while (true) {
// create a fetch request for partition 0, current offset, and
// fetch size of 1MB
FetchRequest fetchRequest = new FetchRequest(topic, 0, offset, 1000000);
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
for (MessageAndOffset msg : messages) {
ByteBuffer payload = msg.message().payload();
writer.writeToFile(payload.array());
// advance the offset after consuming each message
offset = msg.offset();
}
}
} catch (Exception e) {
LOG.error("Error occured while consuming from kafka", e);
}
}
Итак, я пишу msg.message().payload().array()
в файл, а затем, когда я открываю этот файл, я вижу исходное содержимое с добавлением 12 дополнительных символов в начале.
Как я могу получить точное исходное сообщение?
Utils.toString(msg.message.payload(), "UTF-8")
, как указано в их руководстве? - person user2720864   schedule 29.11.2013