Confluent HDFS Connector теряет сообщения

Сообщество, не могли бы вы помочь мне понять, почему ~3% моих сообщений не попадают в HDFS? Я написал простого производителя в JAVA для генерации 10 миллионов сообщений.

public static final String TEST_SCHEMA = "{"
        + "\"type\":\"record\","
        + "\"name\":\"myrecord\","
        + "\"fields\":["
        + "  { \"name\":\"str1\", \"type\":\"string\" },"
        + "  { \"name\":\"str2\", \"type\":\"string\" },"
        + "  { \"name\":\"int1\", \"type\":\"int\" }"
        + "]}";

public KafkaProducerWrapper(String topic) throws UnknownHostException {
    // store topic name
    this.topic = topic;

    // initialize kafka producer
    Properties config = new Properties();
    config.put("client.id", InetAddress.getLocalHost().getHostName());
    config.put("bootstrap.servers", "myserver-1:9092");
    config.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    config.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    config.put("schema.registry.url", "http://myserver-1:8089");
    config.put("acks", "all");

    producer = new KafkaProducer(config);

    // parse schema
    Schema.Parser parser = new Schema.Parser();
    schema = parser.parse(TEST_SCHEMA);
}

public void send() {
    // generate key
    int key = (int) (Math.random() * 20);

    // generate record
    GenericData.Record r = new GenericData.Record(schema);
    r.put("str1", "text" + key);
    r.put("str2", "text2" + key);
    r.put("int1", key);

    final ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, "K" + key, (GenericRecord) r);
    producer.send(record, new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                logger.error("Send failed for record {}", record, e);
                messageErrorCounter++;
                return;
            }
            logger.debug("Send succeeded for record {}", record);
            messageCounter++;
        }
    });
}

public String getStats() { return "Messages sent: " + messageCounter + "/" + messageErrorCounter; }

public long getMessageCounter() {
    return messageCounter + messageErrorCounter;
}

public void close() {
    producer.close();
}

public static void main(String[] args) throws InterruptedException, UnknownHostException {
    // initialize kafka producer
    KafkaProducerWrapper kafkaProducerWrapper = new KafkaProducerWrapper("my-test-topic");

    long max = 10000000L;
    for (long i = 0; i < max; i++) {
        kafkaProducerWrapper.send();
    }

    logger.info("producer-demo sent all messages");
    while (kafkaProducerWrapper.getMessageCounter() < max)
    {
        logger.info(kafkaProducerWrapper.getStats());
        Thread.sleep(2000);
    }

    logger.info(kafkaProducerWrapper.getStats());
    kafkaProducerWrapper.close();
}

И я использую Confluent HDFS Connector в автономном режиме для записи данных в HDFS. Конфигурация выглядит следующим образом:

name=hdfs-consumer-test
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1

topics=my-test-topic

hdfs.url=hdfs://my-cluster/kafka-test
hadoop.conf.dir=/etc/hadoop/conf/
flush.size=100000
rotate.interval.ms=20000

# increase timeouts to avoid CommitFailedException
consumer.session.timeout.ms=300000
consumer.request.timeout.ms=310000

heartbeat.interval.ms= 60000
session.timeout.ms= 100000

Коннектор записывает данные в HDFS, но после ожидания 20000 мс (из-за rotate.interval.ms) приходят не все сообщения.

scala> spark.read.avro("/kafka-test/topics/my-test-topic/partition=*/my-test-topic*")
  .count()
res0: Long = 9749015

Есть идеи, в чем причина такого поведения? Где моя ошибка? Я использую Confluent 3.0.1/Kafka 10.0.0.1.


person Sascha Vetter    schedule 30.11.2016    source источник
comment
Вы видите, что последние несколько сообщений не перемещаются в HDFS? Если это так, вероятно, вы столкнулись с проблемой, описанной здесь github.com/confluentinc. /kafka-connect-hdfs/pull/100 Попробуйте отправить еще одно сообщение в тему после истечения срока rotate.interval.ms, чтобы убедиться, что это то, с чем вы столкнулись. Если вам нужно чередовать в зависимости от времени, вероятно, хорошей идеей будет обновление, чтобы получить исправление.   -  person dawsaw    schedule 30.11.2016
comment
Это решение! Я обновился до Confluent 3.1.1 и вижу все сообщения в HDFS. Вы хотите написать это как ответ, и я воздаю вам должное, которого вы заслуживаете?   -  person Sascha Vetter    schedule 01.12.2016
comment
Ага, не знал, что есть разница :)   -  person dawsaw    schedule 02.12.2016


Ответы (1)


Вы видите, что последние несколько сообщений не перемещаются в HDFS? Если это так, вероятно, вы столкнулись с проблемой, описанной здесь https://github.com/confluentinc/kafka-connect-hdfs/pull/100

Попробуйте отправить еще одно сообщение в тему после истечения срока rotate.interval.ms, чтобы убедиться, что это то, с чем вы столкнулись. Если вам нужно чередовать в зависимости от времени, вероятно, хорошей идеей будет обновление, чтобы получить исправление.

person dawsaw    schedule 02.12.2016