Spark Streaming: читать из HBase по полученным ключам потока?

Как лучше всего сравнить полученные данные в Spark Streaming с существующими данными в HBase?

Мы получаем данные от kafka как DStream, и перед записью их в HBase мы должны просканировать HBase на предмет данных на основе полученных ключей от kafka, выполнить некоторые вычисления (на основе новых и старых данных для каждого ключа), а затем записывай в HBase.

Итак, если я получаю запись (key, value_new), я должен получить от HBase (key, value_old), чтобы я мог сравнить value_new и value_old.

Итак, логика была бы такой:

Dstream от Kafka - ›Запрос HBase по ключам DStream -› Некоторые вычисления - ›Запись в HBase

Мой наивный подход заключался в том, чтобы использовать Phoenix Spark Connector для чтения и присоединения к новым данным на основе ключа как способ отфильтровать ключи, не входящие в текущий микропакет. Итак, я бы получил DF с (key, value_new, value_old), и отсюда я могу сравнить внутри раздела.

JavaInputDStream<ConsumerRecord<String, String>> kafkaDStream = KafkaUtils.createDirectStream(...);

// use foreachRDD in order to use Phoenix DF API
kafkaDStream.foreachRDD((rdd, time) -> {
        // Get the singleton instance of SparkSession
        SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

        JavaPairRDD<String, String> keyValueRdd = rdd.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

        // TO SLOW FROM HERE
        Dataset<Row> oldDataDF = spark
                .read()
                .format("org.apache.phoenix.spark")
                .option("table", PHOENIX_TABLE)
                .option("zkUrl", PHOENIX_ZK)
                .load()
                .withColumnRenamed("JSON", "JSON_OLD")
                .withColumnRenamed("KEY_ROW", "KEY_OLD");

        Dataset<Row> newDF = toPhoenixTableDF(spark, keyValueRdd); //just a helper method to get RDD to DF (see note bellow)

        Dataset<Row> newAndOld = newDF.join(oldDataDF, oldDataDF.col("KEY_OLD").equalTo(newDF.col("KEY_ROW")), "left");

        /// do some calcs based on new vs old values and then write to Hbase ...

});

ПРОБЛЕМА: получение данных из HBase на основе списка ключей из полученного RDD DStream с использованием описанного выше подхода слишком медленно для потоковой передачи.

Что может быть эффективным способом сделать это?


Боковое примечание: метод toPhoenixTableDF - это просто помощник для преобразования полученного RDD в DF:

    private static Dataset<Row> toPhoenixTableDF(SparkSession spark, JavaPairRDD<String, String> keyValueRdd) {
        JavaRDD<phoenixTableRecord> tmp = keyValueRdd.map(x -> {
            phoenixTableRecord record = new phoenixTableRecord();
            record.setKEY_ROW(x._1);
            record.setJSON(x._2);
            return record;
        });

        return spark.createDataFrame(tmp, phoenixTableRecord.class);

    }

person YFl    schedule 08.02.2021    source источник
comment
На мой взгляд, это слишком общий вопрос. Постарайтесь сосредоточиться на том, что именно не работает, и объяснить, почему. Если спросить об общей архитектуре, вы получите ответы, основанные на мнениях.   -  person mike    schedule 09.02.2021
comment
В любом случае, я могу сказать, что мы используем приложение Spark Streaming (DStreams), которое получает ключ от Kafka, ищет ключ в HBase и снова записывает обработанное сообщение в Hive. Это прекрасно работает в продакшене. Сообщите мне, где у вас возникли проблемы, и я смогу помочь.   -  person mike    schedule 09.02.2021
comment
Привет, @mike, спасибо за подсказку. Отредактировано, чтобы сделать его более ясным и сосредоточенным на проблеме, а не на общей архитектуре. Моя проблема в том, чтобы сделать что-то похожее на то, что сделали вы. Не могли бы вы поделиться дополнительными соображениями и кодом, пожалуйста?   -  person YFl    schedule 09.02.2021
comment
Вы упомянули получение данных из HBase на основе списка ключей из полученного DStream RDD с использованием вышеуказанного подхода слишком медленно для потоковой передачи. Однако в нашем случае мы именно это и делаем, и, исходя из наших требований, это не слишком медленно.   -  person mike    schedule 09.02.2021
comment
Прочитать всю таблицу Phoenix и затем присоединиться, как в написанном мной коде? Мне требуется 15 секунд на 1200 записей в кластере из 5 узлов, 5 ядер и 10 ГБ памяти на исполнителя. Таблица HBase / phoenix содержит несколько миллионов строк.   -  person YFl    schedule 09.02.2021
comment
Я не знаком с Pheonix, но если вы применяете фильтры префиксов к ключам строк HBase, вам не нужно запрашивать всю таблицу, а получать только интересующие вас данные в этом пакете. Благодаря хорошему дизайну rowkey в HBase это значительно улучшит вашу производительность.   -  person mike    schedule 09.02.2021
comment
Ok. Что касается чтения из искр: как вы читаете? Какой API?   -  person YFl    schedule 09.02.2021
comment
с использованием простого клиента HBase   -  person mike    schedule 09.02.2021
comment
@mike, спасибо. Обычный клиент - это решение. Наконец, я использовал разъем Spark hbase, который предоставляет простой клиент для использования.   -  person YFl    schedule 26.02.2021


Ответы (1)


Решение состоит в том, чтобы использовать разъем Spark hbase для пакетной установки.

Здесь вы можете найти исходный код с хорошими примерами. https://github.com/apache/hbase-connectors/tree/master/spark Как и в документации HBase (spark session).

Эта библиотека использует простой Java / Scala Hbase api, поэтому вы можете контролировать операции, но управляете пулом соединений за вас через объект hbaseContext, транслируемый исполнителям, что действительно здорово. Он предоставляет простые оболочки для операций Hbase, но при необходимости мы можем просто использовать его foreach / mapPartition и получить контроль над логикой, имея при этом доступ к управляемому соединению.

person YFl    schedule 26.02.2021