Кафка добавляет префикс к сообщению

Работая с kafka 7.2, при отправке сообщения с использованием производителя я обнаружил, что после его использования сообщение приходит с дополнительным разделом в начале сообщения.

Например, при отправке в kafka простой строки «King Daniel», которая в байтовом массиве выглядит так:

4B 69 6E 67 20 44 61 6E 69 65 6C

Но когда я потребляю его по какой-то причине, я получаю:

00 00 00 00 00 11 01 00 C2 C4 1E 7C 4B 69 6E 67 20 44 61 6E 69 65 6C

Какая строка "........ֲִ.|Царь Даниил"

Поэтому у меня есть дополнительные 12 символов в начале моего сообщения. Это какой-то заголовок? Как я могу получить исходное сообщение?

Вот мой потребительский код:

public void start() {
initConsumer();
LOG.info("Starting kafka consumer for topic " + topic);
try {
    long offset = 0;
    while (true) {
    // create a fetch request for partition 0, current offset, and
    // fetch size of 1MB
    FetchRequest fetchRequest = new FetchRequest(topic, 0, offset, 1000000);
    ByteBufferMessageSet messages = consumer.fetch(fetchRequest);

    for (MessageAndOffset msg : messages) {
        ByteBuffer payload = msg.message().payload();
        writer.writeToFile(payload.array());
        // advance the offset after consuming each message
        offset = msg.offset();
    }
    }
} catch (Exception e) {
    LOG.error("Error occured while consuming from kafka", e);
}
}

Итак, я пишу msg.message().payload().array() в файл, а затем, когда я открываю этот файл, я вижу исходное содержимое с добавлением 12 дополнительных символов в начале.

Как я могу получить точное исходное сообщение?


person forhas    schedule 29.11.2013    source источник
comment
Вы пробовали Utils.toString(msg.message.payload(), "UTF-8"), как указано в их руководстве?   -  person user2720864    schedule 29.11.2013
comment
Декодирование с использованием UTF-8 подходит для простых строк, но мое настоящее сообщение для него не подходит, в любом случае это не проблема. Спасибо   -  person forhas    schedule 01.12.2013


Ответы (1)


Проблема в том, что метод ByteBuffer.array() возвращает массив, который поддерживает этот буфер (см. http://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html#array()).

ByteBuffer может занимать только часть резервного массива. Кроме того, этот метод не будет работать для байтовых буферов только для чтения и прямых байтовых буферов: он выдаст ReadOnlyBufferException, если массив доступен только для чтения, или UnsupportedOperationException, если ByteBuffer не имеет резервного массива.

Вы можете использовать следующий фрагмент кода для чтения содержимого ByteBuffer в массив:

ByteBuffer payload = msg.message().payload();
byte[] contents = new byte[payload.remaining()];
payload.get(contents);
writer.writeToFile(contents);

Однако, возможно, стоит расширить writer, чтобы записывать данные непосредственно из ByteBuffer и избежать лишнего копирования.

person Wildfire    schedule 30.11.2013