Я пытаюсь использовать низкоуровневый Consumer Java API для управления смещениями вручную с последней версией kafka_2.10-0.8.2.1. Чтобы проверить правильность смещений, которые я фиксирую / считываю из Kafka, я использую инструмент kafka.tools.ConsumerOffsetChecker.
Вот пример вывода для моей темы / группы потребителей:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group Topic Pid Offset logSize Lag Owner
elastic_search_group my_log_topic 0 5 29 24 none
Вот моя интерпретация результата:
Offset = 5 -> это текущее смещение моего потребителя elastic_search_group
logSize = 29 -> это Последнее смещение - смещение следующего сообщения, которое придет в эту тему / раздел
Lag = 24 -> 29-5 - сколько сообщений еще не обработано моим потребителем elastic_search_group
Pid - ID раздела
Q1: это правильно?
Теперь я хочу получить ту же информацию от своего потребителя Java. Здесь я обнаружил, что мне пришлось использовать два разных API:
kafka.javaapi. OffsetRequest, чтобы получить самое раннее и последнее смещение, но kafka.javaapi. OffsetFetchRequest, чтобы получить текущее смещение.
Чтобы получить самое раннее (или последнее) смещение, я делаю:
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];
А чтобы получить текущее смещение, мне нужно использовать совершенно другой API:
short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();
Q2: это правильно? почему есть два разных API для получения очень похожей информации?
В3: имеет ли значение, какой versionId и correlationId я здесь использую? Я думаю, что versionId должен быть 0 для kafka до 0.8.2.1 и 1 для 0.8.2.1 и более поздних версий, но похоже, что он работает с 0 и для 0.8.2.1 - см. Ниже?
Итак, для примера состояния темы выше и вышеприведенного вывода ConsumerOffsetChecker, вот что я получаю из моего кода Java:
currentOffset = 5; earliestOffset = 29; latestOffset = 29
«currentOffset» вроде в порядке, «latestOffset» тоже верен, но «earliestOffset»? Я ожидал, что это будет не менее «5»?
Q4: Как могло случиться, что ближайшееOffset больше, чем currentOffset? Мое единственное подозрение, что сообщения из этой темы, возможно, были удалены из-за политики хранения…. В каких других случаях такое могло случиться?