Уточнение операций смещения API Kafka Java

Я пытаюсь использовать низкоуровневый Consumer Java API для управления смещениями вручную с последней версией kafka_2.10-0.8.2.1. Чтобы проверить правильность смещений, которые я фиксирую / считываю из Kafka, я использую инструмент kafka.tools.ConsumerOffsetChecker.

Вот пример вывода для моей темы / группы потребителей:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group   elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group           Topic                          Pid Offset          logSize         Lag             Owner
elastic_search_group my_log_topic              0   5               29              24              none

Вот моя интерпретация результата:

Offset = 5 -> это текущее смещение моего потребителя elastic_search_group

logSize = 29 -> это Последнее смещение - смещение следующего сообщения, которое придет в эту тему / раздел

Lag = 24 -> 29-5 - сколько сообщений еще не обработано моим потребителем elastic_search_group

Pid - ID раздела

Q1: это правильно?

Теперь я хочу получить ту же информацию от своего потребителя Java. Здесь я обнаружил, что мне пришлось использовать два разных API:

kafka.javaapi. OffsetRequest, чтобы получить самое раннее и последнее смещение, но kafka.javaapi. OffsetFetchRequest, чтобы получить текущее смещение.

Чтобы получить самое раннее (или последнее) смещение, я делаю:

TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];

А чтобы получить текущее смещение, мне нужно использовать совершенно другой API:

short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>(); 
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition); 
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();

Q2: это правильно? почему есть два разных API для получения очень похожей информации?

В3: имеет ли значение, какой versionId и correlationId я здесь использую? Я думаю, что versionId должен быть 0 для kafka до 0.8.2.1 и 1 для 0.8.2.1 и более поздних версий, но похоже, что он работает с 0 и для 0.8.2.1 - см. Ниже?

Итак, для примера состояния темы выше и вышеприведенного вывода ConsumerOffsetChecker, вот что я получаю из моего кода Java:

currentOffset = 5; earliestOffset = 29; latestOffset = 29

«currentOffset» вроде в порядке, «latestOffset» тоже верен, но «earliestOffset»? Я ожидал, что это будет не менее «5»?

Q4: Как могло случиться, что ближайшееOffset больше, чем currentOffset? Мое единственное подозрение, что сообщения из этой темы, возможно, были удалены из-за политики хранения…. В каких других случаях такое могло случиться?


person Marina    schedule 20.05.2015    source источник


Ответы (1)


Искал способы найти лаги в разделах. И это включает в себя те же шаги, что и вы. Итак, исходя из того, что я узнал, я могу дать вам ответы.

  1. logSize напрямую указывает, сколько сообщений было накоплено в этом конкретном разделе. Или он указывает максимальное смещение сообщений в этом разделе. Смещение - это смещение последнего успешно использованного сообщения. Таким образом, отставание - это всего лишь разница между размером журнала и смещением.
  2. Да, это правильно. Пока это единственные два способа найти текущее смещение и самое раннее или последнее смещение.
  3. Не знаю, зачем указывать versionId. Вы можете использовать kafka.api.OffsetRequest.CurrentVersion(), чтобы получить versionId. Так что жесткого кодирования можно избежать. Вы можете с уверенностью принять значение correlationId равным 0.
  4. Это странно. Когда я использую EarliestTime (), я получаю самое раннее смещение как 0, даже если мое текущее смещение значительно увеличилось. Значит, это начало раздела. Поэтому, когда срок действия некоторых сообщений истечет в будущем, это самое раннее смещение будет ненулевым числом. Теперь, если сообщения были удалены из-за задержки политики хранения, следовало бы изменить. Я не уверен в таком поведении. Один из способов убедиться в этом - запустить потребителя, заметив такое чтение и проверив его журналы. На нем должны быть такие строки.

    2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo: 52 :: reset потребляемое смещение запросов: 2: полученное смещение = 405952: используемое смещение = от 335372 до 335372 2015-06-09 18:49:15: : DEBUG :: PartitionTopicInfo: 52 :: reset потребляемое смещение запросов: 2: полученное смещение = 405952: используемое смещение = от 335373 до 335373

Обратите внимание, что в приведенных выше строках журнала полученное смещение остается таким же, а потребляемое смещение увеличивается. В конце концов, это закончится

2015-06-09 18:49:16 :: DEBUG :: PartitionTopicInfo: 52 :: сбросить смещение потребления запросов: 2: полученное смещение = 405952: потребленное смещение = от 405952 до 405952

Тогда это будет означать, что из-за смещения политики хранения журналов с 335372 на 405952 истек срок действия.

person Shades88    schedule 15.06.2015
comment
Спасибо, @ Shades88! После некоторого теста, для №4 - я пришел к такому же выводу, что такая ситуация может произойти, когда журналы будут очищены из-за политики хранения. Поэтому я добавил обработку этого углового случая в свою логику потребителя - проверьте, что текущее смещение равно ›= самому раннему смещению, и установите для него значение EarliestOffset, если нет. Спасибо! - person Marina; 16.06.2015
comment
Что касается versionId, если вы укажете 0, смещения сохраняются в Zookeeper, а если вы используете 1, смещения сохраняются в специальной теме Kafka. - person Marko Bonaci; 08.11.2015