В Kafka как получить точное смещение по времени изготовления

Мне нужно, чтобы сообщение создавалось в Кафке час за часом в день. Каждый час я буду запускать задание по потреблению сообщения, созданного час назад. например, если текущее время 20:12, я буду использовать сообщение между 19:00:00 и 19:59:59. Это означает, что мне нужно получить начальное смещение по времени 19:00:00 и конечное смещение по времени 19:59:59. Я использовал SimpleConsumer.getOffsetsBefore, как показано в 「0.8.0 Пример SimpleConsumer < / а> 」. Проблема в том, что возвращаемое смещение не соответствует метке времени, указанной в качестве параметра. например Когда делаю отметку времени 19:00:00, я получаю сообщение, созданное в 16:38:00.


person Po Zhou    schedule 21.03.2014    source источник


Ответы (5)


В Kafka в настоящее время нет возможности получить смещение, соответствующее определенной временной метке - это сделано намеренно. Как описано в верхней части Статья журнала Джея Крепса, номер смещения обеспечивает своего рода метку времени для журнала, которая отделена от времени настенных часов. Используя смещение как ваше понятие времени, вы можете узнать, находятся ли какие-либо две системы в согласованном состоянии, просто купите, зная, до какого смещения они считали. Никогда не возникает путаницы по поводу разного времени на разных серверах, високосных лет, летнего времени, часовых поясов и т. Д. Это довольно приятно ...

СЕЙЧАС ... все это говорит, если вы знаете, что ваш сервер вышел из строя в какой-то момент X, то практически говоря, вы действительно хотели бы знать соответствующее смещение. Вы можете приблизиться. Файлы журналов на машинах kafka названы в соответствии с временем, когда они начали писать, и существует инструмент kafka (который я не могу найти прямо сейчас), который позволяет узнать, какие смещения связаны с этими файлами. Если вы хотите знать точную метку времени, вы должны кодировать метку времени в сообщениях, которые вы отправляете в Kafka.

person JnBrymn    schedule 25.08.2014
comment
Да, я решил проблему с помощью «кодирования метки времени в сообщениях, которые вы отправляете в Kafka», и это работает уже несколько месяцев. - person Po Zhou; 31.08.2014
comment
@PoZhou У меня тоже самое, не могли бы вы показать мне простой пример. Спасибо - person noodles; 22.01.2015
comment
Обратите внимание, что Kafka скоро получит поддержку этой функции: cwiki.apache.org/confluence/display/KAFKA/ - person Petri; 20.04.2016

Ниже для этого можно использовать метод api пользователя kafka getOffsetsByTimes(), он доступен с версии 0.10.0 или выше. См. JavaDoc.

/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
 * will be returned for that partition.
 *
 * Notice that this method may block indefinitely if the partition does not exist.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.
 * @throws IllegalArgumentException if the target timestamp is negative.
 */
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
        // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
        // OffsetAndTimestamp is always positive.
        if (entry.getValue() < 0)
            throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                    entry.getValue() + ". The target time cannot be negative.");
    }
    return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}
person Liju John    schedule 20.07.2017

Как отмечается в других ответах, в более старых версиях Kafka был только приблизительный способ сопоставления времени со смещениями. Однако, начиная с версии Kafka 0.10.0 (выпущенной в мае 2016 года), Kafka поддерживает временной индекс для каждой темы. Это позволит вам эффективно переходить от времен к точным смещениям. Вы можете использовать Метод KafkaConsumer # offsetsForTimes для доступа к этой информации.

Более подробная информация о том, как привязанный ко времени индекс реализован на Страница обсуждения проекта КИП-33.

person cmccabe    schedule 14.04.2017

Покажи код:

public static Map<TopicPartition, OffsetAndTimestamp> getOffsetAndTimestampAtTime(String kafkaServer, String topic, long time) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
    kafkaParams.put(GROUP_ID_CONFIG, "consumerGroupId");
    kafkaParams.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(AUTO_OFFSET_RESET_CONFIG, "latest");
    kafkaParams.put(ENABLE_AUTO_COMMIT_CONFIG, false);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaParams);

    List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);

    List<TopicPartition> topicPartitions = partitionInfos
            .stream()
            .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
            .collect(Collectors.toList());

    Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> time));

    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(topicPartitionToTimestampMap);
    consumer.close();
    return result;
}
person diguage    schedule 08.05.2018

Kafka 1.10 поддерживает временные метки, хотя по-прежнему будет непросто использовать ее для того, что вы хотите. Но если вы знаете, с какой отметки времени вы хотите читать, и пока вы не захотите читать, то вы можете просто опросить сообщения до этого времени и прекратить потребление.

person Gerard    schedule 03.06.2016