Как лучше всего сравнить полученные данные в Spark Streaming с существующими данными в HBase?
Мы получаем данные от kafka как DStream, и перед записью их в HBase мы должны просканировать HBase на предмет данных на основе полученных ключей от kafka, выполнить некоторые вычисления (на основе новых и старых данных для каждого ключа), а затем записывай в HBase.
Итак, если я получаю запись (key, value_new), я должен получить от HBase (key, value_old), чтобы я мог сравнить value_new и value_old. strong >
Итак, логика была бы такой:
Dstream от Kafka - ›Запрос HBase по ключам DStream -› Некоторые вычисления - ›Запись в HBase
Мой наивный подход заключался в том, чтобы использовать Phoenix Spark Connector для чтения и присоединения к новым данным на основе ключа как способ отфильтровать ключи, не входящие в текущий микропакет. Итак, я бы получил DF с (key, value_new, value_old), и отсюда я могу сравнить внутри раздела.
JavaInputDStream<ConsumerRecord<String, String>> kafkaDStream = KafkaUtils.createDirectStream(...);
// use foreachRDD in order to use Phoenix DF API
kafkaDStream.foreachRDD((rdd, time) -> {
// Get the singleton instance of SparkSession
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
JavaPairRDD<String, String> keyValueRdd = rdd.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
// TO SLOW FROM HERE
Dataset<Row> oldDataDF = spark
.read()
.format("org.apache.phoenix.spark")
.option("table", PHOENIX_TABLE)
.option("zkUrl", PHOENIX_ZK)
.load()
.withColumnRenamed("JSON", "JSON_OLD")
.withColumnRenamed("KEY_ROW", "KEY_OLD");
Dataset<Row> newDF = toPhoenixTableDF(spark, keyValueRdd); //just a helper method to get RDD to DF (see note bellow)
Dataset<Row> newAndOld = newDF.join(oldDataDF, oldDataDF.col("KEY_OLD").equalTo(newDF.col("KEY_ROW")), "left");
/// do some calcs based on new vs old values and then write to Hbase ...
});
ПРОБЛЕМА: получение данных из HBase на основе списка ключей из полученного RDD DStream с использованием описанного выше подхода слишком медленно для потоковой передачи.
Что может быть эффективным способом сделать это?
Боковое примечание: метод toPhoenixTableDF - это просто помощник для преобразования полученного RDD в DF:
private static Dataset<Row> toPhoenixTableDF(SparkSession spark, JavaPairRDD<String, String> keyValueRdd) {
JavaRDD<phoenixTableRecord> tmp = keyValueRdd.map(x -> {
phoenixTableRecord record = new phoenixTableRecord();
record.setKEY_ROW(x._1);
record.setJSON(x._2);
return record;
});
return spark.createDataFrame(tmp, phoenixTableRecord.class);
}